自从Flume1.6开始,新增了对Kafka的支持,极大地提升了Flume的采集能力。避免后端因热点问题导致kafka的channel爆满而无法采集数据。
本篇介绍使用Flume当前最新版本1.8与Kafka的结合使用。
基本环境
- Kafka (192.168.156.101:9092)
- Zookeeper(192.168.156.101:2181)
- JDK1.8
安装Flume
wget http://apache-flume-1.8.0-bin.tar.gz tar -zxvf apache-flume-1.8.0-bin.tar.gz
进入apache-flume-1.8.0-bin目录,在conf路径中新增配置文件flume.properties
(名称随意)。
cd apache-flume-1.8.0-bin touch conf/flume.properties
新增如下配置:
## 此处定义 agent 的source(数据源)、sink(数据流向)、channel(管道) agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 ## 此处定义Agent 数据源的类型 agent1.sources.source1.type=http agent1.sources.source1.bind=0.0.0.0 agent1.sources.source1.port=9000 agent1.channels.channel1.type=memory agent1.channels.channel1.capacity=10000 agent1.channels.channel1.transactionCapacity=100 agent1.channels.channel1.keep-alive=30 ## 此处定义kafka的sink topic broker agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink agent1.sinks.sink1.topic=kafkaTest agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.156.101:9092 agent1.sinks.sink1.requiredAcks=1 agent1.sinks.sink1.kafka.producer.acks = 1 agent1.sinks.sink1.kafka.flumeBatchSize = 20 agent1.sinks.sink1.kafka.producer.linger.ms = 1 agent1.sinks.sink1.kafka.producer.compression.type = snappy ## 此处定义source的channel 和 sink的channel agent1.sources.source1.channels=channel1 agent1.sinks.sink1.channel=channel1
启动flume
在apache-flume-1.8.0-bin
中执行如下命令启动flume。
nohup bin/flume-ng agent -f conf/flume.properties -n agent1 -c /home/cdhuser/apache-flume-1.8.0-bin/conf >/dev/null &
注意此处的-f
、-n
、 -c
参数:
- -f 表示配置文件的路径
- -n agent的名称,与配置文件中一直
- -c 配置文件所在的路径
此时,便已经成功启动了flume,source为HTTP,端口为9000,sink为Kafka,channel默认在内存,当然也可以将channel配置为Kafka Channel。
使用Rest Client
给9000端口发送数据,然后在kafka消费者端进行查看。
启动kafka消费端
cd /opt/kafka bin/kafka-console-consumer.sh --zookeeper 192.168.156.101:2181 --topic kafkaTest --from-beginning
然后发送如下测试数据
[ { "headers" : { "datatype" : "test", "timestamp" : 1456989430522 }, "body" : "123123$45645$20160223111222$10.10.170.75$01$1$2$PC" } ]
此时在kafka消费者那一侧就可以发现如下信息:
123123$45645$20160223111222$10.10.170.75$01$1$2$PC
写的比较乱,有空在整理。