念念不忘
必会回响

如何将Flume与Kafka进行整合

自从Flume1.6开始,新增了对Kafka的支持,极大地提升了Flume的采集能力。避免后端因热点问题导致kafka的channel爆满而无法采集数据。
本篇介绍使用Flume当前最新版本1.8与Kafka的结合使用。

基本环境

  • Kafka (192.168.156.101:9092)
  • Zookeeper(192.168.156.101:2181)
  • JDK1.8

安装Flume

wget http://apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz

进入apache-flume-1.8.0-bin目录,在conf路径中新增配置文件flume.properties(名称随意)。

cd apache-flume-1.8.0-bin
touch conf/flume.properties

新增如下配置:

## 此处定义 agent 的source(数据源)、sink(数据流向)、channel(管道)
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

## 此处定义Agent 数据源的类型 
agent1.sources.source1.type=http
agent1.sources.source1.bind=0.0.0.0
agent1.sources.source1.port=9000
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=100
agent1.channels.channel1.keep-alive=30

## 此处定义kafka的sink topic broker 
agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic=kafkaTest
agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.156.101:9092
agent1.sinks.sink1.requiredAcks=1
agent1.sinks.sink1.kafka.producer.acks = 1
agent1.sinks.sink1.kafka.flumeBatchSize = 20
agent1.sinks.sink1.kafka.producer.linger.ms = 1                                                                                                                                             
agent1.sinks.sink1.kafka.producer.compression.type = snappy                                                                                                                                 
## 此处定义source的channel 和 sink的channel                                                                                                                                                 
agent1.sources.source1.channels=channel1                                                                                                                                                    
agent1.sinks.sink1.channel=channel1

启动flume

apache-flume-1.8.0-bin中执行如下命令启动flume。

nohup bin/flume-ng agent -f conf/flume.properties -n agent1 -c /home/cdhuser/apache-flume-1.8.0-bin/conf >/dev/null &

注意此处的-f-n、 -c参数:

  • -f 表示配置文件的路径
  • -n agent的名称,与配置文件中一直
  • -c 配置文件所在的路径

此时,便已经成功启动了flume,source为HTTP,端口为9000,sink为Kafka,channel默认在内存,当然也可以将channel配置为Kafka Channel。

使用Rest Client给9000端口发送数据,然后在kafka消费者端进行查看。

启动kafka消费端

cd /opt/kafka

bin/kafka-console-consumer.sh --zookeeper 192.168.156.101:2181 --topic kafkaTest --from-beginning

然后发送如下测试数据

[
  {
    "headers" : {
      "datatype" : "test",
      "timestamp" : 1456989430522
    },
    "body" : "123123$45645$20160223111222$10.10.170.75$01$1$2$PC"
  }
]

此时在kafka消费者那一侧就可以发现如下信息:

123123$45645$20160223111222$10.10.170.75$01$1$2$PC

写的比较乱,有空在整理。

赞(0) 打赏
未经允许不得转载:堆上小栈 » 如何将Flume与Kafka进行整合

评论 抢沙发

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册