Use Kafka with Flume - CRS2

Create by Junyangz AT 2018-08-01 10:53:46 based on dmy's docs.

Last edited by Junyangz AT 2018-08-01 13:32:51.

Flume

Intoduction Flume

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

DevGuide_image00.png

安装

  1. 安装前准备

    安装jdk

  2. 下载

    http://flume.apache.org/download.html

    apache-flume-1.7.0-bin.tar.gz

  3. 解压

tar -zxvf apache-flume-1.7.0-bin.tar.gz

配置及运行

Use the Kafka sink to send data to Kafka from a Flume source.refer-doc

添加PATH环境变量

  • conf/flume-env.sh

cp flume-env.sh.template flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx4096m -Dcom.sun.management.jmxremote"
  • conf/spool1-kafka.properties 重要配置参数:

a1.sources.r1.spoolDir = /home1/flume/spool/dns/1 # 监听目录需要提前创建好
a1.sinks.k1.kafka.bootstrap.servers = hadoop-slave01:9092,hadoop-slave02:9092 # broker列表(部分)
a1.sinks.k1.kafka.topic = test # topic名称
  • conf/spool[2-n]-kafka.properties同上

启动

flume-ng agent -n a1 -c conf -f conf/spool1-kafka.properties &
flume-ng agent -n a2 -c conf -f conf/spool2-kafka.properties &
...
#start_flume.sh

Kafka

Intoduction Kafka

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

kafka_diagram

Kafka安装

  1. 安装前准备

    1. 安装jdk

    2. 启动zookeeper

  2. 下载

    http://kafka.apache.org/downloads

    kafka_2.10-0.10.0.0.tgz

  3. 解压

tar -zxvf kafka_2.10-0.10.0.0.tgz

配置Kafka

  1. 添加PATH环境变量

  2. config/server.properties

    broker.id=0 #每一个boker都有一个唯一的id作为它们的名字,一般是从0开始,依次加1。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消费情况
    delete.topic.enable=true #直接删除 topic
    auto.create.topics.enable=false #默认为true,生产环境通常置为false
    auto.leader.rebalance.enable=true #balancing leadership,默认即为 true
    listerners=PLAINTEXT: #client3:9092
    log.dirs=/opt/apps/kafka/logs #kafka数据的存放地址,多个地址用逗号分割,多个目录分布在不同磁盘上可以提高读写性能
    default.replication.factor=3
    min.insync.replicas=2 #当producer设置 request.required.acks 为-1时, min.insync.replicas 指定 replicas 的最小数目(必须确认每一个 repicas 的写数据都是成功的),如果这个数目没有达到, producer 会产生异常(默认为1)
    queued.max.requests #在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数(默认为 500)
    zookeeper.connect=slave10:2181,slave11:2181,slave12:2181 #指定zookeeper连接字符串,格式如hostname:port
  3. bin/kafka-server-start.sh

    1. 添加以下代码,开启 JMX(便于监控):

      if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
      export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
      export JMX_PORT="9999"
      fi
    2. 修改上面的 Java 设置:

      测试机上目前的配置如下:

      -Xmx6g -Xms6g -XX:PermSize=128m -XX:MaxPermSize=256m

      LinkedIn 的 Java 配置:

      -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

启动Kafka

Kafka 集群中的节点要关闭防火墙,不然会报如下错误:

Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5b1413a8 (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to client3:9092 (id: 1 rack: null) failed

kafka-server-start.sh config/server.properties &
kafka-server-stop.sh

Kafka Manager

A tool for managing Apache Kafka. https://github.com/yahoo/kafka-manager

  1. 安装前准备

    • 安装 sbt,jdk8

    • 想要看到读取、写入速度,kafka 需要开启 JMX

  2. 下载

    git clone https://github.com/yahoo/kafka-manager
  3. 编译

    由于需要的环境是 Java 8+, 如果 java 不在环境变量中,在编译和运行时需要指定 Java 8+

    cd kafka-manager
    PATH=/home/hadoop-user/jdk1.8.0_131/bin:$PATH
    JAVA_HOME=/home/hadoop-user/jdk1.8.0_131
    sbt -java-home/home/hadoop-user/jdk1.8.0_131 clean dist
  4. 解压

    编译好的包在 kafka-manager/target/universal 中,将其移动到指定目录进行解压。

    unzip kafka-manager-1.3.3.6.zip
  5. 配置

    1. conf/application.conf

      kafka-manager.zkhosts="slave10:2181,slave11:2181,slave12:2181"
  6. 启动

    1. 编写启动脚本:

      vim start.sh
      #nohup ./kafka-manager &
      #默认地,kafka manager 使用 9000 端口,可以添加以下参数进行修改:
      #nohup ./kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080 &
      #如果 java 8 不在环境变量中,增加 -java-home 参数:
      #nohup ./kafka-manager -java-home /home/hadoop-user/jdk1.8.0_131 &
    2. 启动:

      sh start.sh
  7. 使用

    1. Web访问9000端口

    2. 创建 cluster

    3. 配置 cluster:输入Zookeeper Hosts,选择Kafka版本,打开JMX Polling

Reference

1.Using Kafka with Flume

2.dmy