1. 程式人生 > >flume高階配置——資料流的複製、分流、負載均衡、故障轉移

flume高階配置——資料流的複製、分流、負載均衡、故障轉移

一、在前面幾篇文章中介紹過幾種常見的flume pipeline 場景。我們在回顧一下,主要有一下幾種:

1、多個 agent 順序連線:

可以將多個Agent順序連線起來,將最初的資料來源經過收集,儲存到最終的儲存系統中。這是最簡單的情況,一般情況下,應該控制這種順序連線的Agent的數量,因為資料流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。

2、多個agent的資料匯聚到一個agent:

這種情況應用的場景比較多,比如要收集Web網站的使用者行為日誌,Web網站為了可用性使用的負載均衡的叢集模式,每個節點都產生使用者行為日誌,可以為每個節點都配置一個Agent來單獨收集日誌資料,然後多個Agent將資料最終匯聚到一個用來儲存資料儲存系統,如HDFS上。

3、flume channel selectors功能:

flume channel selectors允許給一個source可以配置多個channel的能力。這種模式有兩種方式,一種是用來複制(Replication),另一種是用來分流(Multiplexing)。

1)Replication方式,可以將最前端的資料來源複製多份,分別傳遞到多個channel中,每個channel接收到的資料都是相同的。

配置格式示例如下:

# List the sources, sinks and channels for the agent <Agent>.sources = <Source1> <Agent>.sinks = <Sink1> <Sink2> <Agent>.channels = <Channel1> <Channel2>   # set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>   # set channel for sinks <Agent>.sinks.<Sink1>.channel = <Channel1> <Agent>.sinks.<Sink2>.channel = <Channel2>   <Agent>.sources.<Source1>.selector.type = replicating

上面指定了selector的type的值為replication(沒有制定時預設的方式),Source1會將資料分別儲存到Channel1和Channel2,這兩個channel裡面儲存的資料是相同的,然後資料被傳遞到Sink1和Sink2。

注:還有個配置 selector.optional,如下配置,表示channel1是optional的channel,如果想channel1寫入失敗,則會被忽略,channel2沒有被比較optional,如果想channel2寫入失敗則會導致整個事件失敗。

# set list of channels for source (separated by space) <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>   <Agent>.sources.<Source1>.selector.optional=  <Channel1>

2)Multiplexing方式,selector可以根據header的值來確定資料傳遞到哪一個channel,配置格式,如下所示:

# Mapping for multiplexing selector <Agent>.sources.<Source1>.selector.type = multiplexing <Agent>.sources.<Source1>.selector.header = <someHeader> <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #... <Agent>.sources.<Source1>.selector.default = <Channel2> 上面selector的type的值為multiplexing,同時配置selector的header資訊,還配置了多個selector的mapping的值,即header的值:如果header的值為Value1、Value2,資料從Source1路由到Channel1;如果header的值為Value2、Value3,資料從Source1路由到Channel2。

注:如果只配置了一個channel,多個sink,那麼只有一個固定的sink可以獲取到channel的資料。

4、flume sink processors功能: sink groups允許給一個實體設定多個sinks,sink processors可以使在sink group中所有sink具有負載均衡的能力,或者在一個sink失效後切換到另一個sink的fail over模式。 1)負載均衡:

Load balancing Sink Processor能夠實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink元件上,而每個Sink元件分別連線到一個獨立的Agent上,示例配置,如下所示:

a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true #健康檢查 a1.sinkgroups.g1.processor.selector = round_robin #random兩種選擇方式 a1.sinkgroups.g1.processor.selector.maxTimeOut=10000  它維護一個可用sink索引,它支援通過round_robin和random兩種方法進行負載分配,預設的選擇方式是round_type型別的,也可以通過配置檔案進行更改。當被選擇器被呼叫的時候,它不會遮蔽故障的sink,繼續嘗試訪問每一個可用的sink,如果所有的sink都故障了,選擇器則無法給sink傳播資料。如果backoff被開啟,則sink processor會遮蔽故障的sink,選擇器會在一個給定的超時時間內移除它們,當超時時間完畢後,sink還是無法訪問,則超時時間以指數方式增長。

2)實現failover: Failover Sink Processor能夠實現failover功能,具體流程類似load balance,但是內部處理機制與load balance完全不同:Failover Sink Processor維護一個優先順序Sink元件列表,只要有一個Sink元件可用,Event就被傳遞到下一個元件。如果一個Sink能夠成功處理Event,則會加入到一個Pool中,否則會被移出Pool並計算失敗次數,設定一個懲罰因子,示例配置如下所示:

a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000 #故障轉移時間

二、flume設定多個數據源:

a1.sources = r1 r2 r3   a1.sinks = k1   a1.channels = c1     #source   a1.sources.r1.type = exec   a1.sources.r1.command = tail -F /application/flume/logs/access.log   a1.sources.r1.channels = c1      a1.sources.r2.type = exec   a1.sources.r2.command = tail -F /application/flume/logs/uqc_head.log   a1.sources.r2.channels = c1      a1.sources.r3.type = exec   a1.sources.r3.command = tail -F /application/flume/logs/uqc_tail.log   a1.sources.r3.channels = c1     # channel a1.channels.c1.type = memory   a1.channels.c1.capacity = 1000   a1.channels.c1.transactionCapacity = 100      # sink   a1.sinks.k1.type = logger   a1.sinks.k1.channel = c1  

上面配置,在同一個flume程序中,會有多個source資料會流入到一個channel中。通常,我們不會這麼幹,會採用上面第二種思路(多個agent彙總到一個agent那種思路)。