ELK-simple-tutorial

v0.0.1 on 2019-02-21 18:52:34

v0.0.2 on 2019-11-08 15:56:02

本文简述elk三剑客的简单使用场景,不包括安装部分。 日志采集总体流程为:filebeat -> logstash -> elasticsearch -> Kibana/Grafana

filebeat

filebeat配置文件为filebeat.yml,添加需要ship的日志文件路径

sed -n '/^#/!p' /opt/apps/filebeat/filebeat.yml|egrep -v "^$|#"
##################
filebeat.inputs:
- type: log
  enabled: false
  paths:
    - /var/log/sftp.log
  tags: ["SFTP"]
- type: log
  enabled: false
  paths:
    - /opt/apps/processDNS/logs/*.log
  tags: ["DNS"]
filebeat.config:
  inputs:
    enabled: true
    path: inputs.d/*.yml
    reload.enabled: true
    reload.period: 10s
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.logstash:
  hosts: ["logstash-host:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  1. 以上配置添加了/var/log/sftp.log和/opt/apps/processDNS/logs/文件下以.log结尾的文件并对应添加了相应的tags标签

  2. 开启自动reload,修改配置文件后不需要重启filebeat,本例中只对filebeat.input配置块进行实时监听并reload。

  3. 配置输出logstash的主机名

logstash

logstash 主要对filebeat输入的日志进行筛选匹配等一系列操作。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
    type => filebeat
  }
}

filter {
  if [type] == "filebeat" {
        if "written" in [message] {
                grok {
                  match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:sftp_write:int}" }
                  remove_field => "message"
                }
                grok {
                    match => { "syslog_message" => "/(?<datedir>\d{4}\d{2}\d{2})/%{GREEDYDATA:filename}" }
                    remove_field => "syslog_message"
                }
                grok {
                    match => { "filename" => "(?<ab_code>\d{3})_%{DATA:source_ip}_%{DATESTAMP_EVENTLOG:file_timestamp}_" }
                }
                if [ab_code] == "100" {
                    mutate {
                        add_field => {
                            "sftp_username" => "BJ100"
                        }
                    }
                }

                date {
                  match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
                }
                date {
                    match => ["syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
                    target => "syslog_timestamp"
                }
                date {
                    match => ["file_timestamp", "yyyyMMddHHmmss"]
                    target => "file_timestamp"
                }
                ruby {
                  init => "require 'time'"
                  code => "duration = (event.get('syslog_timestamp') - event.get('file_timestamp')) rescue nil; event.set('Upload_delay', duration); "
                }
        }
        if "processDNS" in [source] {

                grok {
                  match => { "source" => "/opt/apps/processDNS/logs/DNS-%{USERNAME:sftp_username}_2qS-putted-%{DATA:datedir}.log" }
                }

                # grok {
                #   match => { "message" => "%{DATESTAMP:event_timestamp} %{DATA:event_flag}: %{DATA:filename} was putted." }
                #   add_field => {
                #         "event_status" => "putted"
                #     }
                # }
                # date {
                #   match => [ "event_timestamp", "yyyy/MM/dd HH:mm:ss" ]
                # }
        }
    }
}


output {

  elasticsearch {
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }

  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }

}
  1. 使用filter插件对通过filebeat接受到的日志进行grok匹配相应字段(enrich),便于后续统计分析。

  2. output可以指定多个es输出源

Kibana

用于分析可视化数据,具体使用参考官方文档。 如需非本机访问要将默认配置文件kibana.yml中监听地址改成"0.0.0.0"

Grfana

设定数据源为es(支持多种数据源),然后编写query制图,可以设定alert。制图这一块和kibana极为相似。 相较于kibana的优势在于支持权限控制ACL及告警机制,制图也略胜一点。

白话文elk使用-CRS3

整个集群运行监控和维护的主要流程如下: 首先使用filebeat以及metricbeat采集系统的各项指标日志,这一块的内容kibana里有对应的指导安装部署的教程。 我们生产环境中使用这两个组件分别采集的日志是各sftp用户的syslog日志和系统日志,安装的过程可以直接使用rpm包的形式,十分方便。 目前管理机上有对应的内部源/etc/yum.repos.d/elasticstack.repo下发至待安装集群就可以使用ansible批量安装了。

ansible host-group -m yum -a 'name=filebeat stats=latest'

[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=http://10.252.208.189/repos/elasticstack/yum/elastic-7.x
gpgcheck=0
enabled=1
type=rpm-md

关于在同一台机器上启动多个filebeat实例有个小tips:

  1. 将/usr/lib/systemd/system/filebeat.service拷贝一份例如/usr/lib/systemd/system/filebeat@cdn.service

  2. 建立一个新的配置文件目录/etc/filebeat2/, 从/etc/filebeat/复制一份配置文件过去,针对需求改动配置文件

  3. 创建filebeat的数据存储目录和日志存储目录,然后修改service文件中对应的配置文件,存储目录等。

  4. 执行systemctl daemon-reload然后用systemctl start filebeat@cdn && systemctl enable filebeat@cdn启动新的filebeat@cdn服务以及设定开机自启动。

下面的配置文件是我的service文件,可以看到需要改动的地方不多。

[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation=https://www.elastic.co/products/beats/filebeat
Wants=network-online.target
After=network-online.target

[Service]

Environment="BEAT_LOG_OPTS=-e"
Environment="BEAT_CONFIG_OPTS=-c /etc/filebeat2/filebeat.yml"
Environment="BEAT_PATH_OPTS=-path.home /usr/share/filebeat -path.config /etc/filebeat2 -path.data /var/lib/filebeat2 -path.logs /var/log/filebeat2"
ExecStart=/usr/share/filebeat/bin/filebeat $BEAT_LOG_OPTS $BEAT_CONFIG_OPTS $BEAT_PATH_OPTS
Restart=always

[Install]
WantedBy=multi-user.target
filebeat.inputs:
- type: log
  enabled: false
  paths:
     - /opt/apps/scripts/processCDN/logs/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
  host: "nrgl-core-mpp-b-7:5601"
output.elasticsearch:
  hosts: ["http://nrgl-core-mpp-b-7:9200","http://nrgl-core-mpp-c-10:9200","http://nrgl-core-mpp-c-15:9200"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

以上的配置采集的是system日志,需要将filebeat配置文件/etc/filebeat/modules.d/system.yml.disabled改成/etc/filebeat/modules.d/system.yml开启这个模块。

filebeat.inputs:
- type: log
  enabled: true
  paths:
     - /var/log/sftp/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
  host: "nrgl-core-mpp-b-7:5601"
output.logstash:
  hosts: ["nrgl-core-mpp-c-9:5044"]
processors:
  - drop_event:
      when:
        not:
          contains:
            message: "written"
  - add_host_metadata: ~
  - add_cloud_metadata: ~

以上配置文件采集的是sftp用户日志,在processors里进行了日志事件的过滤,只提取sftp日志里包含“written”字段的日志事件Nov 3 03:41:16 xxx internal-sftp[75685]: close "/upload/filename.log.gz.tmp" bytes read 0 written 2671 发送到logstash里进行filter、enrich和字段匹配grok等操作。

Logstash 部署在nrgl-core-mpp-c-9这台机器上,rpm包的形式安装。主要配置文件/etc/logstash/logstash-filebeat-input-out-es.conf

# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
    type => filebeat_CDN
  }
}

filter {
  if [type] == "filebeat_CDN" {
        if "written" in [message] {
                grok {
                  # CN_access_2107775201014001_BCS-CDN-2075-660349_20190723T105001Z_1.log.gz.tmp
                  # CN_access_1975750101001_SCS-CDN-0002-CSX01_20190815153000_1.log.gz.tmp
                  match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{USERNAME:cdn_sftp_username} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: close \"%{DATA:syslog_message}\" bytes read 0 written %{NUMBER:cdn_sftp_written:int}" }
                  remove_field => "message"
                }
                grok {
                    match => { "syslog_message" => "/%{DATA:upload_dir}/%{GREEDYDATA:filename}" }
                    remove_field => "syslog_message"
                }
                grok {
                    match => { "filename" => "CN_%{DATA:cdn_log_Type}_%{DATA:cdn_log_NodeId}_%{DATA:cdn_log_DeviceId}_%{DATA:cdn_log_timestamp}_" }
                }
                date {
                    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
                }
                date {
                    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
                    target => "syslog_timestamp"
                }

                if "Z" in [cdn_log_timestamp] {
                    date {
                        match => [ "cdn_log_timestamp", "yyyyMMdd'T'HHmmssZ" ]
                        timezone => "UTC"
                        target => "cdn_log_timestamp"
                    }
                } else {
                    date {
                        match => [ "cdn_log_timestamp", "yyyyMMddHHmmss" ]
                        target => "cdn_log_timestamp"
                    }
                }

                ruby {
                    init => "require 'time'"
                    code => "duration = (event.get('syslog_timestamp') - event.get('cdn_log_timestamp')) rescue nil; event.set('cdn_log_upload_delay', duration); "
                }
        }

        if "_grokparsefailure" in [tags] {
            drop { }
        }
    }
}


output {
  elasticsearch {
    hosts => ["http://nrgl-core-mpp-b-7:9200","http://nrgl-core-mpp-c-10:9200","http://nrgl-core-mpp-c-15:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}

这一块的配置文件需要了解logstash的配置文档,建议参考官方的文档针对需求有目的的学习。 以上配置主要是对filebeat发来的日志作各种正则匹配提取各项字段,以及通过文件名提取的日志打包时间戳和上传至服务器所产生的syslog日志时间戳计算时延并保存到字段cdn_log_upload_delay里。最后输出到elasticsearch供后续分析查询。

最后一块就是日志分析和展示了,这一块主要通过kibana和grafana里做,多使用以及多看别人是怎么用的就好了,每个人都有自己的风格和特点,不同的场景和需求下能出各式各样的图表,我也没啥值得拿出来的经验分享了。

整个过程是一个比较实际的应用场景,各个环节的调优和负载均衡都没有深入的研究,后续还有很多内容值得去探索发现。

Last updated