Flume讀取日誌資料並寫入到Kafka,ConsoleConsumer進行實時消費
最近大資料學習使用了Flume、Kafka等,今天就實現一下Flume實時讀取日誌資料並寫入到Kafka中,同時,讓Kafka的ConsoleConsumer對日誌資料進行消費。
1、Flume
Flume是一個完善、強大的日誌採集工具,關於它的配置,在網上有很多現成的例子和資料,這裡僅做簡單說明不再詳細贅述。
Flume包含Source、Channel、Sink三個最基本的概念,其相應關係如下圖所示:
注:
Source——日誌來源,其中包括:Avro
Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Kafka Source、NetCat Source、Sequence Generator Source、Syslog Source、HTTP Source、Stress Source、Legacy Source、Custom Source、Scribe Source以及Twitter 1% firehose Source。
Channel——日誌管道,所有從Source過來的日誌資料都會以佇列的形式存放在裡面,它包括:Memory
Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom Channel。
Sink——日誌出口,日誌將通過Sink向外發射,它包括:HDFS
Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBase Sink、Async HBase Sink、Morphline Solr Sink、Elastic Search Sink、Kite Dataset Sink、Kafka Sink、Custom Sink。
基於Flume的日誌採集是靈活的,即,我們可以將多個管道處理串聯起來。
2、前期準備
2.1 軟體安裝位置
flume: /usr/local/share/applications/apache-flume-1.7.0-bin
kafka: /usr/local/share/applications/kafka_2.10-0.10.1.1
(節點數這個無所謂,最少兩個吧,master & slave,博主有5個結點,哈哈)
2.2 編寫用於實時產生日誌的shell檔案
在/usr/local/share/applications/目錄下執行如下操作:
建立一個臨時存放日誌檔案的目錄:
mkdir testdata
mkdir -p tmp/flumetokafka/logs/
接下來開始編寫shell檔案:
vim output.sh
在output.sh檔案中,首先通過shift + : + i進入編輯模式,編寫如下程式碼:for((i=5612; i<6000; i++));
do
touch $PWD/testdata/20170913-jangzhangz-$i.log
echo 'When we will see you again. Put a little sunshine in your life.----'+$i >> $PWD/testdata/20170913-jangzhangz-$i.log
mv $PWD/testdata/20170913-jangzhangz-$i.log $PWD/tmp/flumetokafka/logs/
done
這裡需要說明一下:tmp/flumetokafka/logs/就是flume進行監聽日誌檔案的資料目錄
3、為flume構建agent
首先進入flume所在目錄下的conf目錄,編寫構建agent的配置檔案(flume2kafka.properties):
通過vim flume2kafka.properties命令編輯檔案,同樣通過shift + : + i進入編輯模式,在檔案中寫入如下配置資訊:
Flume2KafkaAgent.sources=mysource
Flume2KafkaAgent.channels=mychannel
Flume2KafkaAgent.sinks=mysink
Flume2KafkaAgent.sources.mysource.type=spooldir
Flume2KafkaAgent.sources.mysource.channels=mychannel
Flume2KafkaAgent.sources.mysource.spoolDir=/usr/local/share/applications/tmp/flumetokafka/logs
Flume2KafkaAgent.sinks.mysink.channel=mychannel
Flume2KafkaAgent.sinks.mysink.type=org.apache.flume.sink.kafka.KafkaSink
Flume2KafkaAgent.sinks.mysink.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092,slave3:9092
Flume2KafkaAgent.sinks.mysink.kafka.topic=flume-data
Flume2KafkaAgent.sinks.mysink.kafka.batchSize=20
Flume2KafkaAgent.sinks.mysink.kafka.producer.requiredAcks=1
Flume2KafkaAgent.channels.mychannel.type=memory
Flume2KafkaAgent.channels.mychannel.capacity=30000
Flume2KafkaAgent.channels.mychannel.transactionCapacity=100
然後通過shift + : wq儲存檔案。
4、求證成果
4.1 啟動flume agent
在flume根目錄下:
bin/flume-ng agent -c conf -f conf/flume2kafka.properties -n Flume2KafkaAgent -Dflume.root.logger=INFO,console
4.2 啟動kafka消費者
在kafka根目錄下執行:
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flume-data --from-beginning
或者在kafka的bin目錄下執行:
./kafka-console-consumer.sh --zookeeper master:2181 --topic flume-data --from-beginning
4.3 生成日誌
在/usr/local/share/applications/下,執行如下命令:
./output.sh
4.4 檢視terminal顯示的消費記錄情況
正常執行後,結果會出現類似情況:
5、踩坑記錄
針對flume的配置檔案,如果要實現flume讀取的日誌資料寫入kafka,則必須要配置Bootstrap Server資訊。
總結
1、瞭解如何構建flume讀取日誌資料並能夠寫入kafka的架構
2、能夠藉助於shell檔案的編寫,從而簡化工作量,並能友好的監測到實時資料
3、flume配置檔案的編寫,source —— channel —— sink
4、啟動服務,進行成果檢測
5、踩坑記錄
希望對各位有幫助,博主設計,僅供參考!又到這個時間點了,睡覺了,晚安!