大資料系列之Flume(1.6)+kafka 整合
相關文章:
關於Flume 的 一些核心概念:
元件名稱 | 功能介紹 |
Agent代理 | 使用JVM 執行Flume。每臺機器執行一個agent,但是可以在一個agent中包含多個sources和sinks。 |
Client客戶端 | 生產資料,執行在一個獨立的執行緒。 |
Source源 | 從Client收集資料,傳遞給Channel。 |
Sink接收器 | 從Channel收集資料,進行相關操作,執行在一個獨立執行緒。 |
Channel通道 | 連線 sources 和 sinks ,這個有點像一個佇列。 |
Events事件 | 傳輸的基本資料負載。 |
文章和 大資料系列之Flume+HDFS 非常相似,不同的在於flume安裝目錄conf下新建了kafka.properties檔案,啟動時也應當用此配置檔案作為引數啟動。下面看具體內容:
1. kafka.properties:
agent.sources = s1 agent.channels = c1 agent.sinks = k1 agent.sources.s1.type=exec agent.sources.s1.command=tail -F /tmp/logs/kafka.log agent.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000 agent.channels.c1.transactionCapacity=100 #設定Kafka接收器 agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #設定Kafka的broker地址和埠號 agent.sinks.k1.brokerList=master:9092 #設定Kafka的Topic agent.sinks.k1.topic=kafkatest #設定序列化方式 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.channel=c1
關於配置檔案中注意3點:
a. agent.sources.s1.command=tail -F /tmp/logs/kafka.log
b. agent.sinks.k1.brokerList=master:9092
c . agent.sinks.k1.topic=kafkatest
2.很明顯,由配置檔案可以瞭解到:
a.我們需要在/tmp/logs下建一個kafka.log的檔案,且向檔案中輸出內容(下面會說到);
b.flume連線到kafka的地址是 master:9092,注意不要配置出錯了;
c.flume會將採集後的內容輸出到Kafka topic 為kafkatest上,所以我們啟動zk,kafka後需要開啟一個終端消費topic kafkatest的內容。這樣就可以看到flume與kafka之間玩起來了~~
3.具體操作:
a.在/tmp/logs下建立空檔案kafka.log。在mfz 使用者目錄下新建指令碼kafkaoutput.sh(一定要給予可執行許可權),用來向kafka.log輸入內容: kafka_test***
for((i=0;i<=1000;i++));
do echo "kafka_test-"+$i>>/tmp/logs/kafka.log;
done
b. 在kafka安裝目錄下執行如下命令,啟動zk,kafka 。(不明白此處可參照 大資料系列之Flume+HDFS)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
bin/kafka-server-start.sh -daemon config/server.properties &
c.新增Topic kafkatest
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
d.開啟新終端,在kafka安裝目錄下執行如下命令,生成對topic kafkatest 的消費
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning --zookeeper master
e.啟動flume
bin/flume-ng agent --conf-file conf/kafka.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console
d.執行kafkaoutput.sh指令碼(注意觀察kafka.log內容及消費終端接收到的內容)
e.檢視新終端消費資訊
整體流程如圖:
完~~
後續將介紹Java程式碼對於Flume+HDFS ,Flume+Kafka的實現。敬請期待~