1. 程式人生 > >flume介紹及應用

flume介紹及應用

rec 裏的 tps 說明 行數據 tin oca command 定制

版權聲明:本文為yunshuxueyuan原創文章。
如需轉載請標明出處: http://www.cnblogs.com/sxt-zkys/
QQ技術交流群:299142667

flume的概念

1. flume 作為 cloudera 開發的實時日誌收集系統,受到了業界的認可與廣泛應用。Flume 初始的發行版本目前被統稱為 Flume OG(original generation),屬於 cloudera。但隨著 FLume 功能的擴展,Flume OG 代碼工程臃腫、核心組件設計不合理、核心配置不標準等缺點暴露出來,尤其是在 Flume OG 的最後一個發行版本 0.94.0 中,日誌傳輸不穩定的現象尤為嚴重,為了解決這些問題,2011 年 10 月 22 號,cloudera 完成了 Flume-728,對 Flume 進行了裏程碑式的改動:重構核心組件、核心配置以及代碼架構,重構後的版本統稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。

2. flume的特點:

flume是一個分布式、可靠、和高可用的海量日誌采集、聚合和傳輸的系統。支持在日誌系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(比如文本、HDFS、Hbase等)的能力 。

  flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位,它攜帶日誌數據(字節數組形式)並且攜帶有頭信息,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另一個Source。

3. flume的可靠性

當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據agent首先將event寫到磁盤上,當數據傳送成功後,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復後,繼續發送),Besteffort(數據發送到接收方後,不會進行確認)。

4. flume的可恢復性

還是靠Channel。推薦使用FileChannel,事件持久化在本地文件系統裏(性能較差)。

5. flume的一些核心概念

Agent:使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個 agent中

包含多個sources和sinks。

Client:生產數據,運行在一個獨立的線程。

Source:從Client收集數據,傳遞給Channel。

Sink:從Channel收集數據,運行在一個獨立線程。

Channel:連接 sources 和 sinks ,這個有點像一個隊列。

Events:可以是日誌記錄、 avro 對象等。

event的概念

介紹一下flume中event的相關概念:flume的核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。為了保證輸送的過程一定成功,在送到目的地(sink)之前,會先緩存數據(channel),待數據真正到達目的地(sink)後,flume在刪除自己緩存的數據。

在整個數據的傳輸的過程中,流動的是event,即事務保證是在event級別進行的。那麽什麽是event呢?—–event將傳輸的數據進行封裝,是flume傳輸數據的基本單位,如果是文本文件,通常是一行記錄,event也是事務的基本單位。event從source,流向channel,再到sink,本身為一個字節數組,並可攜帶headers(頭信息)信息。event代表著一個數據的最小完整單元,從外部數據源來,向外部的目的地去。

為了方便大家理解,給出一張event的數據流向圖:

技術分享

flume架構

技術分享

flume之所以這麽神奇,是源於它自身的一個設計,這個設計就是agent,agent本身是一個Java進程,運行在日誌收集節點—所謂日誌收集節點就是服務器節點。

agent裏面包含3個核心的組件:source—->channel—–>sink,類似生產者、倉庫、消費者的架構。

source:source組件是專門用來收集數據的,可以處理各種類型、各種格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。

channel:source組件把數據收集來以後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對采集到的數據進行簡單的緩存,可以存放在memory、jdbc、file等等。

sink:sink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、kafaka、自定義。

flume source

Source類型:

Avro Source: 支持Avro協議(實際上是Avro RPC),內置支持

Thrift Source: 支持Thrift協議,內置支持

Exec Source: 基於Unix的command在標準輸出上生產數據

JMS Source: 從JMS系統(消息、主題)中讀取數據

Spooling Directory Source: 監控指定目錄內數據變更

Twitter 1% firehose Source: 通過API持續下載Twitter數據,試驗性質

Netcat Source: 監控某個端口,將流經端口的每一個文本行數據作為Event輸入

Sequence Generator Source: 序列生成器數據源,生產序列數據

Syslog Sources: 讀取syslog數據,產生Event,支持UDP和TCP兩種協議

HTTP Source: 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式

Legacy Sources: 兼容老的Flume OG中Source(0.9.x版本)

flume channel

Channel類型:

Memory Channel:Event數據存儲在內存中

JDBC Channel:Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby

File Channel:Event數據存儲在磁盤文件中

Spillable Memory Channel:Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持

久化到磁盤文件

Pseudo Transaction Channel:測試用途

Custom Channel:自定義Channel實現

flume sink

Sink類型 說明

HDFS Sink:數據寫入HDFS

Logger Sink:數據寫入日誌文件

Avro Sink:數據被轉換成Avro Event,然後發送到配置的RPC端口上

Thrift Sink:數據被轉換成Thrift Event,然後發送到配置的RPC端口上

