Flume學習筆記及配置引數詳解
一、什麼是flume
Flume是一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。
二、flume特點
flume的資料流由事件(Event)貫穿始終。事件是Flume的基本資料單位,它攜帶日誌資料(位元組陣列形式)並且攜帶有頭資訊,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩衝區,它將儲存事件直到Sink
flume的可靠性
當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Besteffort(資料傳送到接收方後,不會進行確認)。
flume的可恢復性:
還是靠Channel。推薦使用FileChannel
flume的一些核心概念:
Agent使用JVM 執行Flume。每臺機器執行一個agent,但是可以在一個agent中包含多個sources和sinks。
Client生產資料,執行在一個獨立的執行緒。
Source從Client收集資料,傳遞給Channel。
Sink從Channel收集資料,執行在一個獨立執行緒。
Channel連線 sources 和 sinks ,這個有點像一個佇列。
Events可以是日誌記錄、 avro 物件等。
值得注意的是,Flume提供了大量內建的Source、Channel和Sink型別。不同型別的Source,Channel
三、flume架構
Agent:
Flume以agent為最小的獨立執行單位,一個Agent包含多個source、channel、sink和其他元件
source
flume提供多種source供使用者進行選擇,儘可能多的滿足大部分日誌採集的需求,常用的source的型別包括avro、exec、netcat、spooling-directory和syslog等。具體的使用範圍和配置方法詳見source.
channel
flume中的channel不如source和sink那麼重要,但卻是不可忽視的組成部分。常用的channel為memory-channel,同時也有其他型別的channel,如JDBC、file-channel、custom-channel等,詳情見channel.
sink
flume的sink也有很多種,常用的包括avro、logger、HDFS、Hbase以及file-roll等,除此之外還有其他型別的sink,如thrift、IRC、custom等。具體的使用範圍和使用方法詳見sink.
四、flume使用
建立配置檔案(隨意位置)
vi agent.conf
五、配置
常見的source
avro source:avro可以監聽和收集指定埠的日誌,使用avro的source需要說明被監聽的主機ip和埠號
例子:
agent1.sources = r1
#描述source
agent1.sources.r1.type = avro (型別為avro source)
agent1.sources.r1.bind = 0.0.0.0 (指定監聽的主機ip.本機是0.0.0.0.)
agent1.sources.r1.port = 16667 (指定監聽的埠號)
exec source:可以通過指定的操作對日誌進行讀取,使用exec時需要指定shell命令,對日誌進行讀取
例子:
agent1.source = r2
#描述source
agent1.sources.r2.type = exec
agent1.sources.r2.command =tail -F /root/flume/log/event.txt (監聽的檔案的路徑)
Spooling-directory source:可以讀取資料夾裡的日誌,使用時指定一個資料夾,可以讀取該資料夾中的所有檔案,當出現新檔案時會讀取該檔案並獲取資料.需要注意的是該資料夾中的檔案在讀取過程中不能修改,同時檔名也不能修改。
1、spoolDirectory是監控目錄,不能為空,沒有預設值。這個source不具有監控子目錄的功能,也就是不能遞迴監控。如果需要,這需要自己去實現,http://blog.csdn.net/yangbutao/article/details/8835563 這裡有遞迴檢測的實現;
2、completedSuffix是檔案讀取完畢後給完成檔案新增的標記字尾,預設是".COMPLETED";
3、deletePolicy這是是否刪除讀取完畢的檔案,預設是"never",就是不刪除,目前只支援"never"和“IMMEDIATE”;
4、fileHeader是否在event的Header中新增檔名,boolean型別, 預設false
5、fileHeaderKey這是event的Header中的key,value是檔名
6、batchSize這個是一次處理的記錄數,預設是100;
7、inputCharset編碼方式,預設是"UTF-8";
8、ignorePattern忽略符合條件的檔名
9、trackerDirPath被處理檔案元資料的儲存目錄,預設".flumespool"
10、deserializerType將檔案中的資料序列化成event的方式,預設是“LINE”---org.apache.flume.serialization.LineDeserializer
11、deserializerContext這個主要用在Deserializer中設定編碼方式outputCharset和檔案每行最大長度maxLineLength。
例子:
agent1.sources = r3
#描述source
agent1.sources.r3.type = spooldir
agent1.sources.r3.spoolDir = /root/flume/log (監聽的檔案目錄)
agent1.sources.r3.fileHeader = true
NetCat source:用來監聽一個指定埠,並將接收到的資料的每一行轉換為一個事件。
例子:
agent1.sources = r4
#描述source
agent1.sources.r4.type = netcat (source型別)
agent1.sources.r4.bind = 0.0.0.0 (指定繫結的ip或主機名)
agent1.sources.r4.port = 16668 (指定繫結的埠號)
HTTP source:接受HTTP的GET和POST請求作為Flume的事件,其中GET方式應該只用於試驗。
!type 型別,必須為"HTTP"
!port– 監聽的埠
bind 0.0.0.0 監聽的主機名或ip
handler org.apache.flume.source.http.JSONHandler處理器類,需要實現HTTPSourceHandler介面
handler.* – 處理器的配置引數
Selector.type
selector.*
interceptors –
interceptors.*
enableSSL false 是否開啟SSL,如果需要設定為true。注意,HTTP不支援SSLv3。
excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS協議。SSLv3總是被排除的。
keystore 金鑰庫檔案所在位置
keystorePassword Keystore 金鑰庫密碼
例子:
agent1.sources.r1.type = http
agent1.sources.r1.port = 66666
常見的channel
memory channel:記憶體
例子:
agent1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 100000
agent1.channels.c1.transactionCapacity = 100000
常見的sink
logger sink:將收集到的日誌寫到flume的log中
例子:
agent1.sinks = k1
agent2.sinks.k1.type = logger
avro sink:可以將接受到的日誌傳送到指定埠,供級聯agent的下一跳收集和接受日誌,使用時需要指定目的ip和埠
例子:
agent1.sinks=k1
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = c2
agent1.sinks.k2.hostname = hadoop03 (指定的主機名或ip)
agent1.sinks.k2.port = 16666 (指定的埠號)
file_roll sink:可以將一定時間內收集到的日誌寫到一個指定的檔案中,具體過程為使用者指定一個資料夾和一個週期,然後啟動agent,這時該資料夾會產生一個檔案將該週期內收集到的日誌全部寫進該檔案內,直到下一個週期再次產生一個新檔案繼續寫入,以此類推,周而復始。
例子:
agent1.sinks=k1
agent1.sinks.k1.type = file_roll
agent1.sinks.k1.channel = c1
agent1.sinks.k1.sink.directory=/root/flumelog (指定檔案所在目錄的路徑)
hdfs sink:將收集到的日誌寫入到新建立的檔案中儲存起來,儲存路徑為分散式的檔案系統hdfs的路徑,同時hdfs建立新檔案的週期可以是時間,也可以是檔案的大小,還可以是採集日誌的條數
配置引數說明:
channel
Type:hdfs
path:寫入hdfs的路徑,需要包含檔案系統標識,比如:hdfs://namenode/flume/webdata/
可以使用flume提供的日期及%{host}表示式。
filePrefix: 預設值:FlumeData 寫入hdfs的檔名字首,可以使用flume提供的日期及%{host}表示式。
fileSuffix:寫入hdfs的檔名字尾,比如:.lzo .log等。
inUsePrefix:臨時檔案的檔名字首,hdfs sink會先往目標目錄中寫臨時檔案,再根據相關規則重新命名成最終目標檔案;
inUseSuffi: 預設值:.tmp 臨時檔案的檔名字尾。
rollInterval: 預設值:30 hdfs sink間隔多長將臨時檔案滾動成最終目標檔案,單位:秒;
如果設定成0,則表示不根據時間來滾動檔案;
注:滾動(roll)指的是,hdfs sink將臨時檔案重新命名成最終目標檔案,並新開啟一個臨時檔案來寫入資料;
rollSize 預設值:1024 當臨時檔案達到該大小(單位:bytes)時,滾動成目標檔案;
如果設定成0,則表示不根據臨時檔案大小來滾動檔案;
rollCount 預設值:10 當events資料達到該數量時候,將臨時檔案滾動成目標檔案;
如果設定成0,則表示不根據events資料來滾動檔案;
idleTimeout 預設值:0
當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,則將該臨時檔案關閉並重命名成目標檔案;
batchSize 預設值:100 每個批次重新整理到HDFS上的events數量;
codeC 檔案壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy
fileType 預設值:SequenceFile
檔案格式,包括:SequenceFile, DataStream,CompressedStream 當使用DataStream時候,檔案不會被壓縮,不需要設定hdfs.codeC;當使用CompressedStream時候,必須設定一個正確的hdfs.codeC值;
maxOpenFiles 預設值:5000 最大允許開啟的HDFS檔案數,當開啟的檔案數達到該值,最早開啟的檔案將會被關閉;
minBlockReplicas 預設值:HDFS副本數 寫入HDFS檔案塊的最小副本數。該引數會影響檔案的滾動配置,一般將該引數配置成1,才可以按照配置正確滾動檔案。
writeFormat 寫sequence檔案的格式。包含:Text, Writable(預設)
callTimeout 預設值:10000 執行HDFS操作的超時時間(單位:毫秒);
threadsPoolSize 預設值:10 hdfs sink啟動的操作HDFS的執行緒數。
rollTimerPoolSize 預設值:1 hdfs sink啟動的根據時間滾動檔案的執行緒數。
kerberosPrincipal: HDFS安全認證kerberos配置;
kerberosKeytab
HDFS安全認證kerberos配置;
proxyUser 代理使用者
round 預設值:false 是否啟用時間上的”捨棄”,這裡的”捨棄”,類似於”四捨五入”,後面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式;
roundValue 預設值:1 時間上進行“捨棄”的值;
roundUnit 預設值:seconds 時間上進行”捨棄”的單位,包含:second,minute,hour
示例:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
當時間為2015-10-16 17:38:59時候,hdfs.path依然會被解析為:
/flume/events/20151016/17:30/00
因為設定的是捨棄10分鐘內的時間,因此,該目錄每10分鐘新生成一個。
timeZone
預設值:Local Time 時區。
useLocalTimeStamp 預設值:flase 是否使用當地時間。
closeTries:預設值:0
hdfs sink關閉檔案的嘗試次數;
如果設定為1,當一次關閉檔案失敗後,hdfs sink將不會再次嘗試關閉檔案,這個未關閉的檔案將會一直留在那,並且是開啟狀態。
設定為0,當一次關閉失敗後,hdfs sink會繼續嘗試下一次關閉,直到成功。
retryInterval
預設值:180(秒)
hdfs sink嘗試關閉檔案的時間間隔,如果設定為0,表示不嘗試,相當於於將hdfs.closeTries設定成1.
Serializer 預設值:TEXT
序列化型別。其他還有:avro_event或者是實現了EventSerializer.Builder的類名。
Hbase sink:hbase是一種資料庫,可以儲存日誌,使用時需要指定儲存日誌的表名和列族名,然後agent就可以將收集到的日誌逐條插入到資料庫中。
tableName:要寫入的HBase資料表名,不能為空;
columnFamily:資料表對應的列簇名,這個sink目前只支援一個列簇,不能為空;
batchSize:每次事務可以處理的最大Event數量,預設是100;
eventSerializerType:用來將event寫入HBase,即將event轉化為put。預設是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,還有一個是RegexHbaseEventSerializer,即適合HBaseSink的Serializer只有這倆,否則自己定製;
serializerContext:是eventSerializerType的配置資訊,就是配置檔案中包含“serializer.”的項;
例子:
agent1.sinks = k1
agent1.sinks.k1.type = hbase
agent1.sinks.k1.table = flume
agent1.sinks.k1.columnFamily=fl_conf
agent1.sinks.k1.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer