1. 程式人生 > >flume 一級配置和多級配置詳解

flume 一級配置和多級配置詳解

    個人不太喜歡過多的介紹理論,理論在任何一個部落格中都能查到,如果下個瞭解flume 的工作原理,請到別處尋找,如果子昂要找到方案的解決辦法,恭喜你找對了。同時本人不喜歡專門排版,太浪費時間,還不如利用時間,多研究一下乾貨。望諒解。

    在實際應用中,主要多級flume搭建比較常用,在此僅僅以多級 flume 為例,進行配置和研究。

1 Flume的安裝

1.1 安裝JDK

具體方法略。網上很多,另覓他處

1.2 下載安裝包並解壓

$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.1.tar.gz

$ tar -xvf flume-ng-1.6.0-cdh5.7.1.tar.gz

$ rm flume-ng-1.6.0-cdh5.7.1.tar.gz

$ mv apache-flume-1.6.0-cdh5.7.1-binflume-1.6.0-cdh5.7.1

1.3 配置環境變數

$ cd /home/hadoop

配置環境變數

$ vim .bash_profile

export FLUME_HOME=/home/hadoop/app/cdh/flume-1.6.0-cdh5.7.1

export PATH=$PATH:$FLUME_HOME/bin

使環境變數生效

$ source .bash_profile

1.4 配置flume-env.sh檔案

$ cd app/cdh/flume-1.6.0-cdh5.7.1/conf/

$ cp flume-env.sh.template flume-env.sh

$ vim flume-env.sh

export JAVA_HOME=/home/hadoop/app/jdk1.7.0_79

export HADOOP_HOME=/home/hadoop/app/cdh/hadoop-2.6.0-cdh5.7.1

1.5 版本驗證

$ flume-ng version

[[email protected] conf]# flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f

恭喜你,安裝成功,接下來主要配置

2 多級 flume 配置的資訊


2.1  一級 flume ,從  網路埠 或者 檔案中獲取獲取日誌

在 /apache-flume-1.6.0-bin/conf 目錄下建立,netcat-logger.conf,檔案中的內容如下

[[email protected] conf]# more netcat-logger.conf

n/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
########
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   從檔案中獲取日誌
# get log from file
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F /opt/123.txt
#a1.sources.r1.channels = c1

#get log from network port   從網路埠中獲取日誌
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2 一級 flume 配置多個 source ,共同利用一個 channel 和一個 sink  將資料寫入  hdfs 中

########
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# get log from file
#a1.sources.r1.type = exec
#a1.sources.r1.command = tail -F /opt/123.txt
#a1.sources.r1.channels = c1

#get log from network port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r2.type = netcat
a1.sources.r2.bind = localhost
a1.sources.r2.port = 44445

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = master

a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c1

2.3 二級 flume 配置。讀取到檔案的內容之後,將檔案的內容儲存到 hdfs  或者控制檯


#定義三大元件的名稱
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# 配置source元件
agent1.sources.source1.type = avro
agent1.sources.source1.channels = channel1
agent1.sources.source1.bind = 0.0.0.0
agent1.sources.source1.port = 4141
#agent1.sources.source1.spoolDir = /home/hadoop/logs/
#agent1.sources.source1.fileHeader = false

# 配置sink元件
# 將日誌列印到控制檯
#agent1.sinks.sink1.type = logger


# 將檔案儲存到 hdfs 中國
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
# 檔案的路徑  檔案下沉目標
#agent1.sinks.sink1.hdfs.path =hdfs://master:8022/flume/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.path =hdfs://master:8022/flume/dt=%y-%m-%d
# 檔名的字首
#agent1.sinks.sink1.hdfs.filePrefix = access_log
#agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
# 5 個事件就往裡面寫入
agent1.sinks.sink1.hdfs.batchSize= 100


#下沉後, 生成的檔案型別,預設是Sequencefile,可用DataStream,則為普通文字  
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#檔案滾動大的大小限制(bytes)
agent1.sinks.sink1.hdfs.rollSize = 1024
#寫入多少個 event 資料後滾動檔案(事件個數)
agent1.sinks.sink1.hdfs.rollCount = 100
#檔案回滾之前等待的時間
agent1.sinks.sink1.hdfs.rollInterval = 60

