1. 程式人生 > >Flume中的攔截器(Interceptor)介紹與使用

Flume中的攔截器(Interceptor)介紹與使用

Flume中的攔截器(interceptor),使用者Source讀取events傳送到Sink的時候,在events header中加入一些有用的資訊,或者對events的內容進行過濾,完成初步的資料清洗。這在實際業務場景中非常有用,Flume-ng 1.6中目前提供了以下攔截器:

Timestamp Interceptor; Host Interceptor; Static Interceptor; UUID Interceptor; Morphline Interceptor; Search and Replace Interceptor; Regex Filtering Interceptor; Regex Extractor Interceptor;

本文對常用的幾種攔截器進行學習和介紹,並附上使用示例。

本文中使用的Source為TaildirSource,就是監控一個檔案的變化,將內容傳送給Sink,具體可參考《Flume中的TaildirSource》,Source配置如下:

#–>設定sources名稱 agent_lxw1234.sources = sources1 #–> 設定channel名稱 agent_lxw1234.channels = fileChannel #–> 設定sink 名稱 agent_lxw1234.sinks = sink1

source 配置

agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json agent_lxw1234.sources.sources1.filegroups = f1 agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log agent_lxw1234.sources.sources1.batchSize = 100 agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000 agent_lxw1234.sources.sources1.maxBackoffSleep = 5000 agent_lxw1234.sources.sources1.channels = fileChannel Flume Source中使用攔截器的相關配置如下:

source 攔截器

agent_lxw1234.sources.sources1.interceptors = i1 i2 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp

對一個Source可以使用多個攔截器。

Timestamp Interceptor

時間戳攔截器,將當前時間戳(毫秒)加入到events header中,key名字為:timestamp,值為當前時間戳。用的不是很多。比如在使用HDFS Sink時候,根據events的時間戳生成結果檔案,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d

hdfs.filePrefix = log_%Y%m%d_%H

會根據時間戳將資料寫入相應的檔案中。

但可以用其他方式代替(設定useLocalTimeStamp = true)。

Host Interceptor

主機名攔截器。將執行Flume agent的主機名或者IP地址加入到events header中,key名字為:host(也可自定義)。

根據上面的Source,攔截器的配置如下:

source 攔截器

agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost

sink 1 配置

agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel

該配置用於將source的events儲存到HDFS上hdfs://cdh5/tmp/lxw1234的目錄下,檔名為lxw1234_<主機名>.log

Static Interceptor

靜態攔截器,用於在events header中加入一組靜態的key和value。

根據上面的Source,攔截器的配置如下:

source 攔截器

agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = static agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.key = static_key agent_lxw1234.sources.sources1.interceptors.i1.value = static_value

sink 1 配置

agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234 agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel 看看最終Sink在HDFS上生成的檔案結構:

flume interceptor

UUID Interceptor

UUID攔截器,用於在每個events header中生成一個UUID字串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中讀取並使用。根據上面的source,攔截器的配置如下:

source 攔截器

agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_

sink 1 配置

agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel 執行後在日誌中檢視header資訊:

flume interceptor