
Kafka and Flume Integration

We build a real-time log-processing pipeline with Flume + Kafka + Spark Streaming: Flume acts as the Kafka producer, and Spark Streaming acts as the Kafka consumer.

Only Flume 1.6.0 and later can be integrated with Kafka; versions before 1.6.0 do not provide a Kafka sink. First, create a topic named flumeTopic in Kafka.
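A minimal example using the Kafka CLI that ships with 0.8/0.9-era brokers (the ZooKeeper address, partition count, and replication factor here are assumptions for illustration; adjust them to your cluster):

kafka-topics.sh --create --zookeeper master11:2181 --replication-factor 2 --partitions 3 --topic flumeTopic

Then use Flume to monitor five log files that are updated in real time; the Flume configuration file is as follows: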

a1.sources = s1 s2 s3 s4 s5
a1.channels = c1
a1.sinks = k1


# define the sources
# (each one tails a log file; note the exec source offers no delivery guarantees if the agent dies)
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /nfs0/log/log1.log
a1.sources.s1.shell = /bin/sh -c


a1.sources.s2.type = exec
a1.sources.s2.command = tail -F /nfs0/log/log2.log
a1.sources.s2.shell = /bin/sh -c


a1.sources.s3.type = exec
a1.sources.s3.command = tail -F /nfs0/log/log3.log
a1.sources.s3.shell = /bin/sh -c


a1.sources.s4.type = exec
a1.sources.s4.command = tail -F /nfs0/log/log4.log
a1.sources.s4.shell = /bin/sh -c


a1.sources.s5.type = exec
a1.sources.s5.command = tail -F /nfs0/log/log5.log
a1.sources.s5.shell = /bin/sh -c


# define the channel
# (the memory channel is fast, but buffered events are lost if the agent process dies)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000


# define the sink
# (brokerList/topic are the Flume 1.6 property names; Flume 1.7+ renames them kafka.bootstrap.servers/kafka.topic)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = master11:9092,slave11:9092,slave12:9092
a1.sinks.k1.topic = flumeTopic


# bind the sources and the sink to the channel
a1.sources.s1.channels = c1
a1.sources.s2.channels = c1
a1.sources.s3.channels = c1
a1.sources.s4.channels = c1
a1.sources.s5.channels = c1

a1.sinks.k1.channel = c1
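
With the configuration saved, start the Kafka brokers first and then the Flume agent. A typical invocation (the file name flume-kafka.conf is an assumption for illustration):

flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file flume-kafka.conf -Dflume.root.logger=INFO,console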

Once Kafka and the Flume agent are running, events from the five log files flow into flumeTopic, and Spark Streaming can consume them from Kafka in real time.
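
To sanity-check that events are arriving, Kafka's console consumer can be pointed at the topic (again assuming a 0.8/0.9-era CLI and the ZooKeeper address above):

kafka-console-consumer.sh --zookeeper master11:2181 --topic flumeTopic

Below is a minimal sketch of the Spark Streaming consumer, assuming Spark 1.x with the spark-streaming-kafka (Kafka 0.8 direct API) dependency on the classpath; the application name, batch interval, and processing logic are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumeTopicConsumer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeTopicConsumer")
    // 5-second micro-batches; tune to your latency needs
    val ssc = new StreamingContext(conf, Seconds(5))

    // Broker list matches the Flume sink configuration above
    val kafkaParams = Map("metadata.broker.list" -> "master11:9092,slave11:9092,slave12:9092")
    val topics = Set("flumeTopic")

    // Direct (receiver-less) stream: one RDD partition per Kafka partition
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each record is a (key, value) pair; the value is one log line shipped by Flume
    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The direct stream is used here rather than the older receiver-based createStream because it maps Kafka partitions one-to-one onto RDD partitions and tracks offsets itself, which is the common choice for this Flume-era stack.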