IRC Sink:數據在IRC上進行回放

File Roll Sink:存儲數據到本地文件系統

Null Sink:丟棄到所有數據

HBase Sink:數據寫入HBase數據庫

Morphline Solr Sink:數據發送到Solr搜索服務器(集群)

ElasticSearch Sink:數據發送到Elastic Search搜索服務器(集群)

Kite Dataset Sink:寫數據到Kite Dataset,試驗性質的

Custom Sink:自定義Sink實現

flume運行機制

flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據的輸入——source,一個是數據的輸出sink,sink負責將數據發送到外部指定的目的地。source接收到數據之後,將數據發送給channel,chanel作為一個數據緩沖區會臨時存放這些數據,隨後sink會將channel中的數據發送到指定的地方—-例如HDFS等,註意:只有在sink將channel中的數據成功發送出去之後,channel才會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。

flume的廣義用法

flume之所以這麽神奇—-其原因也在於flume可以支持多級flume的agent,即flume可以前後相繼,例如sink可以將數據寫到下一個agent的source中,這樣的話就可以連成串了,可以整體處理了。flume還支持扇入(fan-in)、扇出(fan-out)。所謂扇入就是source可以接受多個輸入,所謂扇出就是sink可以將數據輸出多個目的地destination中。

技術分享

flume安裝

1. 下載源碼包,上傳到集群的節點:

2. 解壓到指定目錄

3. 修改conf/flume.env.sh:

技術分享

註意:JAVA_OPTS 配置 如果我們傳輸文件過大 報內存溢出時 需要修改這個配置項

4. 配置環境變量

技術分享

刷新profile文件:source /etc/profile

5. 驗證安裝是否成功

技術分享

flume應用

案例1

http://flume.apache.org/FlumeUserGuide.html#a-simple-example

配置文件simple.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
技術分享技術分享

啟動flume

flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

安裝telnet

yum install telnet

Memory Chanel 配置

capacity:默認該通道中最大的可以存儲的event數量是100,

trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100

keep-alive:event添加到通道中或者移出的允許時間

byte**:即event的字節量的限制,只包括eventbody

案例2、兩個flume做集群

node01服務器中,配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444



# Describe the sink

# a1.sinks.k1.type = logger

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node2

a1.sinks.k1.port = 60000



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
技術分享技術分享

node02服務器中,安裝Flume(步驟略)

配置文件

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.bind = node2

a1.sources.r1.port = 60000



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
技術分享技術分享

先啟動node02的Flume

flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console

再啟動node01的Flume

flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console

打開telnet 測試 node02控制臺輸出結果

案例3、Exec Source

http://flume.apache.org/FlumeUserGuide.html#exec-source

配置文件

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/flume.exec.log



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
技術分享技術分享

啟動Flume

flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console

創建空文件演示 touch flume.exec.log

循環添加數據

for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done

案例4、Spooling Directory Source

http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

配置文件

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/logs

a1.sources.r1.fileHeader = true



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
技術分享技術分享

啟動Flume

flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

拷貝文件演示

mkdir logs

cp flume.exec.log logs/

案例5、hdfs sink

http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

配置文件

############################################################

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/logs

a1.sources.r1.fileHeader = true

# Describe the sink

***只修改上一個spool sink的配置代碼塊 a1.sinks.k1.type = logger

a1.sinks.k1.type=hdfs

a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M

##每隔60s或者文件大小超過10M的時候產生新文件

# hdfs有多少條消息時新建文件,0不基於消息個數

a1.sinks.k1.hdfs.rollCount=0

# hdfs創建多長時間新建文件,0不基於時間

a1.sinks.k1.hdfs.rollInterval=60

# hdfs多大時新建文件,0不基於文件大小

a1.sinks.k1.hdfs.rollSize=10240

# 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件

a1.sinks.k1.hdfs.idleTimeout=3

a1.sinks.k1.hdfs.fileType=DataStream

a1.sinks.k1.hdfs.useLocalTimeStamp=true

## 每五分鐘生成一個目錄:

# 是否啟用時間上的”舍棄”,這裏的”舍棄”,類似於”四舍五入”,後面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式

a1.sinks.k1.hdfs.round=true

# 時間上進行“舍棄”的值;

a1.sinks.k1.hdfs.roundValue=5

# 時間上進行”舍棄”的單位,包含:second,minute,hour

a1.sinks.k1.hdfs.roundUnit=minute

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

############################################################

創建HDFS目錄

hadoop fs -mkdir /flume

啟動Flume

flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

查看hdfs文件

hadoop fs -ls /flume/...

hadoop fs -get /flume/...

版權聲明:本文為yunshuxueyuan原創文章。
如需轉載請標明出處:http://www.cnblogs.com/sxt-zkys/
QQ技術交流群:299142667

flume介紹及應用