# 10 分鐘就建立檔案
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
#用本地時間格式化目錄
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

啟動 flume 命令:

flume-ng agent --conf conf --conf-file avro-file_roll.conf --name agent1

3 常見的  source  型別

3.1 : avro  source

avro可以監聽和收集指定埠的日誌,使用avro的source需要說明被監聽的主機ip和埠號,下面給出一個具體的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

3.2:  exec source

exec可以通過指定的操作對日誌進行讀取,使用exec時需要指定shell命令,對日誌進行讀取,下面給出一個具體的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

3.3: spooling-directory source

spo_dir可以讀取資料夾裡的日誌,使用時指定一個資料夾,可以讀取該資料夾中的所有檔案,需要注意的是該資料夾中的檔案在讀取過程中不能修改,同時檔名也不能修改。下面給出一個具體的例子:

agent-1.channels = ch-1

agent-1.sources = src-1

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true

3.4: syslog source

syslog可以通過syslog協議讀取系統日誌,分為tcp和udp兩種,使用時需指定ip和埠,下面給出一個udp的例子:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

4 常見的 channel 型別


Flume的channel種類並不多,最常用的是memory channel,下面給出例子:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

5  常見的  sink  型別



5.1 logger sink

logger顧名思義,就是將收集到的日誌寫到flume的log中,是個十分簡單但非常實用的sink
簡單實用,但是在實際專案中沒有太大用處。
只需將型別指定為 logger 就可以了
agent1.sinks.sink1.type = logger

5.2: avro sink

avro可以將接受到的日誌傳送到指定埠,供級聯agent的下一跳收集和接受日誌,使用時需要指定目的ip和埠:例子如下:
上面我給出的例子就是用的是這種方法

a1.channels = c1
a1.sinks = k1

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10(目標主機的IP 和 埠號)
a1.sinks.k1.port = 4545

5.3: file roll sink

file_roll可以將一定時間內收集到的日誌寫到一個指定的檔案中,具體過程為使用者指定一個資料夾和一個週期,然後啟動agent,這時該資料夾會產生一個檔案將該週期內收集到的日誌全部寫進該檔案內,直到下一個週期再次產生一個新檔案繼續寫入,以此類推,周而復始。下面給出一個具體的例子:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

5.4:hdfs sink

hdfs與file roll有些類似,都是將收集到的日誌寫入到新建立的檔案中儲存起來,但區別是file roll的檔案儲存路徑為系統的本地路徑,而hdfs的儲存路徑為分散式的檔案系統hdfs的路徑,同時hdfs建立新檔案的週期可以是時間,也可以是檔案的大小,還可以是採集日誌的條數。具體例項如下:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

這裡主要對 sink 配置的引數進行詳解

有哪個引數不明白,都可以在此查詢到

agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d

hdfs 檔名字首
agent_lxw1234.sinks.sink1.hdfs.filePrefix = log_%Y%m%d_%H

寫入的 hdfs 檔名字尾
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .lzo


臨時檔名字首
inUsePrefix


預設值:.tmp     臨時檔案的檔名字尾
inUseSuffix

agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.fileType = CompressedStream
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0

預設值:1024
當臨時檔案達到該大小(單位:bytes)時,滾動成目標檔案;
如果設定成0,則表示不根據臨時檔案大小來滾動檔案;
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0


預設值:10
當events資料達到該數量時候,將臨時檔案滾動成目標檔案;
如果設定成0,則表示不根據events資料來滾動檔案;
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600


檔案壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy
agent_lxw1234.sinks.sink1.hdfs.codeC = lzop


預設值:100
每個批次重新整理到HDFS上的events數量;
agent_lxw1234.sinks.sink1.hdfs.batchSize = 100

agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10

fileType

預設值:SequenceFile
檔案格式,包括:SequenceFile, DataStream,CompressedStream
當使用DataStream時候,檔案不會被壓縮,不需要設定hdfs.codeC;
當使用CompressedStream時候,必須設定一個正確的hdfs.codeC值;



maxOpenFiles
預設值:5000
最大允許開啟的HDFS檔案數,當開啟的檔案數達到該值,最早開啟的檔案將會被關閉;




預設值:0
當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,
則將該臨時檔案關閉並重命名成目標檔案;
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0


writeFormat
寫sequence檔案的格式。包含:Text, Writable(預設)


