flume 的安裝和入門小例子
本文結構 我的環境 CentOS 7
- flume 安裝與啟動
- flume 的avro小例子
- Spool 的小例子
- Syslogtcp 小例子
1. flume 安裝與啟動
1.1 下載安裝包
訪問官網傳送門,不信你不點下載apache-flume-1.7.0-bin.tar.gz 安裝包。
並在合適路徑解壓安裝包。
//解壓命令
tar -zxvf apache-flume-1.7.0-bin.tar.gz
//修改名字
mv apache-flume-1.7.0-bin flume
1.2 配置環境變數
修改 vi /etc/profile 檔案 新增環境變數
export FLUME_HOME=/opt/apps/flume
export PATH=.:$PATH::$FLUME_HOME/bin
注意:/opt/apps/flume 為我解壓flume的路徑
1.3 安裝檢測
進入flume的bin目錄下 執行命令
flume-ng version
輸出:
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
安裝成功
2. flume 的avro小例子
案例: 傳送一個檔案給flume
2.1 新建avro配置檔案
在flume 的conf 資料夾下新建 配置檔案 avro.conf
其內容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type= logger
# Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2 啟動flume agent a1
./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
注意:/opt/apps/flume 是我flume的安裝路徑
執行成功
[[email protected] bin]# ./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1
-Dflume.root.logger=INFO,console Info: Including Hive libraries found via () for Hive access
+ exec /opt/apps/jdk1.8.0_112/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/apps/flume/conf:/opt/apps/flume/lib/*:/lib/*'
-Djava.library.path= org.apache.flume.node.Application -f /opt/apps/flume/conf/avro.conf -n a1 … … … 2017-01-04 15:29:02,210 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean. 2017-01-04 15:29:02,211 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started 2017-01-04 15:29:02,212 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:259)] Avro source r1 started.
2.3建立指定檔案
新開啟終端,在flume的conf 目錄下 執行(建立test.log檔案並寫入內容“hello world,abel“)
echo "hello world,abel” > test.log
2.4使用avro-client傳送檔案
//iZuf6aefi9w82dwe9g6zllZ 為當前的使用者名稱
./flume-ng avro-client -c /opt/apps/flume/conf -H iZuf6aefi9w82dwe9g6zllZ -p 4141 -F /opt/apps/flume/conf/test.log
如圖
此時在fulme啟動的控制檯,可以看到以下資訊,注意最後一行
3. Spool 的小例子
**Spool監測配置的目錄下新增的檔案,並將檔案中的資料讀取出來。需要注意兩點:
1) 拷貝到spool目錄下的檔案不可以再開啟編輯。
2) spool目錄下不可包含相應的子目錄(新版本貌似解決了子目錄問題)**
3.1建立spool配置檔案
在flume 的conf 資料夾下新建 配置檔案 spool.conf
內容如下:
注意:/root/logs 為我存放被監控的日誌檔案目錄
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir =/root/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3.2 啟動flume agent a1
./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
3.3 追加檔案到/root/logs目錄
新建檔案 test.log 並寫入內容“hello world“ ,然後將test.log 追加到被監控目錄下。
vi test.log
mv test.log /root/logs
此時會在flume的控制檯 看到如下輸出:(右邊為輸出)
4.Syslogtcp小例子
Syslogtcp監聽TCP的埠做為資料來源
4.1建立agent 配置檔案
在flume 檔案下的conf 資料夾下建立syslog_tcp.conf 檔案內容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4.2 啟動flume 併發送資料
啟動命令
./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
傳送資料
echo "hello idoall.org syslog" | nc localhost 5140
控制檯輸出
除了以上的展示外 flume 還有很多種配置,例如:
案例:Exec
exec source 執行時, 不停機直接修改 conf 檔案 ,修改配置的監控檔名稱,flume 監控檔案改變為新配置的檔案。(即可不停機的修改監控檔案。)
EXEC執行一個給定的命令獲得輸出的源,如果要使用tail命令,必選使得file足夠大才能看到輸出內容。
實時監控/opt/apps/logs/tail4.log 檔案。並將結果輸出到/opt/apps/tmp 目錄下
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
#a1.sinks.k1.sink.rollInterval=0
a1.sinks.k1.sink.directory = /opt/apps/tmp
案例:JSONHandler
讀取指定url 的request 的json 資訊,包含 headers 資訊和body 資訊
案例:Hadoop sink
讀取資訊到hdfs
案例:Replicating Channel Selector(多 Channel)
Flume支援Fan out流從一個源到多個通道。有兩種模式的Fan out,分別是複製和複用。在複製的情況下,流的事件被髮送到所有的配置通道。在複用的情況下,事件被髮送到可用的渠道中的一個子集。Fan out流需要指定源和Fan out通道的規則。
案例:Multiplexing Channel Selector(多channel)
根據header中不同的條件分佈到不同的channel上
案例:Load balancing Sink Processor(多sink)
load balance type和failover不同的地方是,load balance有兩個配置,一個是輪詢,一個是隨機。兩種情況下如果被選擇的sink不可用,就會自動嘗試傳送到下一個可用的sink上面。
案例還有很多,這些案例的詳細內容均來自 參考