flume + kafka log collection

Logs produced by the system are collected by Flume and pushed to Kafka to be consumed and processed.

Architecture

Service           IP              Port        Notes
flume collector   10.200.132.181  6333        Flume collector
flume agent       10.200.132.168  -           Flume agent (only one agent is used at present)
kafka             10.200.132.181  9092, 2181  Kafka and ZooKeeper

One Flume agent is deployed per machine; if logs from multiple services need to be collected, multiple sources can be configured in the same agent's configuration file, as in the sketch below.
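A minimal sketch of the sources section of such an agent, assuming two hypothetical services writing to /opt/svc1/logs/svc1.log and /opt/svc2/logs/svc2.log (source names and paths are illustrative; the channel and sink are configured as in the agent config later in this article):

# two exec sources feeding the same channel
agent.sources = svc1Log svc2Log
agent.channels = memoryChannel

agent.sources.svc1Log.type = exec
agent.sources.svc1Log.command = tail -F /opt/svc1/logs/svc1.log
agent.sources.svc1Log.channels = memoryChannel

agent.sources.svc2Log.type = exec
agent.sources.svc2Log.command = tail -F /opt/svc2/logs/svc2.log
agent.sources.svc2Log.channels = memoryChannel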

This article mainly covers installing Flume and how the log collection is implemented.

I. Installation and deployment

1. Download and install Flume
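If the tarball is not already on the machine, it can be fetched from the Apache archive first (URL assumed for the 1.8.0 release):

# wget https://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz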

Extract it under /opt:

# tar -zxvf apache-flume-1.8.0-bin.tar.gz

2. Configuration on 10.200.132.181

Create flume-collecters.properties, which receives the logs sent by the Flume agent and pushes them to Kafka. From the Flume installation directory:

# cd conf
# vim flume-collecters.properties
#flume collecters
agent.sources = s1Flume
agent.channels = c1
agent.sinks =sinkKafka
 
# For each one of the sources, the type is defined
agent.sources.s1Flume.channels = c1
agent.sources.s1Flume.type = avro

# flume ip
agent.sources.s1Flume.bind = 10.200.132.181
# flume port
agent.sources.s1Flume.port = 6333
 
# The channel can be defined as follows.
agent.sources.s1Flume.channels = c1
 
# Each sink's type must be defined
agent.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink

# kafka topic name
agent.sinks.sinkKafka.topic = topic-pear

# kafka ip:port
agent.sinks.sinkKafka.brokerList = 10.200.132.181:9092 
agent.sinks.sinkKafka.requiredAcks = 1
agent.sinks.sinkKafka.batchSize = 20
agent.sinks.sinkKafka.channel = c1
#Specify the channel the sink should use
#agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.c1.type = memory
 
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity = 100
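If the topic-pear topic does not already exist and the broker is not set to auto-create topics, it can be created ahead of time; a sketch using the Kafka 2.0 CLI from this setup (ZooKeeper on port 2181, as in the table above):

# /opt/kafka_2.12-2.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-pear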

Start Flume (from the Flume installation directory):

# bin/flume-ng agent -c conf -f conf/flume-collecters.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE &

Check that port 6333 is now listening, for example:
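# netstat -lntp | grep 6333

(On systems without net-tools, ss -lntp | grep 6333 gives the same information.)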

3. Flume agent configuration on 10.200.132.168

In the conf directory of the Flume installation, create flume-test-collect.properties:

# vim flume-test-collect.properties
agent.sources = fileSource
agent.channels = memoryChannel
agent.sinks = collecter1
 
agent.sinkgroups = gCollecters
agent.sinkgroups.gCollecters.sinks = collecter1
 
# sink processor type: load_balance or failover
agent.sinkgroups.gCollecters.processor.type = failover
# load-balancing selector (used with load_balance): random or round_robin
agent.sinkgroups.gCollecters.processor.selector=round_robin
# back off failed sinks
agent.sinkgroups.gCollecters.processor.backoff=true
# maximum backoff time: 30 seconds (30000 ms)
agent.sinkgroups.gCollecters.processor.maxTimeOut=30000
 
 
agent.sources.fileSource.type = exec
# log file to monitor
agent.sources.fileSource.command = tail -F /opt/test/logs/test.log
#agent.sources.fileSource.charset=utf-8
agent.sources.fileSource.channels = memoryChannel
 
agent.sources.fileSource.restartThrottle = 10000
agent.sources.fileSource.restart = true
agent.sources.fileSource.logStdErr = true
 
# Each sink's type must be defined
agent.sinks.collecter1.channel = memoryChannel
agent.sinks.collecter1.type = avro
# flume collector ip
agent.sinks.collecter1.hostname = 10.200.132.181
# flume collector port
agent.sinks.collecter1.port = 6333
agent.sinks.collecter1.batch-size = 10
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
 
# Other config values specific to each type of channel(sink or source)
#The max number of events stored in the channel
agent.channels.memoryChannel.capacity = 100
#The max number of events stored in the channel per transaction
agent.channels.memoryChannel.transactionCapacity = 100
#Timeout in seconds for adding or removing an event
agent.channels.memoryChannel.keep-alive=30

Create the log directory (if it does not already exist):

# mkdir -p /opt/test/logs/

Start the agent (from the Flume installation directory):

# bin/flume-ng agent -c conf -f conf/flume-test-collect.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE
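To confirm that the agent's avro sink has connected to the collector, you can look for an established TCP connection to 10.200.132.181:6333, for example:

# netstat -anp | grep 6333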

Deployment and configuration are now basically complete.

II. Verification

Log in to 10.200.132.181 and start a Kafka console consumer:

# /opt/kafka_2.12-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-pear --from-beginning

Log in to 10.200.132.168 and append a line to the monitored log file (from /opt/test/logs):

# echo "hello world" >> test.log

A few seconds after the line is written, the message shows up in the Kafka console consumer's output.

You can also write a Spring Boot application that produces logs and point the Flume agent at its log file; the application's logs will then be pushed to Kafka through Flume in near real time. Only the monitored file in the agent configuration needs to change, as in the line below.
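A sketch of that change in flume-test-collect.properties, assuming a hypothetical application log at /opt/myapp/logs/app.log:

# hypothetical Spring Boot application log file
agent.sources.fileSource.command = tail -F /opt/myapp/logs/app.log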