Create by Junyangz AT 2018-08-01 10:53:46 based on dmy's docs.
Last edited by Junyangz AT 2018-08-01 13:32:51.
Flume
Intoduction Flume
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
安装
Copy tar -zxvf apache-flume-1.7.0-bin.tar.gz
配置及运行
Use the Kafka sink to send data to Kafka from a Flume source.refer-doc
添加PATH环境变量
Copy cp flume-env.sh.template flume-env.sh
export JAVA_OPTS = "-Xms2048m -Xmx4096m -Dcom.sun.management.jmxremote"
conf/spool1-kafka.properties 重要配置参数:
Copy a1.sources.r1.spoolDir = /home1/flume/spool/dns/1 # 监听目录需要提前创建好
a1.sinks.k1.kafka.bootstrap.servers = hadoop-slave01:9092,hadoop-slave02:9092 # broker列表(部分)
a1.sinks.k1.kafka.topic = test # topic名称
conf/spool[2-n]-kafka.properties同上
启动
Copy flume-ng agent -n a1 -c conf -f conf/spool1-kafka.properties &
flume-ng agent -n a2 -c conf -f conf/spool2-kafka.properties &
...
#start_flume.sh
Kafka
Intoduction Kafka
Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
Kafka安装
Copy tar -zxvf kafka_2.10-0.10.0.0.tgz
配置Kafka
config/server.properties
Copy broker.id=0 #每一个boker都有一个唯一的id作为它们的名字,一般是从0开始,依次加1。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消费情况
delete.topic.enable=true #直接删除 topic
auto.create.topics.enable=false #默认为true,生产环境通常置为false
auto.leader.rebalance.enable=true #balancing leadership,默认即为 true
listerners=PLAINTEXT: #client3:9092
log.dirs=/opt/apps/kafka/logs #kafka数据的存放地址,多个地址用逗号分割,多个目录分布在不同磁盘上可以提高读写性能
default.replication.factor=3
min.insync.replicas=2 #当producer设置 request.required.acks 为-1时, min.insync.replicas 指定 replicas 的最小数目(必须确认每一个 repicas 的写数据都是成功的),如果这个数目没有达到, producer 会产生异常(默认为1)
queued.max.requests #在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数(默认为 500)
zookeeper.connect=slave10:2181,slave11:2181,slave12:2181 #指定zookeeper连接字符串,格式如hostname:port
bin/kafka-server-start.sh
添加以下代码,开启 JMX(便于监控):
Copy if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS = "-Xmx1G -Xms1G"
export JMX_PORT = "9999"
fi
修改上面的 Java 设置:
测试机上目前的配置如下:
-Xmx6g -Xms6g -XX:PermSize=128m -XX:MaxPermSize=256m
LinkedIn 的 Java 配置:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
启动Kafka
Kafka 集群中的节点要关闭防火墙 ,不然会报如下错误:
Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5b1413a8 (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to client3:9092 (id: 1 rack: null) failed
Copy kafka-server-start.sh config/server.properties &
kafka-server-stop.sh
Kafka Manager
A tool for managing Apache Kafka. https://github.com/yahoo/kafka-manager
安装前准备
想要看到读取、写入速度,kafka 需要开启 JMX
下载
Copy git clone https://github.com/yahoo/kafka-manager
编译
由于需要的环境是 Java 8+ , 如果 java 不在环境变量中,在编译和运行时需要指定 Java 8+
Copy cd kafka-manager
PATH = /home/hadoop-user/jdk1.8.0_131/bin: $PATH
JAVA_HOME = /home/hadoop-user/jdk1.8.0_131
sbt -java-home/home/hadoop-user/jdk1.8.0_131 clean dist
解压
编译好的包在 kafka-manager/target/universal 中,将其移动到指定目录进行解压。
Copy unzip kafka-manager-1.3.3.6.zip
配置
conf/application.conf
Copy kafka-manager.zkhosts="slave10:2181,slave11:2181,slave12:2181"
启动
编写启动脚本:
Copy vim start.sh
#nohup ./kafka-manager &
#默认地,kafka manager 使用 9000 端口,可以添加以下参数进行修改:
#nohup ./kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080 &
#如果 java 8 不在环境变量中,增加 -java-home 参数:
#nohup ./kafka-manager -java-home /home/hadoop-user/jdk1.8.0_131 &
使用
配置 cluster:输入Zookeeper Hosts,选择Kafka版本,打开JMX Polling
Reference
1.Using Kafka with Flume
2.dmy