1. 程式人生 > >Flume中階應用——啟動命令詳解、元件範例說明、事務說明

Flume中階應用——啟動命令詳解、元件範例說明、事務說明

flume啟動命令詳解

Usage: ./flume-ng <command> [options]...

commands:
  help                      display this help text  #顯示幫助資訊
  agent                     run a Flume agent  #啟動flume代理
  avro-client               run an avro Flume client	#啟動avro代理
  version                   show Flume version info		#顯示版本資訊

global options:
--conf,-c <conf> use configs in <conf> directory #指定配置資源的目錄,後面跟路徑 --classpath,-C <cp> append to the classpath #追加一個classpath --dryrun,-d do not actually start Flume, just print the command #不執行flume指令,只進行列印資訊的操作 --plugins-path <dirs> colon-
separated list of plugins.d directories. See the plugins.d section in the user guide for more details. Default: $FLUME_HOME/plugins.d #外掛目錄,預設$FLUME_HOME/plugins.d -Dproperty=value sets a Java system property value #設定一個JAVA的系統屬性 -Xproperty=
value sets a Java -X option #設定一個JAVA-X的選項 agent options: #啟動agent的相關配置 --name,-n <name> the name of this agent (required) #配置agent的名稱 --conf-file,-f <file> specify a config file (required if -z missing) #指定配置檔案 --zkConnString,-z <str> specify the ZooKeeper connection to use (required if -f missing) #以zk為配置中心,指定zookeeper的連線 --zkBasePath,-p <path> specify the base path in ZooKeeper for agent configs --no-reload-conf do not reload config file if changed --help,-h display help text avro-client options: #啟動avro Client的相關配置 --rpcProps,-P <file> RPC client properties file with server connection params --host,-H <host> hostname to which events will be sent #繫結sourceIP地址 --port,-p <port> port of the avro source #繫結source埠號 --dirname <dir> directory to stream to avro source #指定source監聽的目錄 --filename,-F <file> text file to stream to avro source (default: std input) #指定source監聽的檔案 --headerFile,-R <file> File containing event headers as key/value pairs on each new line # --help,-h display help text Either --rpcProps or both --host and --port must be specified. Note that if <conf> directory is specified, then it is always included first in the classpath.

Agent的啟動:

./flume-ng
agent --conf …/conf --conf-file …/conf/hey01.conf --name a1
-Dflume.root.logger=INFO,console

./flume-ng agent -》指定啟動的服務端型別

–conf …/conf -》指定配置資源的資料夾

–conf-file …/conf/hey01.conf -》指定配置檔案

–name a1 -》定義agent的名字

-Dflume.root.logger=INFO,console -》定義sink為logger時,的輸出型別

簡寫 …/bin/flume-ng agent -n a1 -c ./ -f ./weblog.conf -Dflume.root.logger=INFO,console

Avro-Client的啟動(當source=avro時):

./flume-ng avro-client --conf …/conf --host 0.0.0.0 --port 44444 --filename /home/nums.txt

./flume-ng avro-client -》啟動avro客戶端

–conf …/conf -》指定配置資源的資料夾

–host 0.0.0.0 -》指定source IP地址

–port 44444 -》指定source埠號

–filename /home/nums.txt -》指定用avro監聽的檔案

Flume元件說明

1 Event

一個具有有效荷載的位元組資料流和可選的字串屬性集。一條日誌在flume中會被轉換成一個JSON格式的串來傳遞,這個JSON串就是一個FlumeEvent,具體的格式為{header:{頭資訊},body:日誌內容},所以簡單來說,一條日誌在一個Flume就對應一個JSON串,即,一個FlumeEvent。

2 Agent

一個程序承載從外部源事件流到下一個目的地的過程。包含Source、Channel和 Sink,多個Agent之間還可以連線
形成複雜的日誌流動的網路。


3 Source

用來收集資料來源,接受日誌並封裝成Event,傳輸到Channel。

4 Sink

