1. 程式人生 > >flume-kafka整合--實時日誌採集

flume-kafka整合--實時日誌採集

flume採用架構

exec-source + memory-channel + avro-sink

avro-source + memory-channel + kafka-sink

kafka採用架構

啟動zookeeper

zkServer.sh start

啟動kafka(啟動一個“籃子”)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &

模擬

啟動一個kafka消費者監聽日誌檔案

kafka-console-consumer.sh --zookeeper 192.168.145.128:2181 --from-beginning --topic firstTopic

向日誌檔案中新增資料(需在 /root/data 目錄下執行,與 exec-source 中 tail -F 監聽的 /root/data/flume-kafka.txt 路徑一致)

echo hi,flume-kafka framework >> flume-kafka.txt

exec-memory-avro.conf


flume-ng agent \
  --name exec-memory-avro \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
  -Dflume.root.logger=INFO,console 

# example exec-memory-avro
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /root/data/flume-kafka.txt
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = 192.168.145.128
exec-memory-avro.sinks.avro-sink.port = 44444

# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory

# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

avro-memory-kafka.conf

flume-ng agent \
  --name avro-memory-kafka \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
  -Dflume.root.logger=INFO,console 


# example avro-memory-kafka
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.145.128
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = 192.168.145.128:9092
avro-memory-kafka.sinks.kafka-sink.topic = firstTopic
avro-memory-kafka.sinks.kafka-sink.batchSize = 3
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1


# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel