1. 程式人生 > >Flume 入門與簡單運用

Flume 入門與簡單運用

一、Flume 簡述

  • Flume是什麼:通俗地說 Flume 就是一個日誌採集工具。
  • 版本進化過程:分為 Flume-og(0.9x 已停止更新了)、Flume-ng(1.x) 兩個版本,Flume-ng最明顯的改動就是取消了集中管理配置的 Master 和 Zookeeper,變為一個純粹的傳輸工具。Flume-ng另一個主要的不同點是讀入資料和寫出資料現在由不同的工作執行緒處理(稱為 Runner)。在 Flume-og 中,讀入執行緒同樣做寫出工作(除了故障重試)。如果寫出慢的話(不是完全失敗),它將阻塞 Flume 接收資料的能力。
  • 資料處理方面: Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力 。提供了從console(控制檯)、RPC(Thrift-RPC)、text(檔案)、tail(UNIX tail)、syslog(syslog日誌系統),支援TCP和UDP等2種模式),exec(命令執行)等資料來源上收集資料的能力。

二、Flume 架構

典型架構力如下:
這裡寫圖片描述

  • 各元件職責:

    • Source : 負責日誌流入,比如從檔案、網路、Kafka等資料來源流入資料,資料流入的方式有兩種:輪訓拉取和事件驅動。
    • Channel :負責資料聚合或暫存,比如暫存到記憶體、本地檔案、資料庫、Kafka 等,日誌資料不會在管道停留很長時間,委快會被 Sink 消費掉。
      Sink :也叫接收器,負責資料轉移儲存,比如從Channel拿到日誌後直接儲存到HDFS、Hbase、ElasticSearch、Kafka 等。

    • 細分 Flume 資料流應該是由5個元件組成:Events、Sources、Channels、Sink、Agent。基中三個如上所述,Events與Agent 如下:
      Events :是使用Flume移動的資料的基本單位。它類似於JMS中的訊息,通常很小。它由頭和位元組陣列體組成。

三、 第一個Flume Demo

  • 原理方面的這裡暫時不深入,先執行一個簡單的Demo再說…
    其實flume的用法很簡單—-書寫一個配置檔案,在配置檔案當中描述source、channel與sink的具體實現,而後執行一個agent例項,在執行agent例項的過程中會讀取配置檔案的內容,這樣flume就會採集到資料。

  • 筆者是在HDP平臺上安裝的Flume, 因此無需進行任何配置便可以運用Flume。
    部署與安裝步驟只是通過 Ambari 的web 介面,點選 Actions , 選擇 add services ,根據提示直接Next便安裝完成。

  • 檢視flume 是否安裝好 及其版本號
    [

    [email protected] conf]# flume-ng version
    Flume 1.5.2.2.5.3.0-37

  • 這個案例主要的功能是監聽一個指定的網路埠,即只要應用程式向這個端口裡面寫資料,這個source元件就可以獲取到資訊並列印到日誌中,當然如果想將監控的日誌寫入到Hdfs也是很簡單的,只需修改對應的配置便可,這裡以快速入門為由所以只把其記錄下。

詳細操作步驟如下所示:

1、編寫配置檔案

  注:這裡的配置檔案指的是一個Flume任務相關的配置檔案,例如這裡監聽一個指定網路埠的配置,並非安裝相關的配置檔案。

建立一個配置檔案(檔案可以隨便找個目錄放置):

touch testNetcat.conf

2、編寫配置檔案:

vi  testNetcat.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、啟動 flume agent a1 服務端

[root@hdp03 conf]# flume-ng  agent -n a1  -c ../conf  -f testNetcat.conf   -Dflume.root.logger=DEBUG,console

如果要想flume 命令在後臺執行,可在上面命令最後新增一個“&”符號,便可在後臺運行了,實際開發過程,一般通過編寫啟停指令碼來執行。

啟動程式的部分日誌如下:

17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
17/08/25 15:07:24 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
17/08/25 15:07:24 INFO node.Application: Starting Sink k1
17/08/25 15:07:24 INFO node.Application: Starting Source r1
17/08/25 15:07:24 INFO source.NetcatSource: Source starting
17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.135.21.3:55555]

從上面日誌可看出其在監聽 10.135.21.3:55555

4、通過另一臺Linux 伺服器向10.135.21.3:55555 埠傳送資訊

[[email protected] root]# telnet hdp03 55555
Trying 10.194.67.6...
Connected to hdp06.
Escape character is '^]'.
I am bad boy...
OK
戰狼2很好看哦
OK

可在 10.135.21.3 監聽服務的日誌中看到如下資訊

17/08/25 15:07:24 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/10.194.67.6:55555]
17/08/25 15:16:43 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65 21 21 21 21 21 Hello Flume!!!!! }
17/08/25 15:20:02 INFO sink.LoggerSink: Event: { headers:{} body: 49 20 61 6D 20 62 61 64 20 62 6F 79 2E 2E 2E 0D I am bad boy.... }
17/08/25 15:20:40 INFO sink.LoggerSink: Event: { headers:{} body: E6 88 98 E7 8B BC 32 E5 BE 88 E5 A5 BD E7 9C 8B ......2......... }

上述就是這個簡單案例的全部過程。

5、配置檔案簡單解說

a1.sources = r1
a1.sinks = k1
a1.channels = c1 

用程式設計思路來解釋上面三行程式碼就相當與定義了 sources、sinks、channels 對應的變數,方便下面對它們的引用。其中 a1為agent 名,可隨意命令,但要注意在啟動時要與之對應,eg: flume-ng agent -n a1 …

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 10.135.21.3
a1.sources.r1.port = 55555

設定了r1(即sources) 接收的資源的型別為netcat, 監聽地址為10.135.21.3 ,埠為55555

# Describe the sink
a1.sinks.k1.type = logger

指定了k1(sinks) 輸出日誌的形式

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

設定了c1 (channel) 的快取機制,memory 將日誌快取在記憶體,
capacity:預設該通道中最大的可以儲存的event數量是1000,
trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面程式碼就很明瞭,將sources 與 sinks 繫結 channel