目的地傳送槽,獲取Agent裡面的資料,即消費Channel中的資料,並傳送到目的地。

4 Channel

傳輸通道,被動接受Source傳來的Event資料,暫時儲存,相當於對採集到的資料進行簡單的快取,等待 Sink消費。一個channel僅能對一個sink,但是在使用了Processor的前提下,一個channel可以對一個sinkgroup,也就是可以面向多個processor進行操作。

只有在sink將channel中的資料成功傳送出去之後,channel才會將臨時的event刪除,這種機制保證了資料傳輸的可靠性和安全性。


5 Selector

選擇器,主要用在實現扇出過程中實現按照指定方式分發資料。選擇器可以工作在 複製 多路複用(路由)模式 下,預設情況下,不配置Selector,則扇出採用複製機制。

6 Interceptor

攔截器可以攔截Event,允許或不允許Event通過,或在允許通過的時,改變Event內容,這種改變包括改變Event的體或頭資訊。

攔截器可以手動開發,只要實現org.apache.flume.interceptor.Interceptor介面,在其中編寫攔截規則即可。

Flume也內建了很多攔截器,可以直接使用。可以同時配置多個攔截器組合成攔截器鏈,依次攔截Event。

7 Processor

處理器,是Flume用於實現失敗恢復、負載均衡的元件,其中包含失敗恢復和負載均衡兩種模式。

Flume常用範例說明

1 Source

1.1 Avro 序列化資訊

監聽AVRO埠來接受來自外部AVRO客戶端的事件流,是實現多級流動、扇出流、扇入流等效果的基礎。另外也可以接受通過flume提供的Avro客戶端傳送的日誌資訊。

#配置Source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

1.2 Exec 將命令的輸出作為源

#配置Source
a1.sources.r1.type = exec
a1.sources.r1.command = ping www.baidu.com #設定發出的命令

1.3 HTTP

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444

通過curl命令測試

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://hadoop01:44444

1.4 Spooling Directory

將要收集的資料放置到"自動蒐集"目錄中。這個Source將監視該目錄,並將解析新檔案的出現。事件處理邏輯是可插拔的,當一個檔案被完全讀入通道,它會被重新命名或可選的直接刪除。

#配置Source
a1.sources.r1.type  =  spooldir
a1.sources.r1.spoolDir = /usr/local/src/flume/data #要監控的資料夾,不可以重名,不能修改

1.5 NetCat 監聽一個指定埠,並將接收到的資料的每一行轉換成一個數據

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.max-line-length = 512 #每行最大位元組byte長度,預設512
a1.sources.r1.ack-every-event = true #是否在接收到資訊後回覆一個 ok ,預設回覆

1.6 Custom

如果以上內建的Source都不能滿足需求,可以自己開發Source。按照Flume要是寫一個類實現相應介面。將類打成jar放置到flume的lib目錄下。在配置檔案中通過類的全路徑名載入Source。


2 Sink

2.1 logger

#配置Sink
a1.sinks.k1.type = logger

2.2 File Roll 在本地檔案系統中儲存事件,每隔指定時長生成檔案保持這段時間內收集到的日誌資訊

#配置Sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/fresult #指定檔案目錄

2.3 HDFS

