Integrating Flume and Kafka

Component selection (two Flume agents chained over Avro):

exec source + memory channel + avro sink
avro source + memory channel + kafka sink 

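File name: exec-memory-avro.conf

# Agent a1: one source, one sink, one channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Exec source: follow the data file with tail -F
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/zoujc/data
a1.sources.r1.shell = /bin/sh -c

# Avro sink: forward events to the second agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zoujc01
a1.sinks.k1.port = 44444

a1.channels.c1.type = memory

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1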
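
A common mistake in a file like this is binding a source or sink to a channel name that was never declared. As a rough sanity check, the following Python sketch parses the properties text and reports such mismatches. The names parse_properties and check_wiring are illustrative and not part of Flume, and the check covers only the channel bindings, not everything Flume itself validates.

```python
# Illustrative wiring check for a Flume agent properties file (not part of Flume).

def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems; an empty list means the bindings are consistent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    for source in props.get(f"{agent}.sources", "").split():
        used = set(props.get(f"{agent}.sources.{source}.channels", "").split())
        if not used:
            problems.append(f"source {source} has no channels")
        for ch in sorted(used - channels):
            problems.append(f"source {source} uses undeclared channel {ch}")
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch!r}")
    return problems


# The a1 agent from exec-memory-avro.conf, one property per line.
EXAMPLE = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/zoujc/data
a1.sources.r1.shell = /bin/sh -c
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zoujc01
a1.sinks.k1.port = 44444
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""

if __name__ == "__main__":
    print(check_wiring(parse_properties(EXAMPLE), "a1"))
```

For the configuration above the returned list is empty. Changing the sink binding to an undeclared name such as c9 makes the check report it.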

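File name: avro-memory-kafka.conf

# Agent a2: one source, one sink, one channel
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Avro source: receive events sent by agent a1
a2.sources.r1.type = avro
a2.sources.r1.bind = zoujc01
a2.sources.r1.port = 44444

# Kafka sink: publish events to the topic
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.brokerList = zoujc01:9092
a2.sinks.k1.topic = hello_topic1
# batchSize affects how much data the consumer receives at a time
a2.sinks.k1.batchSize = 5
a2.sinks.k1.requiredAcks = 1

a2.channels.c1.type = memory

# Bind the source and the sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1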
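
To make a batch size of 5 concrete, here is a small Python sketch that groups a stream of events into batches of at most five. It is a simplified model for illustration only, not Flume's actual sink code, and batches is a hypothetical helper name.

```python
from itertools import islice


def batches(events, batch_size):
    """Yield lists of at most batch_size events, preserving order."""
    it = iter(events)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # Twelve events with a batch size of 5 give batches of 5, 5 and 2 events.
    for batch in batches(range(12), 5):
        print(len(batch), batch)
```

With a small batch size, a handful of appended lines is enough to fill a batch, which is convenient when testing by hand.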

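Startup

Start ZooKeeper (the Kafka broker must also be running).

Start avro-memory-kafka.conf first, so that its Avro source is already listening when the upstream agent connects: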

flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console

Then start exec-memory-avro.conf:

flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console

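Start the Kafka console producer and consumer (the --zookeeper option applies to older Kafka releases; newer ones use --bootstrap-server instead):

kafka-console-producer.sh --broker-list zoujc01:9092 --topic hello_topic1
kafka-console-consumer.sh --zookeeper zoujc01:2181 --topic hello_topic1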
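
To exercise the pipeline end to end, something has to append lines to the file that the exec source tails. Below is a minimal Python sketch, assuming it runs on the machine that holds /root/zoujc/data (the path from exec-memory-avro.conf). The helper name append_lines and the test messages are made up for illustration.

```python
import sys


def append_lines(path, lines):
    """Append each line to path, flushing after every write so tail -F sees it promptly."""
    count = 0
    with open(path, "a", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")
            f.flush()
            count += 1
    return count


# Only runs when a target path is given on the command line.
if __name__ == "__main__" and len(sys.argv) > 1:
    written = append_lines(sys.argv[1], [f"hello flume {i}" for i in range(10)])
    print(f"appended {written} lines to {sys.argv[1]}")
```

Saved under a name of your choice and run with /root/zoujc/data as its only argument, it appends ten lines. If both agents and the broker are up, those lines should then appear in the console consumer.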