1. 程式人生 > >大資料之 Flume 日誌收集框架入門

大資料之 Flume 日誌收集框架入門

                                             Flume 日誌收集框架入門 

 一、 flume 簡介

Apache Flume是一個分散式,可靠且可用的系統,用於有效地從許多不同的源收集,聚合和移動大量日誌資料到集中式資料儲存。

Apache Flume的使用不僅限於日誌資料聚合。由於資料來源是可定製的,因此Flume可用於傳輸大量事件資料,包括但不限於網路流量資料,社交媒體生成的資料,電子郵件訊息以及幾乎任何可能的資料來源。

Apache Flume是Apache Software Foundation的頂級專案。

目前有兩種版本程式碼行,版本0.9.x和1.x.

系統要求:

  1. Java執行時環境 - Java 1.8或更高版本
  2. 記憶體 - 源,通道或接收器使用的配置的足夠記憶體
  3. 磁碟空間 - 通道或接收器使用的配置的足夠磁碟空間
  4. 目錄許可權 - 代理使用的目錄的讀/寫許可權

資料流模型

Flume事件被定義為具有位元組有效負載和可選字串屬性集的資料流單元。Flume代理是一個(JVM)程序,它承載事件從外部源流向下一個目標(躍點)的元件。

代理元件圖

Flume源消耗由外部源(如Web伺服器)傳遞給它的事件。外部源以目標Flume源識別的格式向Flume傳送事件。例如,Avro Flume源可用於從Avro客戶端或從Avro接收器傳送事件的流中的其他Flume代理接收Avro事件。可以使用Thrift Flume Source定義類似的流程,以接收來自Thrift Sink或Flume Thrift Rpc客戶端或Thrift客戶端的事件,這些客戶端使用Flume thrift協議生成的任何語言編寫。當Flume源接收事件時,它將其儲存到一個或多個頻道。該通道是一個被動儲存器,可以保持事件直到它被Flume接收器消耗。檔案通道就是一個例子 - 它由本地檔案系統支援。接收器從通道中移除事件並將其放入外部儲存庫(如HDFS(通過Flume HDFS接收器))或將其轉發到流中下一個Flume代理(下一跳)的Flume源。給定代理程式中的源和接收器與通道中暫存的事件非同步執行。

 

二、flume 下載 安裝 ,配置環境變數

cdh 版本下載地址 http://archive.cloudera.com/cdh5/cdh/5/

解壓  tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar

配置環境變數:

  • 事先配置好jdk1.8以上的JAVA_HOME
  • 配置FLUME_HOME
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=${FLUME_HOME}/bin:$PATH 

修改 配置 檔案:

 cd apache-flume-1.6.0-cdh5.7.0-bin/conf/

cp flume-env.sh.template flume-env.sh

vim flume-env.sh

新增 一行 

export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64

 執行 flume-ng version 命令 可以在控制檯上看到版本號輸出說明安裝成功。

三、flume 實戰(1)從指定的網路埠採集資料輸出到控制檯

使用 flume 的關鍵就是寫配置檔案

(a)配置source

(b)配置channel

(c)配置sink

(d)把以上三個元件串起來

類似於netcat的源,它偵聽給定埠並將每行文字轉換為事件。像nc -k -l [host] [port]這樣的行為。換句話說,它開啟一個指定的埠並偵聽資料。期望提供的資料是換行符分隔的文字。每行文字都轉換為Flume事件,並通過連線的通道傳送。

示例配置檔案  agent 的名稱為a1 ,sources 的名稱為 r1 ,sinks 的名稱為k1,channels 的名稱為 c1

編寫 配置 example.conf 檔案,放到 $FLUME_HOME/conf 目錄下:

#example.conf:單節點Flume配置

#為此代理命名元件
a1.sources =  r1 
a1.sinks  =  k1 
a1.channels  =  c1

#描述/配置源
a1.sources.r1.type  =  netcat 
a1.sources.r1.bind  =  localhost 
a1.sources.r1.port  =  44444

#描述接收器
a1.sinks.k1.type  =  logger

#使用緩衝記憶體中事件的通道
a1.channels.c1.type  =  memory 
a1.channels.c1.capacity  =  1000 
a1.channels.c1.transactionCapacity  =  100

#將源和接收器繫結到通道
a1.sources.r1.channels  =  c1 
a1.sinks.k1.channel  =  c1

啟動 一個 agent

  flume-ng  agent --name a1 --conf  $FLUME_HOME/conf --conf-file  $FLUME_HOME/conf/example.conf  -Dflume.root.logger

=INFO,console

使用 telnet 命名進行測試 telnet  hadoop000 44444