#配置Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata
a1.sinks.k1.hdfs.fileType = DataStream  #配置生成的檔案的格式,目前支援  SequenceFile, DataStream or CompressedStream,預設SequenceFile
a1.sinks.k1.hdfs.rollInterval = 30 #滾動檔案之前等待多久,預設30秒,0表示不基於檔案滾動
a1.sinks.k1.hdfs.rollSize = 0 #檔案大道多大的大小產生滾動,預設1KB,0表示不基於大小滾動
a1.sinks.k1.hdfs.rollCount = 0 #寫入時間數量達到多少時產生滾動,預設10,0表示不基於數量滾動
a1.sinks.k1.hdfs.minBlockReplicas = 1 #設定hdfs副本數量,預設按hadoop配置檔案配置
a1.sinks.k1.hdfs.hdfs.timeZone = GMT+8 #時區設定,預設系統時區
Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, …)
%A locale’s full weekday name (Monday, Tuesday, …)
%b locale’s short month name (Jan, Feb, …)
%B locale’s long month name (January, February, …)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00…23)
%I hour (01…12)
%j day of year (001…366)
%k hour ( 0…23)
%m month (01…12)
%n month without padding (1…12)
%M minute (00…59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00…59)
%y last two digits of year (00…99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

2.4 Avro

#配置Agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02
a1.sinks.k1.port = 44444

3 Channel

3.1 Memory

事件將被儲存在記憶體中的具有指定大小的佇列中。速度快,但斷電會丟失資料。非常適合那些需要高吞吐量但是可以容忍極端情況下會丟失資料的場景下。

#配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 #通道中最多可以存放的事件的數量
a1.channels.c1.transactionCapacity = 100  #一個事務中最多持有的事件的數量

3.2 File

事件將被儲存在磁碟中的檔案中,特點是速度慢,但斷電不會丟失資料,非常適合那些需要高可靠性 可恢復,但效能要求不高的場景。

#配置Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/chk #檢查點檔案存放的目錄
a1.channels.c1.dataDirs = /home/centos/flume/data #日誌資料儲存的路徑,可以配置多個

4 Selector

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = multiplexing #指定選擇器工作模式為多路複用
a1.sources.r1.selector.header = gender #選擇用於識別的資訊頭
a1.sources.r1.selector.mapping.male = c1  #分配規則,選擇對應資訊體的路由
a1.sources.r1.selector.mapping.female = c2  #分配規則,選擇對應資訊體的路由
a1.sources.r1.selector.default = c1 #對未定義的資訊或不是對應的資訊設定路由

5 Interceptor

5.1 Timestamp Interceptor

時間戳攔截器,攔截到Event後,允許通過,但在頭資訊中增加時間戳頭資訊。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.header = timestamp #新增出的頭的名稱,預設是timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true  #如果時間戳頭資訊存在是否要保持,預設false

5.2 Host Interceptor

主機名攔截器,攔截下Event後,允許通過,但在頭資訊中增加主機名或IP頭資訊。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.useIP = true #預設true為使用IP,false為使用主機名
a1.sources.r1.interceptors.i1.hostHeader = host #增加的頭的名稱,預設是host

5.3 Static Interceptor

靜態攔截器,攔截下Event之後,允許通過,但要增加上指定的頭和值。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = country #增加頭的名字,預設為key
a1.sources.r1.interceptors.i2.value = China #增加的頭的值,預設為value

5.4 UUID Interceptor

UUID攔截器,攔截下Event之後,允許通過,但要在頭上增加上一個UUID唯一表示作為頭。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i3
a1.sources.r1.interceptors.i3.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder #必須是這個
a1.sources.r1.interceptors.i3.headerName = id #header的頭,預設id
a1.sources.r1.interceptors.i3.preserveExisting = true #是否保留原id的頭,預設true
a1.sources.r1.interceptors.i3.prefix = "" #在UUID值前要增加什麼字首,預設不增加

5.5 Search and Replace Interceptor

搜尋和替換攔截器,攔截下Event後,通過正則匹配日誌中的體,將符合正則的部分替換為指定的內容。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern =  \\d  #正則表示式,用來匹配日誌的體
a1.sources.r1.interceptors.i1.replaceString = * #要替換為的字串
a1.sources.r1.interceptors.i1.charset = UTF-8 #體的編碼集,預設是utf-8 

5.6 Regex Filtering Interceptor

正則過濾攔截器,攔截下Event之後,利用正則匹配日誌的體,根據是否匹配決定是否保留或是否取出當前Event。

#配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^\\d.*$ #要匹配的正則
a1.sources.r1.interceptors.i1.excludeEvents = true #預設false,符合正則保留;true為符合正則去除

6 Processor

6.1 失敗恢復機制

失敗恢復機制下,Processor將會維護一個sink們的優先表。sink們可以被配置一個優先順序,數字越大優先順序越高。事件將永遠將只會發往優先順序最高的Sink。只要有一個Sink存活,整個過程仍然可以進行。如果沒有指定優先順序,則優先順序順序取決於sink們的配置順序,先配置的預設優先順序高於後配置的。

#配置Sink
a1.sinkgroups = g1 #定義sinkgroups
a1.sinkgroups.g1.sinks = k1 k2 #繫結sink組
a1.sinkgroups.g1.processor.type = failover #指定處理器工作模式失敗恢復機制的模式
a1.sinkgroups.g1.processor.priority.k1 = 5 #指定k1的優先順序
a1.sinkgroups.g1.processor.priority.k2 = 10 #指定k2的優先順序
a1.sinkgroups.g1.processor.maxpenalty = 10000 #節點最大限定失敗時間,預設30000毫秒

6.2 負載均衡機制

Processor的負載均衡機制提供了在多個sink之間實現負載均衡的能力,它維護了一個活動sink的索引列表。

通過Processor動態切換channel在SinkGroup中對Sink的指向,實現資料的負載均衡方式分發。

支援輪詢 或 隨機方式的負載均衡,預設值是輪詢方式,可以通過配置指定。

負載均衡模式下,如果某個中心伺服器宕機,則Processor會將該中心伺服器Sink剔除SinkGroup組,並將之前傳送失敗的資料發給其他仍然存活的Sink,所以可以認為Processor的負載均衡機制自帶失敗恢復的能力。

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance #指定處理器工作模式是負載均衡
a1.sinkgroups.g1.processor.backoff = true #失敗的sink是否不再生效,預設false
a1.sinkgroups.g1.processor.selector = round_robin #負載均衡模式,輪詢,random為隨機模式
a1.sinkgroups.g1.processor.maxTimeOut = 10000 #最大連線超時時間,預設30000毫秒

– 注意,flume預設批處理,即使是輪詢,隔得時間過長或過短,都不一定能實現ABABA模式


Flume事務機制

Flume的核心事務元件時channel,會使用兩個獨立的事務分別從source到channel,以及channel到sink的事件傳遞。即一旦事務中所有事件全部傳遞到channel且提交成功,那麼source就將該檔案標記為完成,否則就進行回滾,同理,如果某種原因使事務從channel到sink的傳遞過程無法記錄,那麼事務將會回滾,所有事件都會保持在channel中,等待重新傳遞。

1 At-least-once提交方式

Flume在傳送事務時,保證至少一次到達(at-least-once),也就是說可能重複出現。如果上次處理過程中,有些資料已被處理,但是事務還沒有提交(在輸出之後提交之前發生故障),則這些時間會被重試,出現重複。

除了at-least-once,還有at-most-once 和 exactly-once:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once

一些傳統企業會要求精確的一次到達,但是一次到達需要使用2PC兩階段提交協議,這樣的協議開銷非常大。所以如有需求,會在資料的其他處理環節對重複資料進行去重,通常是採用MR或Hive處理。

2 批量處理

為了提高效率,Flume儘可能的以事務為單位來處理事件,而不是逐一基於事件進行處理。批處理的設定尤其有利於提高file channle的效率,這樣整個事務只需要寫入一次本地磁碟,或者呼叫一次fsync,速度回快很多。

Flume相關題目

Flume的安全驗證怎麼做

flume不採集Nginx日誌,通過Logger4j採集日誌,優缺點是什麼

flume的channel採用記憶體,flume宕機了資料丟失怎麼解決

flume和kafka採集日誌區別,採集日誌時中間停了,怎麼記錄之前的日誌

Flume事務的處理

flume的扇入扇出

flume資料來源有哪些?使用過http+AVro搭配嗎?

Flume的幾個器

flume處理的資料量

flume工具使用