# Use Kafka with Flume - CRS2

Created by Junyangz at 2018-08-01 10:53:46, based on dmy's docs.

Last edited by Junyangz at 2018-08-01 13:32:51.

## Flume

### Introduction to Flume

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic applications.

![DevGuide\_image00.png](https://flume.apache.org/_images/DevGuide_image00.png)

### Installation

1. Prerequisites

   Install the JDK.
2. Download

   <http://flume.apache.org/download.html>

   apache-flume-1.7.0-bin.tar.gz
3. Extract

```bash
tar -zxvf apache-flume-1.7.0-bin.tar.gz
```

### Configuration and Running

Use the Kafka sink to send data to Kafka from a Flume source. See the [reference doc](https://www.cloudera.com/documentation/kafka/2-0-x/topics/kafka_flume.html#concept_rsb_tyb_kv__section_zgc_tyb_kv).

#### Add the PATH environment variable

* conf/flume-env.sh

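```bash
# Run inside conf/: create flume-env.sh from the template
cp flume-env.sh.template flume-env.sh
# Add this line to flume-env.sh to set the JVM heap size and enable JMX
export JAVA_OPTS="-Xms2048m -Xmx4096m -Dcom.sun.management.jmxremote"
```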

* conf/spool1-kafka.properties, key configuration parameters:

```
# The monitored directory must be created in advance
a1.sources.r1.spoolDir = /home1/flume/spool/dns/1
# Broker list (partial)
a1.sinks.k1.kafka.bootstrap.servers = hadoop-slave01:9092,hadoop-slave02:9092
# Topic name
a1.sinks.k1.kafka.topic = test
```

* conf/spool\[2-n]-kafka.properties: same as above
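
Since the per-agent files differ only in the agent name and spool directory, they could be generated from one template. The following is a minimal sketch, not the author's actual configuration: the channel name `c1`, the memory channel and its sizes, the `write_agent_conf` helper, and the assumption that agent N reads from `.../dns/N` are all illustrative. Only the three key parameters above come from the original.

```shell
#!/bin/sh
# Sketch: generate conf/spoolN-kafka.properties for agent aN.
# Assumed (not in the original): the memory channel c1, its sizes,
# and the per-agent spool directory layout <SPOOL_BASE>/N.
set -eu

CONF_DIR="${CONF_DIR:-conf}"
SPOOL_BASE="${SPOOL_BASE:-/home1/flume/spool/dns}"
BROKERS="${BROKERS:-hadoop-slave01:9092,hadoop-slave02:9092}"
TOPIC="${TOPIC:-test}"

# write_agent_conf N: writes the properties file for agent aN and prints its path.
write_agent_conf() {
    n="$1"
    agent="a${n}"
    out="${CONF_DIR}/spool${n}-kafka.properties"
    mkdir -p "${CONF_DIR}"
    # Comments sit on their own lines: a '#' after a value would become part of the value.
    cat > "${out}" <<EOF
# Agent ${agent}: spooling-directory source -> memory channel -> Kafka sink
${agent}.sources = r1
${agent}.channels = c1
${agent}.sinks = k1

${agent}.sources.r1.type = spooldir
${agent}.sources.r1.spoolDir = ${SPOOL_BASE}/${n}
${agent}.sources.r1.channels = c1

${agent}.channels.c1.type = memory
${agent}.channels.c1.capacity = 10000
${agent}.channels.c1.transactionCapacity = 1000

${agent}.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
${agent}.sinks.k1.kafka.bootstrap.servers = ${BROKERS}
${agent}.sinks.k1.kafka.topic = ${TOPIC}
${agent}.sinks.k1.channel = c1
EOF
    echo "${out}"
}

# Usage (writes conf/spool1-kafka.properties and conf/spool2-kafka.properties):
# for n in 1 2; do write_agent_conf "$n"; done
```

The script does not create the spool directories; as noted above, they still have to exist before the agents start.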

#### Start

```bash
flume-ng agent -n a1 -c conf -f conf/spool1-kafka.properties &
flume-ng agent -n a2 -c conf -f conf/spool2-kafka.properties &
...
#start_flume.sh
```
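
The commands above mention a `start_flume.sh`, whose contents are not shown. One possible shape for such a script is sketched below. The `AGENT_COUNT` and `DRY_RUN` variables, the `logs/` directory, and the helper names are assumptions made for this example; only the `flume-ng agent` command line itself is taken from the original.

```shell
#!/bin/sh
# Sketch of a start script for several Flume agents.
# Assumed (not in the original): AGENT_COUNT, DRY_RUN, and the logs/ layout.
set -eu

AGENT_COUNT="${AGENT_COUNT:-2}"
DRY_RUN="${DRY_RUN:-1}"   # 1 = only print the commands, 0 = actually launch the agents

# start_agent N: start (or print the command for) agent aN.
start_agent() {
    n="$1"
    cmd="flume-ng agent -n a${n} -c conf -f conf/spool${n}-kafka.properties"
    if [ "${DRY_RUN}" = "1" ]; then
        echo "${cmd}"
    else
        mkdir -p logs
        # nohup keeps the agent running after the shell exits; output goes to a per-agent file.
        nohup ${cmd} > "logs/flume-a${n}.out" 2>&1 &
        echo "started a${n} (pid $!)"
    fi
}

# start_all: iterate over agents a1..aAGENT_COUNT.
start_all() {
    i=1
    while [ "${i}" -le "${AGENT_COUNT}" ]; do
        start_agent "${i}"
        i=$((i + 1))
    done
}

start_all
```

Running it as is only prints the commands, which makes it easy to check the agent names and file paths first; `DRY_RUN=0 AGENT_COUNT=4 sh start_flume.sh` would launch four agents.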

## Kafka

### Introduction to Kafka

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

![kafka\_diagram](https://kafka.apache.org/images/kafka_diagram.png)

### Kafka Installation

1. Prerequisites
   1. Install the JDK
   2. Start ZooKeeper
2. Download

   <http://kafka.apache.org/downloads>

   kafka\_2.10-0.10.0.0.tgz
3. Extract

```bash
tar -zxvf kafka_2.10-0.10.0.0.tgz
```

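### Configure Kafka

1. Add the PATH environment variable
2. **config/server.properties**

   ```
    # Each broker needs a unique id, usually starting at 0 and increasing by 1. If the server's IP address changes but broker.id stays the same, consumers are not affected.
    broker.id=0
    # Allow topics to be deleted directly.
    delete.topic.enable=true
    # Defaults to true; usually set to false in production.
    auto.create.topics.enable=false
    # Automatic leader rebalancing; true is already the default.
    auto.leader.rebalance.enable=true
    listeners=PLAINTEXT://client3:9092
    # Where Kafka stores its data. Separate multiple directories with commas; spreading them across different disks can improve read/write performance.
    log.dirs=/opt/apps/kafka/logs
    default.replication.factor=3
    # When the producer sets request.required.acks to -1, min.insync.replicas is the minimum number of replicas that must each confirm the write succeeded. If that number is not reached, the producer gets an exception (default: 1).
    min.insync.replicas=2
    # Maximum number of requests that can queue for the I/O threads before the network threads stop reading new requests (default: 500).
    queued.max.requests=500
    # ZooKeeper connection string, in hostname:port form.
    zookeeper.connect=slave10:2181,slave11:2181,slave12:2181
   ```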
3. **bin/kafka-server-start.sh**
   1. **Add the following code to enable JMX (for easier monitoring):**

      ```bash
       if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
       export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
       export JMX_PORT="9999"
       fi
      ```
   2. **Modify the Java settings above:**

      The current configuration on the test machine is:

      > -Xmx6g -Xms6g -XX:PermSize=128m -XX:MaxPermSize=256m

      LinkedIn's Java configuration:

      > -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
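
As an alternative to editing `bin/kafka-server-start.sh`, the same settings can be supplied through environment variables that Kafka's launch scripts read (`KAFKA_HEAP_OPTS`, `KAFKA_JVM_PERFORMANCE_OPTS`, `JMX_PORT`). The sketch below reuses the LinkedIn flags quoted above; splitting them between heap and performance options is a choice made for this example. Two caveats: setting `KAFKA_JVM_PERFORMANCE_OPTS` replaces the script's built-in default rather than adding to it, and the `PermSize`/`MaxPermSize` flags from the test machine are ignored on Java 8, where Metaspace replaced PermGen.

```shell
#!/bin/sh
# Sketch: pass JVM settings via environment variables instead of editing the start script.
set -eu

# Heap size (same values as in the settings quoted above).
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"

# GC and Metaspace tuning from the LinkedIn settings quoted above (Java 8+).
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

# JMX port, matching the snippet that enables JMX.
export JMX_PORT="9999"

echo "heap: ${KAFKA_HEAP_OPTS}"
echo "jmx:  ${JMX_PORT}"

# Then start the broker from the same shell:
# kafka-server-start.sh config/server.properties &
```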

### Start Kafka

Nodes in the Kafka cluster must have the **firewall disabled**; otherwise the following error is reported:

> Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest\@5b1413a8 (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to client3:9092 (id: 1 rack: null) failed

```bash
# Start the broker in the background
kafka-server-start.sh config/server.properties &
# Stop the broker when needed
kafka-server-stop.sh
```
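
Because `auto.create.topics.enable` is set to `false` above, the topic used by the Flume sink (`test`) has to exist before any data is sent. The sketch below prints the `kafka-topics.sh` commands for creating and inspecting it. The partition count is an assumption, since the original does not give one. The `--zookeeper` option matches the Kafka 0.10.0.0 release used here; newer releases take `--bootstrap-server` instead. With a replication factor of 3 and `min.insync.replicas=2`, writes with acks set to -1 should keep succeeding while one replica is down.

```shell
#!/bin/sh
# Sketch: print the commands that create and describe the topic used by the Flume sink.
# Assumed (not in the original): the partition count and the helper function names.
set -eu

ZK="${ZK:-slave10:2181,slave11:2181,slave12:2181}"
TOPIC="${TOPIC:-test}"
PARTITIONS="${PARTITIONS:-3}"      # assumed value
REPLICATION="${REPLICATION:-3}"    # same as default.replication.factor

# Prints the topic creation command.
create_topic_cmd() {
    echo "kafka-topics.sh --create --zookeeper ${ZK} --replication-factor ${REPLICATION} --partitions ${PARTITIONS} --topic ${TOPIC}"
}

# Prints the command that shows partition leaders and in-sync replicas.
describe_topic_cmd() {
    echo "kafka-topics.sh --describe --zookeeper ${ZK} --topic ${TOPIC}"
}

create_topic_cmd
describe_topic_cmd
```

The script only prints the commands so they can be reviewed first; piping its output to `sh` on a broker node would run them.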

## Kafka Manager

A tool for managing Apache Kafka. <https://github.com/yahoo/kafka-manager>

1. Prerequisites
   * Install sbt and JDK 8
   * To see read and write rates, Kafka must have JMX enabled
2. Download

   ```bash
    git clone https://github.com/yahoo/kafka-manager
   ```
3. Build

   The build requires **Java 8+**. If Java 8+ is not already on the PATH, specify it explicitly when building and running.

   ```bash
    cd kafka-manager
    export PATH=/home/hadoop-user/jdk1.8.0_131/bin:$PATH
    export JAVA_HOME=/home/hadoop-user/jdk1.8.0_131
    sbt -java-home /home/hadoop-user/jdk1.8.0_131 clean dist
   ```
4. Extract

   The built package is in **kafka-manager/target/universal**. Move it to the target directory and unzip it.

   ```bash
    unzip kafka-manager-1.3.3.6.zip
   ```
5. Configure
   1. **conf/application.conf**

      ```
       kafka-manager.zkhosts="slave10:2181,slave11:2181,slave12:2181"
      ```
6. Start
   1. Write a start script:

      ```bash
       vim start.sh
       #nohup ./kafka-manager &
       # By default, Kafka Manager uses port 9000; add the following parameters to change it:
       #nohup ./kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080 &
       # If Java 8 is not on the PATH, add the -java-home parameter:
       #nohup ./kafka-manager -java-home /home/hadoop-user/jdk1.8.0_131 &
      ```
   2. Run it:

      ```bash
       sh start.sh
      ```
7. Usage
   1. Open port 9000 in a web browser
   2. Create a cluster
   3. Configure the cluster: enter the ZooKeeper hosts, select the Kafka version, and enable JMX Polling

## Reference

1. [Using Kafka with Flume](https://www.cloudera.com/documentation/kafka/2-0-x/topics/kafka_flume.html#concept_rsb_tyb_kv__section_zgc_tyb_kv)

2. dmy


