1. 程式人生 > >flume實現監控檔案,並將檔案內容傳入kafka的,kafka在控制檯實現消費

flume實現監控檔案,並將檔案內容傳入kafka的,kafka在控制檯實現消費

在flume的配置裡建一個檔案flume-kafka.conf
生產者產生的資料放在/home/hadoop/c.txt中
topic消費c.txt中的檔案

a1.sources = s1                                                                                                                  
a1.channels = c1                                                                                                                 
a1.sinks = k1                                                                                                                    
                                                                                                                                  
a1.sources.s1.type=exec   
#設定要監控的資料夾                                                                                                       
a1.sources.s1.command=tail -F /home/hadoop/c.txt
a1.sources.s1.channels=c1                                                                                                        
a1.channels.c1.type=memory                                                                                                       
a1.channels.c1.capacity=10000                                                                                                    
a1.channels.c1.transactionCapacity=100                                                                                           
                                                                                                                                      
#設定Kafka接收器                                                                                                                    
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink                                                                          
#設定Kafka的broker地址和埠號                                                                                                      
a1.sinks.k1.brokerList=hadoop01:9092                                                                                               
#設定Kafka的Topic                                                                                                                   
a1.sinks.k1.topic=test02                                                                                                 
#設定序列化方式                                                                                                                     
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder                                                                                                                                                                                                        
a1.sinks.k1.channel=c1   

將生產者的java程式碼做成一個jar包
生產者程式碼

public class Test {
	public static void main(String[] args) {
		int i = 0;
		while(true) {
			i++;
			System.out.println(	"測試資料"+i);
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

開啟一個頁面啟動flume

bin/flume-ng a1 --conf-file  conf/flume-kafka.conf -c conf/ --name a1 -Dflume.root.logger=DEBUG,console

另開一個頁面啟動消費者,前提是kafka叢集要開啟

bin/kafka-console-consumer.sh --zookeeper 192.168.147.136:2181,192.168.147.137:2181,192.168.147.138:2181 --topic test02 --from-beginning

再開一個頁面啟動生產者,jar包傳到/hadoop/hadoop/下

java -cp /home/hadoop/Test-1.0-SNAPSHOT.jar com.ceshi.Test /home/hadoop/c.txt 

可以看到消費者頁面出現與生產者頁面相同訊息則測試成功。