1. 程式人生 > >Flume NG高可用叢集搭建

Flume NG高可用叢集搭建

軟體版本:

  • CentOS 6.7
  • hadoop-2.7.4
  • apache-flume-1.6.0

一、Flume NG簡述

  • Flume 是 Cloudera 提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。
  • Flume將採集到的檔案,socket資料包等各種形式的資料來源,輸出到HDFS、Hbase、hive、kafka等眾多外部儲存系統中
  • Flume針對特殊場景也具備良好的自定義擴充套件能力,因此flume適用於大部分的日常資料採集場景
  • 一般的採集需求,通過對flume的簡單配置即可實現
  • Flume分散式系統中最核心的角色時agentflume採集系統就是由一個個
    agent所連線起來形成的
  • 每個agent相當於一個數據傳遞員(Source到Channel到Sink之間傳遞資料的形式時event事件,event事件是一個數據流單元)

Flume的架構圖中有3個元件,分別是source、channel、sink

  • Source:採集資料來源,用於和資料來源對接,獲取資料
  • Sink:下沉,採集資料傳送目的地,用於往下一級agent傳遞資料或者往最終的儲存系統傳遞資料
  • Channel:agent的內部傳輸管道,用於將資料來源以事件event的形式,從Source到Sink

執行流程:

從外部系統(Web Server)中收集產生的日誌,然後通過Agent的Source元件將資料傳送到臨時儲存Channel元件,最後傳遞給Sink元件,Sink元件將滿足預設值的臨時檔案,儲存到HDFS檔案系統中。

二、搭建單點Flume NG

1、解壓軟體包

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/

2、配置環境變數

export FLUME_HOME=/export/servers//flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin

3、修改flume配置檔案

$FLUME_HOME/conf/flume-env.sh(flume-env.sh.template修改成flume-env.sh)

export JAVA_HOME=/export/servers/jdk1.8.0_171

4、簡單測試—採集指定檔案到 HDFS

伺服器會在指定目錄下,會不斷產生新的日誌檔案,每當有新的日誌檔案產生,flume自動將新產生的資料來源採集到檔案儲存系統HDFS中

建立配置檔案spooldir-hdfs.properties

# Name the components on this agent 
a1.sources = r1 	# agent的別名
a1.sinks = k1 
a1.channels = c1 
 
# Describe/configure the source 
# 採集資料的型別
a1.sources.r1.type = exec       
# 指定執行命令(flume自動執行該命令)
a1.sources.r1.command = tail -F /export/data/callLog.log    
  
# Describe the sink
# 指定採集資訊下沉到哪裡
a1.sinks.k1.type = hdfs 
# 指定採集到的資料存放在hdfs檔案系統的哪個路徑下
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# 指定儲存資訊檔名的字首
a1.sinks.k1.hdfs.filePrefix = events- 
# 是否捨棄已下沉的檔案,捨棄指的是:根據指定的時間間隔建立資料夾
a1.sinks.k1.hdfs.round = true
#指定每10分鐘捨棄已下沉的檔案
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute 
# 每30s,將臨時檔案,持久化到hdfs檔案系統中;設定為0,不滾動,滾動即下沉
a1.sinks.k1.hdfs.rollInterval = 30 	
# 臨時檔案達到指定位元組就滾動,預設1024;設定為0,不根據臨時檔案大小來滾動檔案
a1.sinks.k1.hdfs.rollSize = 1024 
# 臨時檔案的事件event個數達到指定值就滾動,預設10;如果設定成0,不根據events資料來滾動檔案	
a1.sinks.k1.hdfs.rollCount = 10
# 每個事件寫入的行數,預設100
a1.sinks.k1.hdfs.batchSize = 100
# 是否使用本地的時間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
# 生成的檔案型別,預設是 Sequencefile,可用 DataStream(普通文字)
a1.sinks.k1.hdfs.fileType = DataStream
 