輸入 任意字串 ,可以看到 flume 的控制檯接收到了我們 輸入的 內容如下:

 [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello world.}

Event 是Flume 資料傳輸的基本單元

四 、 fluem 實戰(2)監控一個檔案實時採集新增的資料輸出到控制檯

在/home/hadoop/目錄 下 建立一個 hello.txt檔案 ,並向其中輸入內容 

編寫配置檔案 

#example.conf:單節點Flume配置

#為此代理命名元件
a1.sources =  r1 
a1.sinks  =  k1 
a1.channels  =  c1

#描述/配置源
a1.sources.r1.type  =  exec
a1.sources.r1.command = tail -F /home/hadoop/hello.txt
a1.sources.r1.shell = /bin/bash -c


#描述接收器
a1.sinks.k1.type  =  logger

#使用緩衝記憶體中事件的通道
a1.channels.c1.type  =  memory 
a1.channels.c1.capacity  =  1000 
a1.channels.c1.transactionCapacity  =  100

#將源和接收器繫結到通道
a1.sources.r1.channels  =  c1 
a1.sinks.k1.channel  =  c1

啟動 agent:

flume-ng  agent --name a1 --conf  $FLUME_HOME/conf --conf-file  $FLUME_HOME/conf/example.conf  -Dflume.root.logger

=INFO,console

執行 touch  hello world >> hello.txt 進行測試,看到 flume 控制檯接收到 資料即可。

2018-12-25 18:28:21,386 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

五、flume實戰(3)將A伺服器上面的日誌採集到B伺服器上

 

技術選型:exec  source  + memory channel +avro sink

                  avro  source + memory channel + logger sink

編寫 配置  exec-memory-avro.conf



#為此代理命名元件
exec-memory-avro.sources =  exec-source
exec-memory-avro.sinks  =  avro-sink 
exec-memory-avro.channels  =  memory-channel

#描述/配置源
exec-memory-avro.sources.exec-source.type  =  exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/hello.txt
exec-memory-avro.sources.exec-source.shell = /bin/bash -c


#描述接收器
exec-memory-avro.sinks.avro-sink.type  =  avro
exec-memory-avro.sinks.avro-sink.hostname  =  hadoop000
exec-memory-avro.sinks.avro-sink.port  =  44444

#使用緩衝記憶體中事件的通道
exec-memory-avro.channels.memory-channel.type  =  memory 
exec-memory-avro.channels.memory-channel.capacity  =  1000 
exec-memory-avro.channels.memory-channel.transactionCapacity  =  100

#將源和接收器繫結到通道
exec-memory-avro.sources.exec-source.channels  =  memory-channel
exec-memory-avro.sinks.avro-sink.channel  =  memory-channel

編寫 配置  avro-memory-logger.conf



#為此代理命名元件
 avro-memory-logger.sources =  avro-source
 avro-memory-logger.sinks  =  logger-sink 
 avro-memory-logger.channels  =  memory-channel

#描述/配置源
 avro-memory-logger.sources.avro-source.type  =  avro
 avro-memory-logger.sources.avro-source.bind = hadoop000
 avro-memory-logger.sources.avro-source.port = 44444


#描述接收器
avro-memory-logger.sinks.logger-sink.type  =  logger

#使用緩衝記憶體中事件的通道
avro-memory-logger.channels.memory-channel.type  =  memory 
avro-memory-logger.channels.memory-channel.capacity  =  1000 
avro-memory-logger.channels.memory-channel.transactionCapacity  =  100

#將源和接收器繫結到通道
avro-memory-logger.sources.avro-source.channels  =  memory-channel
avro-memory-logger.sinks.logger-sink.channel  =  memory-channel

日誌收集過程:

  •  機器上A 上監控一個檔案,當我們訪問主站時候會有使用者行為日誌記錄到 access.log中
  • avro sink把新產生的日誌輸出到對應的avro source 指定的hostname  和 port 上
  • 通過avro  source 對應的 agent 將我們的 日誌輸出到控制檯或者(kafka)

先啟動 B 伺服器 agent

flume-ng  agent --name avro-memory-logger --conf  $FLUME_HOME/conf --conf-file  $FLUME_HOME/conf/avro-memory-logger.conf  -Dflume.root.logger=INFO,console

在啟動A伺服器 agent

flume-ng  agent --name exec-memory-avro  --conf  $FLUME_HOME/conf --conf-file  $FLUME_HOME/conf/exec-memory-avro.conf  -Dflume.root.logger=INFO,console

向 /home/hadoop/hello.txt 檔案中輸出內容, 即可 看到 B 伺服器 flume 控制檯 上接收到A 伺服器上面 sink 過來的內容。