1. 程式人生 > >Flume學習筆記及配置引數詳解

Flume學習筆記及配置引數詳解

一、什麼是flume

Flume是一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

二、flume特點

flume的資料流由事件(Event)貫穿始終。事件是Flume的基本資料單位,它攜帶日誌資料(位元組陣列形式)並且攜帶有頭資訊,這些EventAgent外部的Source生成,當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩衝區,它將儲存事件直到Sink

處理完該事件。Sink負責持久化日誌或者把事件推向另一個Source

flume的可靠性

當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Besteffort(資料傳送到接收方後,不會進行確認)。

flume的可恢復性:

還是靠Channel。推薦使用FileChannel

,事件持久化在本地檔案系統裡(效能較差)。 

flume的一些核心概念:

Agent使用JVM 執行Flume。每臺機器執行一個agent,但是可以在一個agent中包含多個sourcessinks

Client生產資料,執行在一個獨立的執行緒。

SourceClient收集資料,傳遞給Channel

SinkChannel收集資料,執行在一個獨立執行緒。

Channel連線 sources 和 sinks ,這個有點像一個佇列。

Events可以是日誌記錄、 avro 物件等。

值得注意的是,Flume提供了大量內建的SourceChannelSink型別。不同型別的Source,Channel

Sink可以自由組合。組合方式基於使用者設定的配置檔案,非常靈活。比如:Channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。Sink可以把日誌寫入HDFS, HBase,甚至是另外一個Source等等。Flume支援使用者建立多級流,也就是說,多個agent可以協同工作,並且支援Fan-inFan-outContextual RoutingBackup Routes

三、flume架構

Agent:

Flumeagent為最小的獨立執行單位,一個Agent包含多個sourcechannelsink和其他元件

source

flume提供多種source供使用者進行選擇,儘可能多的滿足大部分日誌採集的需求,常用的source的型別包括avroexecnetcatspooling-directorysyslog等。具體的使用範圍和配置方法詳見source.

channel

flume中的channel不如sourcesink那麼重要,但卻是不可忽視的組成部分。常用的channelmemory-channel,同時也有其他型別的channel,如JDBCfile-channelcustom-channel等,詳情見channel.

sink

flumesink也有很多種,常用的包括avrologgerHDFSHbase以及file-roll等,除此之外還有其他型別的sink,如thriftIRCcustom等。具體的使用範圍和使用方法詳見sink.

四、flume使用

建立配置檔案(隨意位置)

vi agent.conf

五、配置

常見的source

avro source:avro可以監聽和收集指定埠的日誌,使用avrosource需要說明被監聽的主機ip和埠號

例子:

agent1.sources = r1

#描述source

agent1.sources.r1.type = avro  (型別為avro source)

