Created by Junyangz at 2018-08-01 10:53:46, based on dmy's docs.
Last edited by Junyangz at 2018-08-01 13:32:51.
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Pre-installation preparation
Install JDK
Extract
tar -zxvf apache-flume-1.7.0-bin.tar.gz
Use the Kafka sink to send data to Kafka from a Flume source (see the Flume reference docs).
conf/flume-env.sh
cp flume-env.sh.template flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx4096m -Dcom.sun.management.jmxremote"
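Once an agent is running (started below), one quick way to confirm these JVM options took effect is jps, which can print each JVM's main class and arguments:
jps -l -v | grep org.apache.flume.node.Application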
Key configuration parameters in conf/spool1-kafka.properties:
a1.sources.r1.spoolDir = /home1/flume/spool/dns/1    # directory to monitor; create it in advance
a1.sinks.k1.kafka.bootstrap.servers = hadoop-slave01:9092,hadoop-slave02:9092    # broker list (partial)
a1.sinks.k1.kafka.topic = test    # topic name
conf/spool[2-n]-kafka.properties: same as above.
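For reference, a minimal complete agent file might look as follows; the memory channel c1 and its capacity values are assumptions, while the source, sink, and directory names come from the parameters above:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# spooling-directory source watching the directory created above
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home1/flume/spool/dns/1
a1.sources.r1.channels = c1

# in-memory channel (capacities are placeholder values)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Kafka sink as documented in the Flume 1.7 user guide
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop-slave01:9092,hadoop-slave02:9092
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.channel = c1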
flume-ng agent -n a1 -c conf -f conf/spool1-kafka.properties &
flume-ng agent -n a2 -c conf -f conf/spool2-kafka.properties &
...
# start_flume.sh
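The trailing comment suggests wrapping these commands in a start_flume.sh; a possible sketch, where the install path, agent count, and log destination are assumptions:

#!/bin/bash
# start_flume.sh: start one agent per spool config (sketch)
FLUME_HOME=/opt/apps/flume            # adjust to the actual install path
cd "$FLUME_HOME"
for i in 1 2; do                      # one agent per conf/spool<i>-kafka.properties
    nohup bin/flume-ng agent -n a$i -c conf -f conf/spool$i-kafka.properties \
        >> logs/flume-a$i.out 2>&1 &
done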
Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
Pre-installation preparation
Install JDK
Start ZooKeeper
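If ZooKeeper is a standalone install, it can be started on each node with its own start script (assuming ZooKeeper's bin directory is on the PATH):
zkServer.sh start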
Extract
tar -zxvf kafka_2.10-0.10.0.0.tgz
Add to the PATH environment variable
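For example, appended to ~/.bashrc (the install path is an assumption, inferred from the log.dirs setting below):
export KAFKA_HOME=/opt/apps/kafka
export PATH=$KAFKA_HOME/bin:$PATH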
config/server.properties
broker.id=0                        # every broker has a unique id as its name, usually starting at 0 and incrementing by 1; if the server's IP address changes but broker.id stays the same, consumers are unaffected
delete.topic.enable=true           # allow topics to be deleted outright
auto.create.topics.enable=false    # defaults to true; usually set to false in production
auto.leader.rebalance.enable=true  # leadership balancing; defaults to true
listeners=PLAINTEXT://client3:9092
log.dirs=/opt/apps/kafka/logs      # where Kafka stores its data; separate multiple paths with commas; spreading the directories across different disks improves read/write performance
default.replication.factor=3
min.insync.replicas=2              # when the producer sets request.required.acks to -1, min.insync.replicas is the minimum number of replicas whose writes must all succeed; if it is not reached, the producer raises an exception (default 1)
queued.max.requests=500            # maximum number of requests that may be queued for the I/O threads before the network threads stop reading new requests (default 500)
zookeeper.connect=slave10:2181,slave11:2181,slave12:2181    # ZooKeeper connection string, in hostname:port format
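Because auto.create.topics.enable is set to false, topics must be created by hand before producers write to them; for example (the partition count here is an arbitrary choice):
kafka-topics.sh --create --zookeeper slave10:2181,slave11:2181,slave12:2181 \
    --replication-factor 3 --partitions 6 --topic test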
bin/kafka-server-start.sh
Add the following code to enable JMX (convenient for monitoring):
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"export JMX_PORT="9999"fi
Modify the Java settings above:
Current configuration on the test machines:
-Xmx6g -Xms6g -XX:PermSize=128m -XX:MaxPermSize=256m
LinkedIn's Java configuration:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
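One way to apply such settings is through the environment variables read by kafka-server-start.sh and kafka-run-class.sh; a sketch using the values above:
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"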
Nodes in the Kafka cluster must have their firewalls turned off; otherwise errors like the following are reported:
Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5b1413a8 (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to client3:9092 (id: 1 rack: null) failed
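To turn the firewall off (commands depend on the OS; these assume CentOS):
# CentOS 7
systemctl stop firewalld && systemctl disable firewalld
# CentOS 6
service iptables stop && chkconfig iptables off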
kafka-server-start.sh config/server.properties &
kafka-server-stop.sh
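With the brokers and Flume agents running, the whole pipeline can be smoke-tested by dropping a file into a spool directory (the file name is arbitrary) and consuming from the topic:
echo "test event" > /home1/flume/spool/dns/1/smoke.log
kafka-console-consumer.sh --bootstrap-server hadoop-slave01:9092 --topic test --from-beginning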
A tool for managing Apache Kafka. https://github.com/yahoo/kafka-manager
Pre-installation preparation
Install sbt and JDK 8
To see read and write rates, Kafka must have JMX enabled
Download
git clone https://github.com/yahoo/kafka-manager
Build
Java 8+ is required; if java is not on the PATH, Java 8+ must be specified explicitly when building and running.
cd kafka-manager
PATH=/home/hadoop-user/jdk1.8.0_131/bin:$PATH
JAVA_HOME=/home/hadoop-user/jdk1.8.0_131
sbt -java-home /home/hadoop-user/jdk1.8.0_131 clean dist
Extract
The built package is at kafka-manager/target/universal; move it to the desired directory and unzip it.
unzip kafka-manager-1.3.3.6.zip
Configuration
conf/application.conf
kafka-manager.zkhosts="slave10:2181,slave11:2181,slave12:2181"
Start
Write a start script:
vim start.sh
#nohup ./kafka-manager &
# by default, kafka-manager uses port 9000; add the following parameters to change it:
#nohup ./kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080 &
# if Java 8 is not on the PATH, add the -java-home parameter:
#nohup ./kafka-manager -java-home /home/hadoop-user/jdk1.8.0_131 &
Start:
sh start.sh
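A quick check that the UI is up (adjust the port if you changed it above):
curl -I http://localhost:9000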
Usage
Visit port 9000 in a web browser
Create a cluster
Configure the cluster: enter the Zookeeper Hosts, choose the Kafka version, and enable JMX Polling