callTimeout
預設值:10000
執行HDFS操作的超時時間(單位:毫秒);


threadsPoolSize
預設值:10
hdfs sink啟動的操作HDFS的執行緒數。


rollTimerPoolSize
預設值:1
hdfs sink啟動的根據時間滾動檔案的執行緒數




kerberosPrincipal
HDFS安全認證kerberos配置;


kerberosKeytab
HDFS安全認證kerberos配置;


proxyUser
代理使用者


round
預設值:false
是否啟用時間上的”捨棄”,這裡的”捨棄”,類似於”四捨五入”,後面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式;


roundValue
預設值:1
時間上進行“捨棄”的值;


roundUnit
預設值:seconds
時間上進行”捨棄”的單位,包含:second,minute,hour
示例:
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

當時間為2015-10-16 17:38:59時候,hdfs.path依然會被解析為:
/flume/events/20151016/17:30/00
因為設定的是捨棄10分鐘內的時間,因此,該目錄每10分鐘新生成一個。

timeZone
預設值:Local Time
時區。

useLocalTimeStamp
預設值:flase
是否使用當地時間。

closeTries
預設值:0
hdfs sink關閉檔案的嘗試次數;
如果設定為1,當一次關閉檔案失敗後,hdfs sink將不會再次嘗試關閉檔案,這個未關閉的檔案將會一直留在那,並且是開啟狀態。
設定為0,當一次關閉失敗後,hdfs sink會繼續嘗試下一次關閉,直到成功。


retryInterval
預設值:180(秒)
hdfs sink嘗試關閉檔案的時間間隔,如果設定為0,表示不嘗試,相當於於將hdfs.closeTries設定成1.

serializer
預設值:TEXT
序列化型別。其他還有:avro_event或者是實現了EventSerializer.Builder的類名。


5.5:hbase sink

hbase是一種資料庫,可以儲存日誌,使用時需要指定儲存日誌的表名和列族名,然後agent就可以將收集到的日誌逐條插入到資料庫中。例子如下:

a1.channels = c1
a1.sinks = k1

a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1










相關推薦

flume 一級配置多級配置

    個人不太喜歡過多的介紹理論,理論在任何一個部落格中都能查到,如果下個瞭解flume 的工作原理,請到別處尋找,如果子昂要找到方案的解決辦法,恭喜你找對了。同時本人不喜歡專門排版,太浪費時間,還不如利用時間,多研究一下乾貨。望諒解。    在實際應用中,主要多級flum

五.linux開發之uboot移植(五)——uboot配置編譯過程

一.uboot主Makefile分析1 1、uboot version確定(Makefile的24-29行) Makefile程式碼部分 (1)uboo

Linux核心配置編譯過程

一、引言:  本文件的內容大部份內容都是從網上收集而來,然後配合一些新的截 圖(核心版本:V2.4.19)。在每一配置項後會有一個選擇指南的部份,用來指導大家怎麼樣 根據自己的情況來做相應的選擇;還有在每一個大項和文件的最後會有一個經驗談,它是一些高手們在應對問題和處理特有

linux kafka叢集配置測試圖文

一.前期準備 1.1 Win7官網下載kafka包 本文使用版本kafka_2.10-0.10.1.0.tgz 1.2 配置jdk、scala、zookeeper jdk,scala,

Redis 主從配置參數

value ppi threads shu 如何 指令 支持 aof pac 安裝redis 下載redis wget http://download.redis.io/releases/redis-3.0.7.tar.gz 解壓redis tar -x

Spring aop 配置註解及

Spring aopaop:(Aspect Oriented Programming)面向切面程式設計,縱向重複,橫向抽取 代理: 生活中的例項:找明星拍戲,上綜藝。拍戲   直接找明星,說明明星太小。如果明星有點名氣,那就不能直接訪問了,必須先通過先訪問明星的經紀人,然後由

hibernate教程--常用配置核心API

一、Hibernate的常用的配置及核心API. 1.1 Hibernate的常見配置: 1.1.1、核心配置: 核心配置有兩種方式進行配置:  1)屬性檔案的配置: * hibernate.properties * 格式: * key=value  hibernate.c

