flume實現監控檔案,並將檔案內容傳入kafka的,kafka在控制檯實現消費
阿新 • • 發佈:2018-12-15
在flume的配置裡建一個檔案flume-kafka.conf
生產者產生的資料放在/home/hadoop/c.txt中
topic消費c.txt中的檔案
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type=exec #設定要監控的資料夾 a1.sources.s1.command=tail -F /home/hadoop/c.txt a1.sources.s1.channels=c1 a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100 #設定Kafka接收器 a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink #設定Kafka的broker地址和埠號 a1.sinks.k1.brokerList=hadoop01:9092 #設定Kafka的Topic a1.sinks.k1.topic=test02 #設定序列化方式 a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1
將生產者的java程式碼做成一個jar包
生產者程式碼
public class Test { public static void main(String[] args) { int i = 0; while(true) { i++; System.out.println( "測試資料"+i); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
開啟一個頁面啟動flume
bin/flume-ng a1 --conf-file conf/flume-kafka.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console
另開一個頁面啟動消費者,前提是kafka叢集要開啟
bin/kafka-console-consumer.sh --zookeeper 192.168.147.136:2181,192.168.147.137:2181,192.168.147.138:2181 --topic test02 --from-beginning
再開一個頁面啟動生產者,jar包傳到/hadoop/hadoop/下
java -cp /home/hadoop/Test-1.0-SNAPSHOT.jar com.ceshi.Test /home/hadoop/c.txt
可以看到消費者頁面出現與生產者頁面相同訊息則測試成功。