
Building a highly available flume-ng setup with load balancing that writes data to hadoop and kafka

The use case: multiple agents push local log data to hadoop. Because the agents and the hadoop cluster sit on different network segments, a large data volume can put noticeable pressure on the network, so we deployed two flume collector machines on the hadoop-side segment. The agents send their data to the collectors, which split the traffic between them and write it into hadoop. The data flow diagram is shown below:

The diagram shows only 3 agents; the real deployment has more, but there are still only two collectors.
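In outline, the flow looks roughly like this (a rough sketch of the setup described above, not the original diagram):

agent1 \
agent2  >--(avro, load-balanced)--> collector1 --> HDFS + kafka
agent3 /                            collector2 --> HDFS + kafka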

We want to distribute the agents' data evenly across the two collector machines. The agent configuration is as follows:

#name the components on this agent: declare the names of the sources, channels and sinks
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source: listen for syslog traffic on local TCP port 5140
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

#define sinkgroups: put k1 and k2 into one sink group and use the load-balancing processor
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=round_robin

#define the sink 1: both sinks forward events over avro to the two collector machines
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=10.0.3.82
a1.sinks.k1.port=5150

#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=10.0.3.83
a1.sinks.k2.port=5150


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
While collector1 and collector2 are both healthy, the agent spreads its data across the two machines; if either collector goes down, the agent sends everything to the remaining healthy machine.
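With the configuration above saved to a file (the file name agent.conf below is an assumption), the agent can be started with the standard flume-ng launcher; the --name value must match the a1 prefix used in the properties:

flume-ng agent --conf conf --conf-file conf/agent.conf --name a1 -Dflume.root.logger=INFO,console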

collector1 configuration

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

collector1.sources = r1
collector1.channels = c1 c2
collector1.sinks = k1 k2

# Describe the source
collector1.sources.r1.type = avro
collector1.sources.r1.port = 5150
collector1.sources.r1.bind = 0.0.0.0
collector1.sources.r1.channels = c1 c2


# Describe channels: c1 is a file channel, c2 is a memory channel
collector1.channels.c1.type = file
collector1.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector1.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector1.channels.c2.type = memory
collector1.channels.c2.capacity = 1000
collector1.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = /quantone/flume/
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = TEXT
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.topic = mytopic
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20


collector2 configuration

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

collector2.sources = r1
collector2.channels = c1 c2
collector2.sinks = k1 k2

# Describe the source
collector2.sources.r1.type = avro
collector2.sources.r1.port = 5150
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.channels = c1 c2

# Describe channels: c1 is a file channel, c2 is a memory channel
collector2.channels.c1.type = file
collector2.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector2.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector2.channels.c2.type = memory
collector2.channels.c2.capacity = 1000
collector2.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.channel = c1
collector2.sinks.k1.hdfs.path = /quantone/flume
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.writeFormat = TEXT
collector2.sinks.k1.hdfs.rollInterval = 300
collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d
collector2.sinks.k1.hdfs.round = true
collector2.sinks.k1.hdfs.roundValue = 5
collector2.sinks.k1.hdfs.roundUnit = minute
collector2.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector2.sinks.k2.topic = mytopic
collector2.sinks.k2.channel = c2
collector2.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector2.sinks.k2.requiredAcks = 1
collector2.sinks.k2.batchSize = 20
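The collectors are started the same way, with --name matching the collector1 / collector2 property prefixes (the config file names are assumptions):

flume-ng agent --conf conf --conf-file conf/collector1.conf --name collector1 -Dflume.root.logger=INFO,console
flume-ng agent --conf conf --conf-file conf/collector2.conf --name collector2 -Dflume.root.logger=INFO,console

A quick end-to-end check is to push one syslog-formatted line into the agent's port and then look for output files under the HDFS path (the exact framing the syslogtcp source accepts may vary):

echo "<13>test message from agent" | nc localhost 5140
hdfs dfs -ls /quantone/flume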

The channel feeding the hadoop sink is a file channel. When its sink fails to deliver, this channel type persists the pending events to the configured directories on disk and resumes sending them once the network recovers. Compared with a memory channel, it is suited to data streams that are not huge in volume but need higher delivery reliability.
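If a collector has to buffer a larger backlog, the file channel's standard capacity settings can also be set explicitly; the values below are only illustrative, not part of the original setup:

collector1.channels.c1.capacity = 1000000
collector1.channels.c1.transactionCapacity = 10000
collector1.channels.c1.maxFileSize = 2146435071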

Note: we use collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d to set the prefix of the file names written into hadoop. If the incoming events do not carry a timestamp field in their headers, this setting stops the data from being written at all; in that case add collector2.sinks.k1.hdfs.useLocalTimeStamp = true so that the collector's current time is used to resolve %Y-%m-%d. Keep in mind that this time is not the real time the log line was generated on the agent.
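If the agent-side time matters, one option (an assumption on top of the original setup, not part of it) is to add Flume's built-in timestamp interceptor to the agent source, so every event carries a timestamp header set when the agent receives it and the collector no longer has to rely on useLocalTimeStamp:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

If other interceptors are configured on the same source (such as the static interceptor described below), they all have to be listed in the interceptors property, for example a1.sources.r1.interceptors = i1 i2.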

If you want different agents' data to end up in different kafka topics, the collector1.sinks.k2.topic = mytopic setting in the collector's kafka sink can be left out, and each agent's source can instead carry a static interceptor, for example:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = mytopic
This way each agent can tag its events with its own topic name, and each agent's data is written to the corresponding topic.
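For example, a second agent (named a2 here purely for illustration) could send its events to a different topic; the kafka sink on the collector then picks the topic name from the event's topic header:

a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = static
a2.sources.r1.interceptors.i1.key = topic
a2.sources.r1.interceptors.i1.value = othertopic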