# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  # Receive events from Beats shippers and tag each one with
  # type "filebeat" so the filter section can select on it.
  beats {
    type => "filebeat"
    port => 5044
  }
}
filter {
  # Only events tagged type "filebeat" by the beats input are processed here.
  if [type] == "filebeat" {

    # --- Close/"written" lines: message contains "written" ----------------
    if "written" in [message] {
      # Split the syslog line into timestamp, host, program, optional pid,
      # the quoted path after `close` (syslog_message) and the number of
      # bytes written (sftp_write, cast to int). The raw message is removed
      # when the match succeeds.
      grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:sftp_write:int}" }
        remove_field => "message"
      }

      # Pull the 8-digit directory component (datedir) and everything after
      # the following slash (filename) out of the path.
      grok {
        match => { "syslog_message" => "/(?<datedir>\d{4}\d{2}\d{2})/%{GREEDYDATA:filename}" }
        remove_field => "syslog_message"
      }

      # File names are expected to look like
      # <3-digit code>_<source ip>_<yyyyMMddHHmmss>_...
      grok {
        match => { "filename" => "(?<ab_code>\d{3})_%{DATA:source_ip}_%{DATESTAMP_EVENTLOG:file_timestamp}_" }
      }

      # Map the known 3-digit code to its SFTP user name.
      if [ab_code] == "100" {
        mutate {
          add_field => {
            "sftp_username" => "BJ100"
          }
        }
      }

      # No target is given, so the parsed syslog time goes to the date
      # filter's default target (@timestamp).
      date {
        match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }

      # Parse the same value again, in place, so syslog_timestamp becomes a
      # timestamp object that the ruby filter below can subtract from.
      date {
        match => ["syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
        target => "syslog_timestamp"
      }

      # Convert the timestamp embedded in the file name in place as well.
      date {
        match => ["file_timestamp", "yyyyMMddHHmmss"]
        target => "file_timestamp"
      }

      # Upload_delay = syslog_timestamp - file_timestamp
      # (presumably in seconds -- confirm against the Logstash Timestamp API).
      # The computation stays best-effort: if either field is missing or was
      # not converted by the date filters above, the subtraction raises and is
      # rescued to nil. In that case the field is NOT written, so events never
      # carry a null Upload_delay.
      ruby {
        init => "require 'time'"
        code => "duration = (event.get('syslog_timestamp') - event.get('file_timestamp')) rescue nil; event.set('Upload_delay', duration) unless duration.nil?"
      }
    }

    # --- processDNS log files: identified by the path in [source] ---------
    if "processDNS" in [source] {
      # Derive the SFTP user name and the date directory from the log file
      # path. The dot before "log" is escaped so that it only matches a
      # literal "." rather than any character.
      grok {
        match => { "source" => "/opt/apps/processDNS/logs/DNS-%{USERNAME:sftp_username}_2qS-putted-%{DATA:datedir}\.log" }
      }
      # Disabled: per-line parsing of "<timestamp> <flag>: <file> was putted."
      # grok {
      # match => { "message" => "%{DATESTAMP:event_timestamp} %{DATA:event_flag}: %{DATA:filename} was putted." }
      # add_field => {
      # "event_status" => "putted"
      # }
      # }
      # date {
      # match => [ "event_timestamp", "yyyy/MM/dd HH:mm:ss" ]
      # }
    }
  }
}
output {
  # Two elasticsearch outputs are declared, so events are written to both
  # destinations, each using the same daily index name.

  # Three-node cluster.
  elasticsearch {
    index => "logstash-%{+YYYY.MM.dd}"
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
  }

  # Elasticsearch instance on the local host.
  elasticsearch {
    index => "logstash-%{+YYYY.MM.dd}"
    hosts => ["http://127.0.0.1:9200"]
  }
}