1. 程式人生 > >flume 的安裝和入門小例子

flume 的安裝和入門小例子

本文結構 我的環境 CentOS 7

  1. flume 安裝與啟動
  2. flume 的avro小例子
  3. Spool 的小例子
  4. Syslogtcp 小例子

1. flume 安裝與啟動

1.1 下載安裝包

訪問官網傳送門,不信你不點下載apache-flume-1.7.0-bin.tar.gz 安裝包。
並在合適路徑解壓安裝包。

//解壓命令
tar -zxvf apache-flume-1.7.0-bin.tar.gz
//修改名字
mv  apache-flume-1.7.0-bin flume

1.2 配置環境變數

修改 vi /etc/profile 檔案 新增環境變數

export
FLUME_HOME=/opt/apps/flume export PATH=.:$PATH::$FLUME_HOME/bin
注意:/opt/apps/flume 為我解壓flume的路徑

1.3 安裝檢測

進入flume的bin目錄下 執行命令

flume-ng version

輸出:
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

安裝成功

2. flume 的avro小例子

案例: 傳送一個檔案給flume

2.1 新建avro配置檔案

在flume 的conf 資料夾下新建 配置檔案 avro.conf
其內容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type= logger # Use a channel which buffers events in memory a1.channels.c1.type= memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

2.2 啟動flume agent a1

./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
注意:/opt/apps/flume 是我flume的安裝路徑

執行成功

[[email protected] bin]# ./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/avro.conf -n a1
-Dflume.root.logger=INFO,console Info: Including Hive libraries found via () for Hive access
+ exec /opt/apps/jdk1.8.0_112/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/apps/flume/conf:/opt/apps/flume/lib/*:/lib/*'
-Djava.library.path= org.apache.flume.node.Application -f /opt/apps/flume/conf/avro.conf -n a1 … … … 2017-01-04 15:29:02,210 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean. 2017-01-04 15:29:02,211 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started 2017-01-04 15:29:02,212 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:259)] Avro source r1 started.

2.3建立指定檔案

新開啟終端,在flume的conf 目錄下 執行(建立test.log檔案並寫入內容“hello world,abel“)

echo "hello world,abel” > test.log

2.4使用avro-client傳送檔案

//iZuf6aefi9w82dwe9g6zllZ 為當前的使用者名稱           
 ./flume-ng avro-client -c /opt/apps/flume/conf -H iZuf6aefi9w82dwe9g6zllZ -p 4141 -F /opt/apps/flume/conf/test.log

如圖

這裡寫圖片描述

此時在fulme啟動的控制檯,可以看到以下資訊,注意最後一行

這裡寫圖片描述

3. Spool 的小例子

**Spool監測配置的目錄下新增的檔案,並將檔案中的資料讀取出來。需要注意兩點:
    1) 拷貝到spool目錄下的檔案不可以再開啟編輯。
    2) spool目錄下不可包含相應的子目錄(新版本貌似解決了子目錄問題)**
    

3.1建立spool配置檔案

在flume 的conf 資料夾下新建 配置檔案 spool.conf
內容如下:

注意:/root/logs 為我存放被監控的日誌檔案目錄
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir =/root/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.2 啟動flume agent a1

./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

3.3 追加檔案到/root/logs目錄

新建檔案 test.log 並寫入內容“hello world“ ,然後將test.log 追加到被監控目錄下。

vi test.log 
mv test.log  /root/logs

此時會在flume的控制檯 看到如下輸出:(右邊為輸出)

這裡寫圖片描述

4.Syslogtcp小例子

Syslogtcp監聽TCP的埠做為資料來源

4.1建立agent 配置檔案

在flume 檔案下的conf 資料夾下建立syslog_tcp.conf 檔案內容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 啟動flume 併發送資料

啟動命令

./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

傳送資料

echo "hello idoall.org syslog" | nc localhost 5140

控制檯輸出

這裡寫圖片描述

除了以上的展示外 flume 還有很多種配置,例如:

案例:Exec

exec source 執行時, 不停機直接修改 conf 檔案 ,修改配置的監控檔名稱,flume 監控檔案改變為新配置的檔案。(即可不停機的修改監控檔案。)

 EXEC執行一個給定的命令獲得輸出的源,如果要使用tail命令,必選使得file足夠大才能看到輸出內容。

實時監控/opt/apps/logs/tail4.log 檔案。並將結果輸出到/opt/apps/tmp 目錄下

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
#a1.sinks.k1.sink.rollInterval=0
a1.sinks.k1.sink.directory = /opt/apps/tmp

案例:JSONHandler

    讀取指定url 的request 的json 資訊,包含 headers 資訊和body 資訊

案例:Hadoop sink

    讀取資訊到hdfs

案例:Replicating Channel Selector(多 Channel)

 Flume支援Fan out流從一個源到多個通道。有兩種模式的Fan out,分別是複製和複用。在複製的情況下,流的事件被髮送到所有的配置通道。在複用的情況下,事件被髮送到可用的渠道中的一個子集。Fan out流需要指定源和Fan out通道的規則。

案例:Multiplexing Channel Selector(多channel)

根據header中不同的條件分佈到不同的channel上

案例:Load balancing Sink Processor(多sink)

    load balance type和failover不同的地方是,load balance有兩個配置,一個是輪詢,一個是隨機。兩種情況下如果被選擇的sink不可用,就會自動嘗試傳送到下一個可用的sink上面。

案例還有很多,這些案例的詳細內容均來自 參考