1. 程式人生 > >Flume讀取日誌資料並寫入到Kafka,ConsoleConsumer進行實時消費

Flume讀取日誌資料並寫入到Kafka,ConsoleConsumer進行實時消費

最近大資料學習使用了Flume、Kafka等,今天就實現一下Flume實時讀取日誌資料並寫入到Kafka中,同時,讓Kafka的ConsoleConsumer對日誌資料進行消費。

1、Flume

Flume是一個完善、強大的日誌採集工具,關於它的配置,在網上有很多現成的例子和資料,這裡僅做簡單說明不再詳細贅述。

Flume包含Source、Channel、Sink三個最基本的概念,其相應關係如下圖所示:


注:

Source——日誌來源,其中包括:Avro Source、Thrift Source、Exec Source、JMS Source、Spooling Directory Source、Kafka Source、NetCat Source、Sequence Generator Source、Syslog Source、HTTP Source、Stress Source、Legacy Source、Custom Source、Scribe Source以及Twitter 1% firehose Source。

Channel——日誌管道,所有從Source過來的日誌資料都會以佇列的形式存放在裡面,它包括:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel、Pseudo Transaction Channel、Custom Channel。

Sink——日誌出口,日誌將通過Sink向外發射,它包括:HDFS Sink、Hive Sink、Logger Sink、Avro Sink、Thrift Sink、IRC Sink、File Roll Sink、Null Sink、HBase Sink、Async HBase Sink、Morphline Solr Sink、Elastic Search Sink、Kite Dataset Sink、Kafka Sink、Custom Sink。

基於Flume的日誌採集是靈活的,即,我們可以將多個管道處理串聯起來。

2、前期準備

2.1 軟體安裝位置

flume:  /usr/local/share/applications/apache-flume-1.7.0-bin

kafka:  /usr/local/share/applications/kafka_2.10-0.10.1.1

(節點數這個無所謂,最少兩個吧,master & slave,博主有5個結點,哈哈)

2.2 編寫用於實時產生日誌的shell檔案

在/usr/local/share/applications/目錄下執行如下操作:

建立一個臨時存放日誌檔案的目錄:

mkdir testdata
mkdir -p tmp/flumetokafka/logs/

接下來開始編寫shell檔案:

vim output.sh
在output.sh檔案中,首先通過shift + : + i進入編輯模式,編寫如下程式碼:

for((i=5612; i<6000; i++));
do
  touch $PWD/testdata/20170913-jangzhangz-$i.log
  echo 'When we will see you again. Put a little sunshine in your life.----'+$i >> $PWD/testdata/20170913-jangzhangz-$i.log
  mv $PWD/testdata/20170913-jangzhangz-$i.log $PWD/tmp/flumetokafka/logs/
done

這裡需要說明一下:tmp/flumetokafka/logs/就是flume進行監聽日誌檔案的資料目錄

3、為flume構建agent

首先進入flume所在目錄下的conf目錄,編寫構建agent的配置檔案(flume2kafka.properties):

通過vim flume2kafka.properties命令編輯檔案,同樣通過shift + : + i進入編輯模式,在檔案中寫入如下配置資訊:

Flume2KafkaAgent.sources=mysource
Flume2KafkaAgent.channels=mychannel
Flume2KafkaAgent.sinks=mysink

Flume2KafkaAgent.sources.mysource.type=spooldir
Flume2KafkaAgent.sources.mysource.channels=mychannel
Flume2KafkaAgent.sources.mysource.spoolDir=/usr/local/share/applications/tmp/flumetokafka/logs

Flume2KafkaAgent.sinks.mysink.channel=mychannel
Flume2KafkaAgent.sinks.mysink.type=org.apache.flume.sink.kafka.KafkaSink
Flume2KafkaAgent.sinks.mysink.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092,slave3:9092
Flume2KafkaAgent.sinks.mysink.kafka.topic=flume-data
Flume2KafkaAgent.sinks.mysink.kafka.batchSize=20
Flume2KafkaAgent.sinks.mysink.kafka.producer.requiredAcks=1

Flume2KafkaAgent.channels.mychannel.type=memory
Flume2KafkaAgent.channels.mychannel.capacity=30000
Flume2KafkaAgent.channels.mychannel.transactionCapacity=100

然後通過shift + : wq儲存檔案。

4、求證成果

4.1 啟動flume agent

在flume根目錄下:

bin/flume-ng agent -c conf -f conf/flume2kafka.properties -n Flume2KafkaAgent -Dflume.root.logger=INFO,console

4.2 啟動kafka消費者

在kafka根目錄下執行:

bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flume-data --from-beginning

或者在kafka的bin目錄下執行:
./kafka-console-consumer.sh --zookeeper master:2181 --topic flume-data --from-beginning

4.3 生成日誌

在/usr/local/share/applications/下,執行如下命令:

./output.sh

4.4 檢視terminal顯示的消費記錄情況

正常執行後,結果會出現類似情況:


5、踩坑記錄

針對flume的配置檔案,如果要實現flume讀取的日誌資料寫入kafka,則必須要配置Bootstrap Server資訊。

總結

1、瞭解如何構建flume讀取日誌資料並能夠寫入kafka的架構

2、能夠藉助於shell檔案的編寫,從而簡化工作量,並能友好的監測到實時資料

3、flume配置檔案的編寫,source —— channel —— sink

4、啟動服務,進行成果檢測

5、踩坑記錄

希望對各位有幫助,博主設計,僅供參考!又到這個時間點了,睡覺了,晚安!