
Building a Real-Time Log Collection and Analysis System with Flume + Kafka

  1. Kafka broker
Edit the conf/server.properties file as follows:
broker.id=1
host.name=172.16.137.196
port=10985
log.dirs=/data/kafka0.10.11-logs
zookeeper.connect=172.16.137.196:2581
broker.id increases sequentially across the four broker machines, and host.name is each machine's own service IP. Start Kafka on each machine in turn:
nohup bin/kafka-server-start.sh config/server.properties > nohup-kafka.out &
Create the topic:
bin/kafka-topics.sh --create --zookeeper 172.16.137.196:2581 --replication-factor 1 --partitions 4 --topic topic-pear
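To verify that the topic was created with the expected partition count and replica assignment, you can describe it against the same ZooKeeper address:
bin/kafka-topics.sh --describe --zookeeper 172.16.137.196:2581 --topic topic-pear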
Flume deployment and installation
flume collecters: centrally gather the order logs sent back by each Flume agent and produce them to Kafka, where the collect-server real-time analysis service consumes and parses them.
  1. Add flume-collecters.properties
#flume collecters
agent.sources = s1Flume
agent.channels = c1
agent.sinks =sinkKafka

# For each one of the sources, the type is defined
agent.sources.s1Flume.type = avro
agent.sources.s1Flume.bind = 172.16.137.205
agent.sources.s1Flume.port = 6333

# The channel can be defined as follows.
agent.sources.s1Flume.channels = c1

# Each sink's type must be defined
agent.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sinkKafka.topic = topic-pear
agent.sinks.sinkKafka.brokerList = 172.16.137.196:10985,172.16.137.197:10985,172.16.137.198:10985,172.16.137.199:10985
agent.sinks.sinkKafka.requiredAcks = 1
agent.sinks.sinkKafka.batchSize = 20
agent.sinks.sinkKafka.channel = c1

#Specify the channel the sink should use
#agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.c1.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity = 100
In this configuration, the source's bind and port must be changed to the local machine's settings; the sink's brokerList is the Kafka broker cluster address and needs no modification.
  2. Modify log4j.properties
flume.log.dir=/data/flume/flume-collecters/logs
  3. Start Flume
nohup bin/flume-ng agent -c conf -f conf/flume-collecters.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE > nohup-collecters.out &
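Before deploying the order-side agents, you can smoke-test a running collecter by pushing a single event into its Avro source with Flume's built-in avro-client (/tmp/test.log below is a hypothetical sample file, not part of the original setup):
echo "avro smoke test" > /tmp/test.log
bin/flume-ng avro-client -H 172.16.137.205 -p 6333 -F /tmp/test.log
If the event then shows up in the topic-pear topic, the collecter-to-Kafka leg is working.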
flume-order: collects the order logs generated on each order-server machine and sends them to the flume-collecters for centralized processing.
  1. Add flume-order-collect.properties
# Flume agent that collects order logs on each order-server
agent.sources = fileSource
agent.channels = memoryChannel
agent.sinks = collecter1 collecter2

agent.sinkgroups = gCollecters
agent.sinkgroups.gCollecters.sinks = collecter1 collecter2

# Sink processor mode: load_balance or failover
agent.sinkgroups.gCollecters.processor.type = load_balance
# Load-balancing selector: random or round_robin
agent.sinkgroups.gCollecters.processor.selector=round_robin
# Exponentially back off failed sinks
agent.sinkgroups.gCollecters.processor.backoff=true
# Maximum backoff period in milliseconds (30 seconds)
agent.sinkgroups.gCollecters.processor.selector.maxTimeOut=30000


agent.sources.fileSource.type = exec
agent.sources.fileSource.command = tail -F /data/pear/v1/orderdb-server/COLLECT/DATA_COLLECT.log
#agent.sources.fileSource.charset=utf-8
agent.sources.fileSource.channels = memoryChannel

agent.sources.fileSource.restartThrottle = 10000
agent.sources.fileSource.restart = true
agent.sources.fileSource.logStdErr = true

# Each sink's type must be defined
agent.sinks.collecter1.channel = memoryChannel
agent.sinks.collecter1.type = avro
agent.sinks.collecter1.hostname = 172.16.137.205
agent.sinks.collecter1.port = 6333
agent.sinks.collecter1.batch-size = 10

agent.sinks.collecter2.channel = memoryChannel
agent.sinks.collecter2.type = avro
agent.sinks.collecter2.hostname = 172.16.137.206
agent.sinks.collecter2.port = 6333
agent.sinks.collecter2.batch-size = 10

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
#The max number of events stored in the channel
agent.channels.memoryChannel.capacity = 100
#The max number of events stored in the channel per transaction
agent.channels.memoryChannel.transactionCapacity = 100
#Timeout in seconds for adding or removing an event
agent.channels.memoryChannel.keep-alive=30
The hostname and port values in this configuration correspond to the flume-collecters above and need no modification. The tail -F command must point to the correct log file path.
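If hot-standby behavior is preferred over load balancing, the same sink group can instead use Flume's failover sink processor. A minimal sketch (the priority values are illustrative):
agent.sinkgroups.gCollecters.processor.type = failover
agent.sinkgroups.gCollecters.processor.priority.collecter1 = 10
agent.sinkgroups.gCollecters.processor.priority.collecter2 = 5
agent.sinkgroups.gCollecters.processor.maxpenalty = 10000
In this mode all events go to the highest-priority live sink (collecter1 here), and collecter2 only takes traffic while collecter1 is down.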
  2. Modify log4j.properties
flume.log.dir=/data/flume/flume-order-collect/logs
  3. Start Flume
nohup bin/flume-ng agent -c conf -f conf/flume-order-collect.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE > nohup-order-collect.out &

Testing flume + kafka + collect-server
Append test records to the collected log file (one JSON line every 0.1 s):
for i in {1..1000};do echo "[{\"test$i\":\"161.lier\"}]" >> /data/pear/v1/orderdb-server/COLLECT/DATA_COLLECT.log;echo $i;sleep 0.1;done
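To confirm events flow end to end, you can watch the topic with Kafka's console consumer from any broker machine (on Kafka 0.10+ the new consumer takes --bootstrap-server; older builds use --zookeeper instead):
bin/kafka-console-consumer.sh --bootstrap-server 172.16.137.196:10985 --topic topic-pear
Each JSON line written by the test loop should appear in the consumer output within a few seconds.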