1. 程式人生 > >如何將Flume與kafka進行整合

如何將Flume與kafka進行整合

自從Flume1.6開始,新增了對Kafka的支援,極大地提升了Flume的採集能力。避免後端因熱點問題導致kafka的channel爆滿而無法採集資料。
本篇介紹使用Flume當前最新版本1.8與Kafka的結合使用。

基本環境

  • Kafka (192.168.156.101:9092)
  • Zookeeper(192.168.156.101:2181)
  • JDK1.8

安裝Flume

wget http://apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz

進入apache-flume-1.8.0-bin目錄,在conf路徑中新增配置檔案flume.properties

(名稱隨意)。

cd apache-flume-1.8.0-bin
touch conf/flume.properties

新增如下配置:

## 此處定義 agent 的source(資料來源)、sink(資料流向)、channel(管道)
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

## 此處定義Agent 資料來源的型別 
agent1.sources.source1.type=http
agent1.sources.source1.bind=0.0.0.0
agent1.sources.source1.port=9000
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=100
agent1.channels.channel1.keep-alive=30

## 此處定義kafka的sink topic broker 
agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic=kafkaTest
agent1.sinks.sink1.kafka.bootstrap.servers = 192.168.156.101:9092
agent1.sinks.sink1.requiredAcks=1
agent1.sinks.sink1.kafka.producer.acks = 1
agent1.sinks.sink1.kafka.flumeBatchSize = 20
agent1.sinks.sink1.kafka.producer.linger.ms = 1                                                                                                                                             
agent1.sinks.sink1.kafka.producer.compression.type = snappy                                                                                                                                 
## 此處定義source的channel 和 sink的channel                                                                                                                                                 
agent1.sources.source1.channels=channel1                                                                                                                                                    
agent1.sinks.sink1.channel=channel1

啟動flume

apache-flume-1.8.0-bin中執行如下命令啟動flume。

nohup bin/flume-ng agent -f conf/flume.properties -n agent1 -c /home/cdhuser/apache-flume-1.8.0-bin/conf >/dev/null &

注意此處的-f-n-c引數:

  • -f 表示配置檔案的路徑
  • -n agent的名稱,與配置檔案中一直
  • -c 配置檔案所在的路徑

此時,便已經成功啟動了flume,source為HTTP,埠為9000,sink為Kafka,channel預設在記憶體,當然也可以將channel配置為Kafka Channel。

使用Rest Client給9000埠傳送資料,然後在kafka消費者端進行檢視。

啟動kafka消費端

cd /opt/kafka

bin/kafka-console-consumer.sh --zookeeper 192.168.156.101:2181 --topic kafkaTest --from-beginning

然後給flume傳送如下測試資料

[
  {
    "headers" : {
      "datatype" : "test",
      "timestamp" : 1456989430522
    },
    "body" : "123123$45645$20160223111222$10.10.170.75$01$1$2$PC"
  }
]

此時在kafka消費者那一側就可以發現如下資訊:

123123$45645$20160223111222$10.10.170.75$01$1$2$PC

寫的比較亂,有空在整理。