# Use a channel which buffers events in memory
# channels資料快取型別
a1.channels.c1.type = memory
# 該通道中最大的可以儲存的event數量	
a1.channels.c1.capacity = 1000
# 每次最大可以從source中拿到或者送到sink中的event數量
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1  

提示:配置檔案中的註釋,在虛擬機器配置時儘可能刪除,如果保留可能會報錯

啟動flume

flume-ng agent -c conf -f conf/spooldir-hdfs.properties -n a1 -Dflume.root.logger=INFO,console

命令列引數解釋: 

  • -c conf   指定flume自身的配置檔案所在目錄

  • -f conf/spooldir-hdfs.properties 指定我們所描述的採集方案

  • -n a1  指定我們這個agent的名字

下面截圖是複製的節點啟動傳送資料來源後,收到的資料來源

複製該節點,產生資料,這裡是自己編寫的java程式碼,也可以使用下面shell命令測試

[[email protected] flume-1.6.0]# while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

 三、搭建高可用Flume NG

高可用的Flume NG叢集,架構圖如下所示:

由於電腦效能的限制,將agent減少到1個節點,Collector維持原來的2個節點。主要是為了後面測試負載均衡、以及容錯考慮

1、節點分配

 圖中所示,Agent資料分別流入到Collector1和Collector2,Flume NG本身提供了Failover機制,可以自動切換和恢復。在上圖中,有3個產生日誌伺服器分佈在不同的機房,要把所有的日誌都收集到一個叢集中儲存。下面我們開發配置Flume NG叢集

2、Flume 的 load-balance

負載均衡是用於解決一臺機器(一個程序)無法解決所有請求而產生的一種演算法。Load balancing Sink Processor能夠實現load balance功能,如上圖 Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的Collector1和Collector2中

2.1、agent端的配置檔案:exec-avro.properties

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/callLog.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2

#set load_balance
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

2.2、Collector1端的配置檔案:avro-logger.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

提示:Collector2的配置檔案與Collector1的基本相同,只需要修改a1.sources.r1.bind

2.3、啟動測試

  1. 分別啟動Collector1、Collector2、agent
  2. 複製agent虛擬機器,執行shell命令:while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

Collector1的執行結果

 Collector2的執行結果

3、Flume 的 failover

3.1、故障轉移機制

實現 failover 功能,具體流程類似 load balance,但是內部處理機制與load balance完全不同

  • Failover Sink Processor維護一個優先順序Sink元件列表,只要有一個Sink 元件可用,Event就會被傳遞到下一個元件。
  • 故障轉移機制的作用是將失敗的Sink 降級到一個池,在這些池中它們被分配一個冷卻時間,隨著故障的連續,增加下次該Sink的重試時間。但只要失敗的Sink成功傳送一個Event,它將恢復到活動池。
  • Sink具有與之相關的優先順序,數值越大,優先順序越高。

例如,具有優先順序為10的sink在優先順序為8的Sink之前被啟用。在啟用過程中,如果傳送事件時匯聚失敗,則將嘗試讓優先順序為8的Sink傳送事件。如果沒有指定優先順序,則根據在配置中指定Sink的順序來確定傳送順序

3.2、配置如下:

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /export/data/callLog.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node02
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node03
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
a1.sinkgroups.g1.processor.type = failover
# 如果開啟,則將失敗的 sink 放入黑名單
a1.sinkgroups.g1.processor.backoff = true
# 還支援random
a1.sinkgroups.g1.processor.selector = round_robin
#在黑名單放置的超時時間,超時結束時,若仍然無法接收,則超時時間呈指數增長
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
#優先順序值, 絕對值越大表示優先順序越高,若不設定,則按照sink的先後順序
a1.sinkgroups.g1.processor.priority.k1 = 5  
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.priority.k3 = 6
#失敗的 Sink 的最大回退期(millis)
a1.sinkgroups.g1.processor.maxpenalty = 20000  

3.3、測試參考負載均衡,這裡就不測試了