v0.0.1 on 2019-02-21 18:52:34
v0.0.2 on 2019-11-08 15:56:02
本文简述elk三剑客的简单使用场景,不包括安装部分。 日志采集总体流程为:filebeat -> logstash -> elasticsearch -> Kibana/Grafana
filebeat
filebeat配置文件为filebeat.yml,添加需要ship的日志文件路径
Copy sed -n '/^#/!p' /opt/apps/filebeat/filebeat.yml | egrep -v "^$|#"
##################
filebeat.inputs:
- type: log
enabled: false
paths:
- /var/log/sftp.log
tags: [ "SFTP" ]
- type: log
enabled: false
paths:
- /opt/apps/processDNS/logs/*.log
tags: [ "DNS" ]
filebeat.config:
inputs:
enabled: true
path: inputs.d/*.yml
reload.enabled: true
reload.period: 10s
filebeat.config.modules:
path: ${path.config} /modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
output.logstash:
hosts: [ "logstash-host:5044" ]
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
以上配置添加了/var/log/sftp.log和/opt/apps/processDNS/logs/文件下以.log结尾的文件并对应添加了相应的tags标签
开启自动reload,修改配置文件后不需要重启filebeat,本例中只对filebeat.input配置块进行实时监听并reload。
logstash
logstash 主要对filebeat输入的日志进行筛选匹配等一系列操作。
Copy # Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port = > 5044
type = > filebeat
}
}
filter {
if [type] == "filebeat" {
if "written" in [message] {
grok {
match = > { "message" = > "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:sftp_write:int}" }
remove_field = > "message"
}
grok {
match = > { "syslog_message" = > "/(?<datedir>\d{4}\d{2}\d{2})/%{GREEDYDATA:filename}" }
remove_field = > "syslog_message"
}
grok {
match = > { "filename" = > "(?<ab_code>\d{3})_%{DATA:source_ip}_%{DATESTAMP_EVENTLOG:file_timestamp}_" }
}
if [ab_code] == "100" {
mutate {
add_field = > {
"sftp_username" = > "BJ100"
}
}
}
date {
match = > [ "syslog_timestamp" , "MMM d HH:mm:ss" , "MMM dd HH:mm:ss" ]
}
date {
match = > [ "syslog_timestamp" , "MMM d HH:mm:ss" , "MMM dd HH:mm:ss" ]
target = > "syslog_timestamp"
}
date {
match = > [ "file_timestamp" , "yyyyMMddHHmmss" ]
target = > "file_timestamp"
}
ruby {
init = > "require 'time'"
code = > "duration = (event.get('syslog_timestamp') - event.get('file_timestamp')) rescue nil; event.set('Upload_delay', duration); "
}
}
if "processDNS" in [source] {
grok {
match = > { "source" = > "/opt/apps/processDNS/logs/DNS-%{USERNAME:sftp_username}_2qS-putted-%{DATA:datedir}.log" }
}
# grok {
# match => { "message" => "%{DATESTAMP:event_timestamp} %{DATA:event_flag}: %{DATA:filename} was putted." }
# add_field => {
# "event_status" => "putted"
# }
# }
# date {
# match => [ "event_timestamp", "yyyy/MM/dd HH:mm:ss" ]
# }
}
}
}
output {
elasticsearch {
hosts = > [ "es1:9200" , "es2:9200" , "es3:9200" ]
index = > "logstash-%{+YYYY.MM.dd}"
}
elasticsearch {
hosts = > [ "http://127.0.0.1:9200" ]
index = > "logstash-%{+YYYY.MM.dd}"
}
}
使用filter插件对通过filebeat接受到的日志进行grok匹配相应字段(enrich),便于后续统计分析。
Kibana
用于分析可视化数据,具体使用参考官方文档。 如需非本机访问要将默认配置文件kibana.yml中监听地址改成"0.0.0.0"
Grfana
设定数据源为es(支持多种数据源),然后编写query制图,可以设定alert。制图这一块和kibana极为相似。 相较于kibana的优势在于支持权限控制ACL及告警机制,制图也略胜一点。
白话文elk使用-CRS3
整个集群运行监控和维护的主要流程如下: 首先使用filebeat以及metricbeat采集系统的各项指标日志,这一块的内容kibana里有对应的指导安装部署的教程。 我们生产环境中使用这两个组件分别采集的日志是各sftp用户的syslog日志和系统日志,安装的过程可以直接使用rpm包的形式,十分方便。 目前管理机上有对应的内部源/etc/yum.repos.d/elasticstack.repo下发至待安装集群就可以使用ansible批量安装了。
ansible host-group -m yum -a 'name=filebeat stats=latest'
Copy [elasticsearch-7.x]
name= Elasticsearch repository for 7.x packages
baseurl= http://10.252.208.189/repos/elasticstack/yum/elastic-7.x
gpgcheck= 0
enabled= 1
type= rpm-md
关于在同一台机器上启动多个filebeat实例有个小tips:
将/usr/lib/systemd/system/filebeat.service拷贝一份例如/usr/lib/systemd/system/filebeat@cdn.service
建立一个新的配置文件目录/etc/filebeat2/, 从/etc/filebeat/复制一份配置文件过去,针对需求改动配置文件
创建filebeat的数据存储目录和日志存储目录,然后修改service文件中对应的配置文件,存储目录等。
执行systemctl daemon-reload
然后用systemctl start filebeat@cdn && systemctl enable filebeat@cdn
启动新的filebeat@cdn服务以及设定开机自启动。
下面的配置文件是我的service文件,可以看到需要改动的地方不多。
Copy [Unit]
Description= Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation= https://www.elastic.co/products/beats/filebeat
Wants= network-online.target
After= network-online.target
[Service]
Environment= "BEAT_LOG_OPTS=-e"
Environment= "BEAT_CONFIG_OPTS=-c /etc/filebeat2/filebeat.yml"
Environment= "BEAT_PATH_OPTS=-path.home /usr/share/filebeat -path.config /etc/filebeat2 -path.data /var/lib/filebeat2 -path.logs /var/log/filebeat2"
ExecStart= /usr/share/filebeat/bin/filebeat $BEAT_LOG_OPTS $BEAT_CONFIG_OPTS $BEAT_PATH_OPTS
Restart= always
[Install]
WantedBy= multi-user.target
Copy filebeat.inputs :
- type : log
enabled : false
paths :
- /opt/apps/scripts/processCDN/logs/*.log
filebeat.config.modules :
path : ${path.config}/modules.d/*.yml
reload.enabled : true
reload.period : 10s
setup.template.settings :
index.number_of_shards : 3
setup.kibana :
host : "nrgl-core-mpp-b-7:5601"
output.elasticsearch :
hosts : [ "http://nrgl-core-mpp-b-7:9200" , "http://nrgl-core-mpp-c-10:9200" , "http://nrgl-core-mpp-c-15:9200" ]
processors :
- add_host_metadata : ~
- add_cloud_metadata : ~
以上的配置采集的是system日志,需要将filebeat配置文件/etc/filebeat/modules.d/system.yml.disabled改成/etc/filebeat/modules.d/system.yml开启这个模块。
Copy filebeat.inputs :
- type : log
enabled : true
paths :
- /var/log/sftp/*.log
filebeat.config.modules :
path : ${path.config}/modules.d/*.yml
reload.enabled : true
reload.period : 10s
setup.template.settings :
index.number_of_shards : 3
setup.kibana :
host : "nrgl-core-mpp-b-7:5601"
output.logstash :
hosts : [ "nrgl-core-mpp-c-9:5044" ]
processors :
- drop_event :
when :
not :
contains :
message : "written"
- add_host_metadata : ~
- add_cloud_metadata : ~
以上配置文件采集的是sftp用户日志,在processors里进行了日志事件的过滤,只提取sftp日志里包含“written”字段的日志事件Nov 3 03:41:16 xxx internal-sftp[75685]: close "/upload/filename.log.gz.tmp" bytes read 0 written 2671
发送到logstash里进行filter、enrich和字段匹配grok等操作。
Logstash 部署在nrgl-core-mpp-c-9
这台机器上,rpm包的形式安装。主要配置文件/etc/logstash/logstash-filebeat-input-out-es.conf
Copy # Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
type => filebeat_CDN
}
}
filter {
if [type] == "filebeat_CDN" {
if "written" in [message] {
grok {
# CN_access_2107775201014001_BCS-CDN-2075-660349_20190723T105001Z_1.log.gz.tmp
# CN_access_1975750101001_SCS-CDN-0002-CSX01_20190815153000_1.log.gz.tmp
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{USERNAME:cdn_sftp_username} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:cdn_sftp_written:int}" }
remove_field => "message"
}
grok {
match => { "syslog_message" => "/%{DATA:upload_dir}/%{GREEDYDATA:filename}" }
remove_field => "syslog_message"
}
grok {
match => { "filename" => "CN_%{DATA:cdn_log_Type}_%{DATA:cdn_log_NodeId}_%{DATA:cdn_log_DeviceId}_%{DATA:cdn_log_timestamp}_" }
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "syslog_timestamp"
}
if "Z" in [cdn_log_timestamp] {
date {
match => [ "cdn_log_timestamp", "yyyyMMdd'T'HHmmssZ" ]
timezone => "UTC"
target => "cdn_log_timestamp"
}
} else {
date {
match => [ "cdn_log_timestamp", "yyyyMMddHHmmss" ]
target => "cdn_log_timestamp"
}
}
ruby {
init => "require 'time'"
code => "duration = (event.get('syslog_timestamp') - event.get('cdn_log_timestamp')) rescue nil; event.set('cdn_log_upload_delay', duration); "
}
}
if "_grokparsefailure" in [tags] {
drop { }
}
}
}
output {
elasticsearch {
hosts => ["http://nrgl-core-mpp-b-7:9200","http://nrgl-core-mpp-c-10:9200","http://nrgl-core-mpp-c-15:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
这一块的配置文件需要了解logstash的配置文档,建议参考官方的文档针对需求有目的的学习。 以上配置主要是对filebeat发来的日志作各种正则匹配提取各项字段,以及通过文件名提取的日志打包时间戳和上传至服务器所产生的syslog日志时间戳计算时延并保存到字段cdn_log_upload_delay
里。最后输出到elasticsearch供后续分析查询。
最后一块就是日志分析和展示了,这一块主要通过kibana和grafana里做,多使用以及多看别人是怎么用的就好了,每个人都有自己的风格和特点,不同的场景和需求下能出各式各样的图表,我也没啥值得拿出来的经验分享了。
整个过程是一个比较实际的应用场景,各个环节的调优和负载均衡都没有深入的研究,后续还有很多内容值得去探索发现。