ELK-simple-tutorial

v0.0.1 on 2019-02-21 18:52:34

v0.0.2 on 2019-11-08 15:56:02

本文简述elk三剑客的简单使用场景,不包括安装部分。 日志采集总体流程为:filebeat -> logstash -> elasticsearch -> Kibana/Grafana

filebeat

filebeat配置文件为filebeat.yml,添加需要ship的日志文件路径

sed -n '/^#/!p' /opt/apps/filebeat/filebeat.yml|egrep -v "^$|#"
##################
filebeat.inputs:
- type: log
enabled: false
paths:
- /var/log/sftp.log
tags: ["SFTP"]
- type: log
enabled: false
paths:
- /opt/apps/processDNS/logs/*.log
tags: ["DNS"]
filebeat.config:
inputs:
enabled: true
path: inputs.d/*.yml
reload.enabled: true
reload.period: 10s
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
output.logstash:
hosts: ["logstash-host:5044"]
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
  1. 以上配置添加了/var/log/sftp.log和/opt/apps/processDNS/logs/文件下以.log结尾的文件并对应添加了相应的tags标签

  2. 开启自动reload,修改配置文件后不需要重启filebeat,本例中只对filebeat.input配置块进行实时监听并reload。

  3. 配置输出logstash的主机名

logstash

logstash 主要对filebeat输入的日志进行筛选匹配等一系列操作。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
type => filebeat
}
}
filter {
if [type] == "filebeat" {
if "written" in [message] {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:sftp_write:int}" }
remove_field => "message"
}
grok {
match => { "syslog_message" => "/(?<datedir>\d{4}\d{2}\d{2})/%{GREEDYDATA:filename}" }
remove_field => "syslog_message"
}
grok {
match => { "filename" => "(?<ab_code>\d{3})_%{DATA:source_ip}_%{DATESTAMP_EVENTLOG:file_timestamp}_" }
}
if [ab_code] == "100" {
mutate {
add_field => {
"sftp_username" => "BJ100"
}
}
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
date {
match => ["syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "syslog_timestamp"
}
date {
match => ["file_timestamp", "yyyyMMddHHmmss"]
target => "file_timestamp"
}
ruby {
init => "require 'time'"
code => "duration = (event.get('syslog_timestamp') - event.get('file_timestamp')) rescue nil; event.set('Upload_delay', duration); "
}
}
if "processDNS" in [source] {
grok {
match => { "source" => "/opt/apps/processDNS/logs/DNS-%{USERNAME:sftp_username}_2qS-putted-%{DATA:datedir}.log" }
}
# grok {
# match => { "message" => "%{DATESTAMP:event_timestamp} %{DATA:event_flag}: %{DATA:filename} was putted." }
# add_field => {
# "event_status" => "putted"
# }
# }
# date {
# match => [ "event_timestamp", "yyyy/MM/dd HH:mm:ss" ]
# }
}
}
}
output {
elasticsearch {
hosts => ["es1:9200", "es2:9200", "es3:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
  1. 使用filter插件对通过filebeat接受到的日志进行grok匹配相应字段(enrich),便于后续统计分析。

  2. output可以指定多个es输出源

Kibana

用于分析可视化数据,具体使用参考官方文档。 如需非本机访问要将默认配置文件kibana.yml中监听地址改成"0.0.0.0"

Grfana

设定数据源为es(支持多种数据源),然后编写query制图,可以设定alert。制图这一块和kibana极为相似。 相较于kibana的优势在于支持权限控制ACL及告警机制,制图也略胜一点。

白话文elk使用-CRS3

整个集群运行监控和维护的主要流程如下: 首先使用filebeat以及metricbeat采集系统的各项指标日志,这一块的内容kibana里有对应的指导安装部署的教程。 我们生产环境中使用这两个组件分别采集的日志是各sftp用户的syslog日志和系统日志,安装的过程可以直接使用rpm包的形式,十分方便。 目前管理机上有对应的内部源/etc/yum.repos.d/elasticstack.repo下发至待安装集群就可以使用ansible批量安装了。

ansible host-group -m yum -a 'name=filebeat stats=latest'

[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=http://10.252.208.189/repos/elasticstack/yum/elastic-7.x
gpgcheck=0
enabled=1
type=rpm-md

关于在同一台机器上启动多个filebeat实例有个小tips:

  1. 将/usr/lib/systemd/system/filebeat.service拷贝一份例如/usr/lib/systemd/system/[email protected]

  2. 建立一个新的配置文件目录/etc/filebeat2/, 从/etc/filebeat/复制一份配置文件过去,针对需求改动配置文件

  3. 创建filebeat的数据存储目录和日志存储目录,然后修改service文件中对应的配置文件,存储目录等。

  4. 执行systemctl daemon-reload然后用systemctl start [email protected] && systemctl enable [email protected]启动新的[email protected]服务以及设定开机自启动。

下面的配置文件是我的service文件,可以看到需要改动的地方不多。

[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation=https://www.elastic.co/products/beats/filebeat
Wants=network-online.target
After=network-online.target
[Service]
Environment="BEAT_LOG_OPTS=-e"
Environment="BEAT_CONFIG_OPTS=-c /etc/filebeat2/filebeat.yml"
Environment="BEAT_PATH_OPTS=-path.home /usr/share/filebeat -path.config /etc/filebeat2 -path.data /var/lib/filebeat2 -path.logs /var/log/filebeat2"
ExecStart=/usr/share/filebeat/bin/filebeat $BEAT_LOG_OPTS $BEAT_CONFIG_OPTS $BEAT_PATH_OPTS
Restart=always
[Install]
WantedBy=multi-user.target
filebeat.inputs:
- type: log
enabled: false
paths:
- /opt/apps/scripts/processCDN/logs/*.log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
host: "nrgl-core-mpp-b-7:5601"
output.elasticsearch:
hosts: ["http://nrgl-core-mpp-b-7:9200","http://nrgl-core-mpp-c-10:9200","http://nrgl-core-mpp-c-15:9200"]
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~

以上的配置采集的是system日志,需要将filebeat配置文件/etc/filebeat/modules.d/system.yml.disabled改成/etc/filebeat/modules.d/system.yml开启这个模块。

filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/sftp/*.log
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
host: "nrgl-core-mpp-b-7:5601"
output.logstash:
hosts: ["nrgl-core-mpp-c-9:5044"]
processors:
- drop_event:
when:
not:
contains:
message: "written"
- add_host_metadata: ~
- add_cloud_metadata: ~

以上配置文件采集的是sftp用户日志,在processors里进行了日志事件的过滤,只提取sftp日志里包含“written”字段的日志事件Nov 3 03:41:16 xxx internal-sftp[75685]: close "/upload/filename.log.gz.tmp" bytes read 0 written 2671 发送到logstash里进行filter、enrich和字段匹配grok等操作。

Logstash 部署在nrgl-core-mpp-c-9这台机器上,rpm包的形式安装。主要配置文件/etc/logstash/logstash-filebeat-input-out-es.conf

# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
type => filebeat_CDN
}
}
filter {
if [type] == "filebeat_CDN" {
if "written" in [message] {
grok {
# CN_access_2107775201014001_BCS-CDN-2075-660349_20190723T105001Z_1.log.gz.tmp
# CN_access_1975750101001_SCS-CDN-0002-CSX01_20190815153000_1.log.gz.tmp
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{USERNAME:cdn_sftp_username} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:cdn_sftp_written:int}" }
remove_field => "message"
}
grok {
match => { "syslog_message" => "/%{DATA:upload_dir}/%{GREEDYDATA:filename}" }
remove_field => "syslog_message"
}
grok {
match => { "filename" => "CN_%{DATA:cdn_log_Type}_%{DATA:cdn_log_NodeId}_%{DATA:cdn_log_DeviceId}_%{DATA:cdn_log_timestamp}_" }
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
target => "syslog_timestamp"
}
if "Z" in [cdn_log_timestamp] {
date {
match => [ "cdn_log_timestamp", "yyyyMMdd'T'HHmmssZ" ]
timezone => "UTC"
target => "cdn_log_timestamp"
}
} else {
date {
match => [ "cdn_log_timestamp", "yyyyMMddHHmmss" ]
target => "cdn_log_timestamp"
}
}
ruby {
init => "require 'time'"
code => "duration = (event.get('syslog_timestamp') - event.get('cdn_log_timestamp')) rescue nil; event.set('cdn_log_upload_delay', duration); "
}
}
if "_grokparsefailure" in [tags] {
drop { }
}
}
}
output {
elasticsearch {
hosts => ["http://nrgl-core-mpp-b-7:9200","http://nrgl-core-mpp-c-10:9200","http://nrgl-core-mpp-c-15:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}

这一块的配置文件需要了解logstash的配置文档,建议参考官方的文档针对需求有目的的学习。 以上配置主要是对filebeat发来的日志作各种正则匹配提取各项字段,以及通过文件名提取的日志打包时间戳和上传至服务器所产生的syslog日志时间戳计算时延并保存到字段cdn_log_upload_delay里。最后输出到elasticsearch供后续分析查询。

最后一块就是日志分析和展示了,这一块主要通过kibana和grafana里做,多使用以及多看别人是怎么用的就好了,每个人都有自己的风格和特点,不同的场景和需求下能出各式各样的图表,我也没啥值得拿出来的经验分享了。

整个过程是一个比较实际的应用场景,各个环节的调优和负载均衡都没有深入的研究,后续还有很多内容值得去探索发现。