java架構之路-(分散式zookeeper)zookeeper叢集配置選舉機制

  上次部落格我們說了一下zookeeper的配置檔案,以及命令的使用https://www.cnblogs.com/cxiaocai/p/11597465.html。我們這次來說一下我們的zookeeper的叢集配置和java的API相關操作。 叢集:   一般情況下我們用zookeeper來做任務排程中心

MyBatis一級快取二級快取

一級快取   Mybatis對快取提供支援,但是在沒有配置的預設情況下,它只開啟一級快取,一級快取只是相對於同一個SqlSession而言。所以在引數和SQL完全一樣的情況下,我們使用同一個SqlSession物件呼叫一個Mapper方法,往往只執行一次SQL,因為使用SelSession第一次

Mybatis的一級快取二級快取

注:本筆記是根據尚矽谷的MyBatis視訊記錄的 對於任何一個持久層框架,都有快取機制;快取在電腦中有一塊真實的儲存空間(https://baike.baidu.com/item/%E7%BC%93%E5%AD%98/100710?fr=aladdin); 兩個關於mybatis快取額外的連

Hibernate一級快取二級快取

一、一級快取二級快取的概念解釋 (1)一級快取就是Session級別的快取,一個Session做了一個查詢操作,它會把這個操作的結果放在一級快取中,如果短時間內這個 session(一定要同一個se

SpringMVCSpring的配置檔案掃描包

其實Spring和SpringMVC是有父子容器關係的,而且正是因為這個才往往會出現包掃描的問題,我們在此來分析和理解Spring和SpringMVC的父子容器關係並且給出Spring和SpringMVC配置檔案中包掃描的官方推薦方式。   在Spring整體框架的核

Linux系統中定時任務croncrontab命令配置規則說明

cron機制         cron可以讓系統在指定的時間,去執行某個指定的工作,我們可以使用crontab指令來管理cron機制 crontab引數         -u:這個引數可以讓我們去編輯其他人的crontab,如果沒有加上這個引數的話就會開啟自己的crontab

redis cluster配置檔案叢集狀態

redis cluster命令 叢集(cluster)   cluster info       列印叢集的資訊 cluster nodes   列出叢集當前已知的所有節點(node),以及這些節點的相關資訊   節點(node)   cluster meet <ip

springboot2.0日誌配置 logback的使用logback.xml

只根據上面的我們會發現,日誌要麼輸出到檔案。要麼輸出到控制檯不能有選擇的輸出滿足我們的要求,這是需要看下面這個文章, 這兩篇文章結合,基本滿足我們實際應用的需要。 為了使我們的日誌更加利於觀看,我們可以把日誌做成html格式的, <?xml v

ssh整合下的配置檔案引數存放位置

本人初級程式設計師,今天無聊做了個demo,此demo是SSH框架。由於太久沒有搭建,前前後後也出現很多瑣碎的問題,特地總結配置檔案的存放和配置。 專案目錄如圖: . hibernate.cfg.xml和struts.xml放在src/main/resources目錄下

在Linux平臺上安裝配置Ruby on Rails

ruby on rails推薦的生產執行環境是Linux/FreeBSD/Unix,即Unix系列的作業系統,採用lighttpd+FCGI的解決方案。以下我將以Linux作業系統,lighttpd+FCGI,MySQL資料庫為例,從原始碼編譯安裝開始講解。 在安裝之前,應

[Struts2] 配置檔案struts.xmlweb.xml

 配置檔案struts.xml和web.xml。 其實要使Struts2可以工作,配置很簡單,套模板就好了。而且基本與版本無關。 如,只要struts2需要的基本jar包已經引入,那麼直接配置一下web.xml和struts.xml檔案。 web.xml中包含: <

[Hibernate]七種關聯關係配置檔案測試例項

用了一天整理下來。所有關係分為以下七種:單向【1-1】雙向【1-1】單向【1-N】雙向【1-N】單向【N-1】單向【N-N】雙向【N-N】1單向【1-1】基於外來鍵的單向【1-1】是【N-1】的特殊形式,要求【N】方唯一。基於外來鍵的單向1-1只需要在原有的<many-

Hibernate配置檔案核心配置檔案及其API

Hibernate對映配置檔案 對映配置檔名稱和位置沒有固定要求 對映配置檔案中,標籤name屬性值寫實體類相關內容 class標籤name屬性值實體類全路徑 table是你想要建立的資料表名