agent1.sources.r1.bind = 0.0.0.0 (指定監聽的主機ip.本機是0.0.0.0.

agent1.sources.r1.port = 16667 (指定監聽的埠號)

exec source:可以通過指定的操作對日誌進行讀取,使用exec時需要指定shell命令,對日誌進行讀取

例子:

agent1.source = r2

#描述source

agent1.sources.r2.type = exec 

agent1.sources.r2.command =tail -F /root/flume/log/event.txt (監聽的檔案的路徑)

Spooling-directory source:可以讀取資料夾裡的日誌,使用時指定一個資料夾,可以讀取該資料夾中的所有檔案,當出現新檔案時會讀取該檔案並獲取資料.需要注意的是該資料夾中的檔案在讀取過程中不能修改,同時檔名也不能修改。

1、spoolDirectory是監控目錄,不能為空,沒有預設值。這個source不具有監控子目錄的功能,也就是不能遞迴監控。如果需要,這需要自己去實現,http://blog.csdn.net/yangbutao/article/details/8835563 這裡有遞迴檢測的實現;

2completedSuffix是檔案讀取完畢後給完成檔案新增的標記字尾,預設是".COMPLETED"

3deletePolicy這是是否刪除讀取完畢的檔案,預設是"never",就是不刪除,目前只支援"never"和“IMMEDIATE”;

4fileHeader是否在eventHeader中新增檔名,boolean型別預設false

5fileHeaderKey這是eventHeader中的key,value是檔名

6batchSize這個是一次處理的記錄數,預設是100

7inputCharset編碼方式,預設是"UTF-8"

8ignorePattern忽略符合條件的檔名

9trackerDirPath被處理檔案元資料的儲存目錄,預設".flumespool"

10deserializerType將檔案中的資料序列化成event的方式,預設是“LINE---org.apache.flume.serialization.LineDeserializer

11deserializerContext這個主要用在Deserializer中設定編碼方式outputCharset和檔案每行最大長度maxLineLength

例子:

agent1.sources = r3

#描述source

agent1.sources.r3.type = spooldir

agent1.sources.r3.spoolDir = /root/flume/log  (監聽的檔案目錄)

agent1.sources.r3.fileHeader = true

NetCat source:用來監聽一個指定埠,並將接收到的資料的每一行轉換為一個事件。

例子:

agent1.sources = r4 

#描述source

agent1.sources.r4.type = netcat  (source型別)

agent1.sources.r4.bind = 0.0.0.0 (指定繫結的ip或主機名)

agent1.sources.r4.port = 16668   (指定繫結的埠號)

HTTP source:接受HTTPGETPOST請求作為Flume的事件,其中GET方式應該只用於試驗。

type    型別,必須為"HTTP"

port–   監聽的埠

bind   0.0.0.0    監聽的主機名或ip

handler      org.apache.flume.source.http.JSONHandler處理器類,需要實現HTTPSourceHandler介面

handler.*  –   處理器的配置引數

Selector.type

selector.*   

interceptors  –  

interceptors.*        

enableSSL  false  是否開啟SSL,如果需要設定為true。注意,HTTP不支援SSLv3

excludeProtocols  SSLv3  空格分隔的要排除的SSL/TLS協議。SSLv3總是被排除的。

keystore      金鑰庫檔案所在位置

keystorePassword Keystore 金鑰庫密碼

例子:

agent1.sources.r1.type  = http

agent1.sources.r1.port  = 66666

常見的channel

memory channel:記憶體

例子:

agent1.channels = c1

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 100000

agent1.channels.c1.transactionCapacity = 100000

常見的sink

logger sink:將收集到的日誌寫到flumelog

例子:

agent1.sinks = k1

agent2.sinks.k1.type = logger

avro sink:可以將接受到的日誌傳送到指定埠,供級聯agent的下一跳收集和接受日誌,使用時需要指定目的ip和埠

例子:

agent1.sinks=k1

agent1.sinks.k2.type = avro

agent1.sinks.k2.channel = c2

agent1.sinks.k2.hostname = hadoop03 (指定的主機名或ip)

agent1.sinks.k2.port = 16666  (指定的埠號)

file_roll sink:可以將一定時間內收集到的日誌寫到一個指定的檔案中,具體過程為使用者指定一個資料夾和一個週期,然後啟動agent,這時該資料夾會產生一個檔案將該週期內收集到的日誌全部寫進該檔案內,直到下一個週期再次產生一個新檔案繼續寫入,以此類推,周而復始。

例子:

agent1.sinks=k1

agent1.sinks.k1.type = file_roll

agent1.sinks.k1.channel = c1

agent1.sinks.k1.sink.directory=/root/flumelog (指定檔案所在目錄的路徑)

hdfs sink:將收集到的日誌寫入到新建立的檔案中儲存起來,儲存路徑為分散式的檔案系統hdfs的路徑,同時hdfs建立新檔案的週期可以是時間,也可以是檔案的大小,還可以是採集日誌的條數

配置引數說明:

channel

Type:hdfs

path:寫入hdfs的路徑,需要包含檔案系統標識,比如:hdfs://namenode/flume/webdata/

可以使用flume提供的日期及%{host}表示式。

filePrefix: 預設值:FlumeData 寫入hdfs的檔名字首,可以使用flume提供的日期及%{host}表示式。

fileSuffix:寫入hdfs的檔名字尾,比如:.lzo .log等。

inUsePrefix:臨時檔案的檔名字首,hdfs sink會先往目標目錄中寫臨時檔案,再根據相關規則重新命名成最終目標檔案;

inUseSuffi: 預設值:.tmp  臨時檔案的檔名字尾。

rollInterval:  預設值:30  hdfs sink間隔多長將臨時檔案滾動成最終目標檔案,單位:秒;

如果設定成0,則表示不根據時間來滾動檔案;

注:滾動(roll)指的是,hdfs sink將臨時檔案重新命名成最終目標檔案,並新開啟一個臨時檔案來寫入資料;

rollSize  預設值:1024 當臨時檔案達到該大小(單位:bytes)時,滾動成目標檔案;

如果設定成0,則表示不根據臨時檔案大小來滾動檔案;

rollCount  預設值:10  events資料達到該數量時候,將臨時檔案滾動成目標檔案;

如果設定成0,則表示不根據events資料來滾動檔案;

idleTimeout  預設值:0

當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,則將該臨時檔案關閉並重命名成目標檔案;

batchSize   預設值:100  每個批次重新整理到HDFS上的events數量;

codeC  檔案壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy

fileType  預設值:SequenceFile

檔案格式,包括:SequenceFile, DataStream,CompressedStream 當使用DataStream時候,檔案不會被壓縮,不需要設定hdfs.codeC;當使用CompressedStream時候,必須設定一個正確的hdfs.codeC值;

maxOpenFiles  預設值:5000 最大允許開啟的HDFS檔案數,當開啟的檔案數達到該值,最早開啟的檔案將會被關閉;

minBlockReplicas  預設值:HDFS副本數   寫入HDFS檔案塊的最小副本數。該引數會影響檔案的滾動配置,一般將該引數配置成1,才可以按照配置正確滾動檔案。

writeFormat  sequence檔案的格式。包含:Text, Writable(預設)

callTimeout  預設值:10000 執行HDFS操作的超時時間(單位:毫秒);

threadsPoolSize  預設值:10  hdfs sink啟動的操作HDFS的執行緒數。

rollTimerPoolSize  預設值:1  hdfs sink啟動的根據時間滾動檔案的執行緒數。

kerberosPrincipal: HDFS安全認證kerberos配置;

kerberosKeytab

HDFS安全認證kerberos配置;

proxyUser  代理使用者

round 預設值:false  是否啟用時間上的捨棄,這裡的捨棄,類似於四捨五入,後面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式;

roundValue  預設值:1   時間上進行捨棄的值;

roundUnit  預設值:seconds 時間上進行捨棄的單位,包含:second,minute,hour

示例:

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

當時間為2015-10-16 17:38:59時候,hdfs.path依然會被解析為:

/flume/events/20151016/17:30/00

因為設定的是捨棄10分鐘內的時間,因此,該目錄每10分鐘新生成一個。

timeZone

預設值:Local Time  時區。

useLocalTimeStamp 預設值:flase  是否使用當地時間。

closeTries:預設值:0

hdfs sink關閉檔案的嘗試次數;

如果設定為1,當一次關閉檔案失敗後,hdfs sink將不會再次嘗試關閉檔案,這個未關閉的檔案將會一直留在那,並且是開啟狀態。

設定為0,當一次關閉失敗後,hdfs sink會繼續嘗試下一次關閉,直到成功。

retryInterval

預設值:180(秒)

hdfs sink嘗試關閉檔案的時間間隔,如果設定為0,表示不嘗試,相當於於將hdfs.closeTries設定成1.

Serializer  預設值:TEXT

序列化型別。其他還有:avro_event或者是實現了EventSerializer.Builder的類名。

Hbase sink:hbase是一種資料庫,可以儲存日誌,使用時需要指定儲存日誌的表名和列族名,然後agent就可以將收集到的日誌逐條插入到資料庫中。

tableName:要寫入的HBase資料表名,不能為空;

columnFamily:資料表對應的列簇名,這個sink目前只支援一個列簇,不能為空;

batchSize:每次事務可以處理的最大Event數量,預設是100

eventSerializerType:用來將event寫入HBase,即將event轉化為put。預設是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,還有一個是RegexHbaseEventSerializer,即適合HBaseSinkSerializer只有這倆,否則自己定製;

serializerContext:是eventSerializerType的配置資訊,就是配置檔案中包含“serializer.”的項;

例子:

agent1.sinks = k1

agent1.sinks.k1.type = hbase

agent1.sinks.k1.table = flume

agent1.sinks.k1.columnFamily=fl_conf

agent1.sinks.k1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer