1. 程式人生 > >Flume的體系結構介紹以及Flume入門案例(往HDFS上傳資料)

Flume的體系結構介紹以及Flume入門案例(往HDFS上傳資料)



# Flume的體系結構

java有興趣的朋友可以加上面說的553175249這個群哦,一起學習,共同進步 .

# Flume介紹

Flume是Cloudera提供的一個高可用的高可靠的分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

# 系統功能

# 日誌收集

Flume最早是Cloudera提供的日誌收集系統,目前是Apache下的一個孵化專案,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料。

# 資料處理

Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力Flume提供了從console(控制檯)、RPC(Thrift-RPC)、text(檔案)、tail(UNIX tail)、syslog(syslog日誌系統,支援TCP和UDP等2種模式),exec(命令執行)等資料來源上收集資料的能力。

# 工作方式 (Flume-NG舊版本的概念,新版本已經丟棄)

Flume採用了多Master的方式。為了保證配置資料的一致性,Flume引入了ZooKeeper,用於儲存配置資料,ZooKeeper本身可保證配置資料的一致性和高可用,另外,在配置資料發生變化時,ZooKeeper可以通知FlumeMaster節點。Flume Master間使用gossip協議同步資料。

# Flume的設計目標(百度百科)

# 可靠性 

當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Besteffort(資料傳送到接收方後,不會進行確認)。

# 可擴充套件性(Flume-NG舊版本的概念,新版本已經丟棄)

Flume採用了三層架構,分別為agent,collector和storage,每一層均可以水平擴充套件。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。

# 可管理性(Flume-NG舊版本的概念,新版本已經丟棄)

所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置資料的一致性。使用者可以在master上檢視各個資料來源或者資料流執行情況,且可以對各個資料來源配置和動態載入。Flume提供了web和shell script command兩種形式對資料流進行管理。

# 功能可擴充套件性

使用者可以根據需要新增自己的agent,collector或者storage。此外,Flume自帶了很多元件,包括各種agent(file,syslog等),collector和storage(file,HDFS等)。(這裡看下面的Flume架構圖你就明白了)

# Flume架構

# Flume基礎架構,如下圖:

  

這是一個flume-ng 最簡單的圖。flume-ng 是由一個個agent組成的。一個agent就像一個細胞一樣。

# Flume的多agent架構,如下圖:

    

   上面是兩個agent連結在一起的,再看看更多的......

# Flume的合併(合作)架構,如下圖:


你是不是覺得這種設計是不是吊炸天了,可以隨意組合,跟搭積木一樣。跟Storm的設計思想是差不多的,何止吊炸天啊,簡直就是吊炸天 、、、

# Flume的多路複用架構,如下圖:

      

# agent的構造

每個agent裡都有三部分構成:source、channel和sink。

就相當於source接收資料,通過channel傳輸資料,sink把資料寫到下一端。這就完了,就這麼簡單。其中source有很多種可以選擇,channel有很多種可以選擇,sink也同樣有多種可以選擇,並且都支援自定義。餓靠!太靈活了。想怎麼玩就怎麼玩,這你妹的!

同時,如上上圖所示,agent還支援選擇器,就是一個source支援多個channel和多個sink,這樣就完成了資料的分發。

這就完了,flume-ng就這麼簡單........

從看到最後用,一天足可以搞定。剩下的就是怎麼組織你的agent的問題了。也就是搭積木的過程......

另外有一點需要強調的是,flume-ng提供了一種特殊的啟動方式(不同於agent),那就是client啟動。cilent是一個特殊的agent, 他的source是檔案,channel是記憶體,sink是arvo。實際上是為了方便大家用,直接來傳遞檔案的。具體可以看看官方使用手冊。

估計到這兒,應該對flume-ng有了解了吧 、、、on my god、、、


# Flume的安裝

# 下載 flume(使用wget下載)

[[email protected] flume]# wget -c -P /root http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz

# 安裝

[email protected] flume]# pwd

/usr/local/adsit/yting/apache/flume

[[email protected] flume]# ll

total 4

drwxr-xr-x 3 root root 4096 Jun 24 17:25mirrors.cnnic.cn

[[email protected] flume]# cpmirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz .

[[email protected] flume]# ll

total 25276

-rw-r--r-- 1 root root 25876246 Jun 24 17:27apache-flume-1.5.0-bin.tar.gz

drwxr-xr-x 3 root root     4096 Jun 24 17:25 mirrors.cnnic.cn

[[email protected] flume]# tar -zxvfapache-flume-1.5.0-bin.tar.gz

[[email protected] flume]# ll

total 25280

drwxr-xr-x 7 root root     4096 Jun 24 17:27 apache-flume-1.5.0-bin

-rw-r--r-- 1 root root 25876246 Jun 24 17:27apache-flume-1.5.0-bin.tar.gz

drwxr-xr-x 3 root root     4096 Jun 24 17:25 mirrors.cnnic.cn

[[email protected] flume]# rm -rfapache-flume-1.5.0-bin.tar.gz

[[email protected] flume]# rm -rf mirrors.cnnic.cn/

[[email protected] flume]# ll

total 4

drwxr-xr-x 7 root root 4096 Jun 24 17:27apache-flume-1.5.0-bin

[[email protected] flume]#

# 修改 flume-env.sh 配置檔案

[[email protected] conf]# pwd

/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf

[[email protected] conf]# ll

total 12

-rw-r--r-- 1 501 games 1661 Mar 29 06:15flume-conf.properties.template

-rw-r--r-- 1 501 games 1197 Mar 29 06:15 flume-env.sh.template

-rw-r--r-- 1 501 games 3063 Mar 29 06:15log4j.properties

[[email protected] conf]# cp flume-env.sh.templateflume-env.sh

[[email protected] conf]# vi flume-env.sh

# Enviroment variables can be sethere.

JAVA_HOME=/usr/local/adsit/yting/jdk/jdk1.7.0_60

# 修改 flume-site.xml 配置檔案(貌似沒有該步驟,貌似也可以修改,研究後再來弄吧!)

# 驗證 flume是否安裝成功

[[email protected] conf]# ../bin/flume-ng version

Flume 1.5.0

Source code repository:https://Git-wip-us.apache.org/repos/asf/flume.git

Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97

Compiled by hshreedharan on Wed May  7 14:49:18 PDT 2014

From source with checksuma01fe726e4380ba0c9f7a7d222db961f

出現這樣的資訊表示安裝成功了

# Flume 入門案例

# Flume監控指定目錄下的日誌資訊,並將日誌資訊上傳到HDFS中去

# 在conf目錄下新建example.conf配置檔案

新建檔案:在conf目錄下新建一個example.conf檔案(隨便起什麼名字),當然隨便哪裡都行

注意:檔名最好跟配置中的名字一樣,比如裡面的agent1最好跟外面的檔名一樣,見名知意

[[email protected] conf]# pwd

/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf

[[email protected] apache-flume-1.5.0-bin]# catconf/example.conf

# agent1 : yting first flume example

agent1.sources=source1

agent1.sinks=sink1

agent1.channels=channel1

# configure source1

agent1.sources.source1.type=spooldir

agent1.sources.source1.spoolDir=/usr/local/yting/flume/tdata/tdir1

agent1.sources.source1.channels=channel1

agent1.sources.source1.fileHeader = false

# configure sink1

agent1.sinks.sink1.type=hdfs

agent1.sinks.sink1.hdfs.path=hdfs://rs229:9000/yting/flumet

agent1.sinks.sink1.hdfs.fileType=DataStream

agent1.sinks.sink1.hdfs.writeFormat=TEXT

agent1.sinks.sink1.hdfs.rollInterval=4

agent1.sinks.sink1.channel=channel1

# configure channel1

agent1.channels.channel1.type=file

agent1.channels.channel1.checkpointDir=/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001

agent1.channels.channel1.dataDirs=/usr/local/yting/flume/datadirs/tddirs/example_agent1_001

注意:紅色字型部分自己修改成自己對應的目錄了

# 執行Flume 使用example.conf

#命令引數說明

-c conf 指定配置目錄為conf

-f conf/example.conf 指定配置檔案為conf/example.conf

-n agent1 指定agent名字為agent1,需要與example.conf中的一致(這裡不一致,可能會一直停在那裡,請參考筆記中後面的錯誤全集Flume部分,那裡介紹了錯誤的分析,原因,解決)

-Dflume.root.logger=INFO,console 指定DEBUF模式在console輸出INFO資訊

[[email protected]]# ./bin/flume-ng agent -c conf/ -f conf/example.conf-n agent1-Dflume.root.logger=INFO,console

-bash:./bin/flume-ng: No such file or directory

[[email protected]]# cd ..

[[email protected]]# ./bin/flume-ng agent -c conf/ -f conf/example.conf -nagent1 -Dflume.root.logger=INFO,console

Info: Sourcingenvironment configuration script/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf/flume-env.sh

Info: IncludingHadoop libraries found via(/usr/local/adsit/yting/apache/Hadoop/hadoop-2.2.0/bin/hadoop) for HDFS access

Info: Excluding/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jarfrom classpath

Info: Excluding/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jarfrom classpath

Info: IncludingHBASE libraries found via(/usr/local/adsit/yting/apache/hbase/hbase-0.96.2-hadoop2/bin/hbase) for HBASEaccess

Info: Excluding/usr/local/adsit/yting/apache/hbase/hbase-0.96.2-hadoop2/bin/../lib/slf4j-api-1.6.4.jarfrom classpath

Info: Excluding/usr/local/adsit/yting/apache/hbase/hbase-0.96.2-hadoop2/bin/../lib/slf4j-log4j12-1.6.4.jarfrom classpath

Info: Excluding/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jarfrom classpath

Info: Excluding/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jarfrom classpath

….capacity-scheduler/*.jar:/conf'-Djava.library.path=:/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0/lib:/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0//liborg.apache.flume.node.Application -f conf/example.conf -n agent1

2014-06-2510:37:45,763 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.Java:61)]Configuration provider starting

2014-06-2510:37:45,772 (conf-file-poller-0) [INFO -org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)]Reloading configuration file:conf/example.conf

2014-06-2510:37:45,781 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,783 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,783 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)]Added sinks: sink1 Agent: agent1

2014-06-2510:37:45,783 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,783 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,783 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,784 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)]Processing:sink1

2014-06-2510:37:45,809 (conf-file-poller-0) [INFO -org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)]Post-validation flume configuration contains configuration for agents: [agent1]

2014-06-2510:37:45,809 (conf-file-poller-0) [INFO -org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)]Creating channels

2014-06-2510:37:45,823 (conf-file-poller-0) [INFO -org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)]Creating instance of channel channel1 type file

2014-06-2510:37:45,828 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)]Created channel channel1

2014-06-2510:37:45,829 (conf-file-poller-0) [INFO -org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)]Creating instance of source source1, type spooldir

2014-06-2510:37:45,844 (conf-file-poller-0) [INFO -org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)]Creating instance of sink: sink1, type: hdfs

2014-06-2510:37:46,293 (conf-file-poller-0) [WARN -org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:62)]Unable to load native-hadoop library for your platform... using builtin-javaclasses where applicable

2014-06-2510:37:46,572 (conf-file-poller-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:555)]Hadoop Security enabled: false

2014-06-2510:37:46,576 (conf-file-poller-0) [INFO -org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)]Channel channel1 connected to [source1, sink1]

2014-06-2510:37:46,587 (conf-file-poller-0) [INFO -org.apache.flume.node.Application.startAllComponents(Application.java:138)]Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: {source:Spool Directory source source1: { spoolDir:/usr/local/yting/flume/tdata/tdir1 } }} sinkRunners:{sink1=SinkRunner: {policy:[email protected] counterGroup:{name:null counters:{} } }} channels:{channel1=FileChannel channel1 { dataDirs:[/usr/local/yting/flume/datadirs/tddirs/example_agent1_001] }} }

2014-06-2510:37:46,593 (conf-file-poller-0) [INFO -org.apache.flume.node.Application.startAllComponents(Application.java:145)]Starting Channel channel1

2014-06-2510:37:46,593 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.FileChannel.start(FileChannel.java:259)] StartingFileChannel channel1 { dataDirs:[/usr/local/yting/flume/datadirs/tddirs/example_agent1_001] }...

2014-06-2510:37:46,617 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.<init>(Log.java:328)] Encryption is notenabled

2014-06-2510:37:46,618 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.replay(Log.java:373)] Replay started

2014-06-2510:37:46,620 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.replay(Log.java:385)] Found NextFileID 0,from []

2014-06-2510:37:46,661 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.EventQueueBackingStoreFile.<init>(EventQueueBackingStoreFile.java:91)]Preallocated/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001/checkpoint to8008232 for capacity 1000000

2014-06-2510:37:46,663 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:53)]Starting up with/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001/checkpoint and/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001/checkpoint.meta

2014-06-2510:37:47,095 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.FlumeEventQueue.<init>(FlumeEventQueue.java:114)]QueueSet population inserting 0 took 0

2014-06-2510:37:47,100 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.replay(Log.java:423)] Last Checkpoint Wed Jun25 10:37:46 CST 2014, queue depth = 0

2014-06-2510:37:47,105 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.doReplay(Log.java:507)] Replaying logs withv2 replay logic

2014-06-2510:37:47,109 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:249)]Starting replay of []

2014-06-2510:37:47,109 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:346)]read: 0, put: 0, take: 0, rollback: 0, commit: 0, skip: 0, eventCount:0

2014-06-2510:37:47,110 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.FlumeEventQueue.replayComplete(FlumeEventQueue.java:407)]Search Count = 0, Search Time = 0, Copy Count = 0, Copy Time = 0

2014-06-2510:37:47,119 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.replay(Log.java:470)] Rolling/usr/local/yting/flume/datadirs/tddirs/example_agent1_001

2014-06-2510:37:47,120 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.roll(Log.java:932)] Roll start/usr/local/yting/flume/datadirs/tddirs/example_agent1_001

2014-06-2510:37:47,137 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize(DirectMemoryUtils.java:113)]Unable to get maxDirectMemory from VM: NoSuchMethodException:sun.misc.VM.maxDirectMemory(null)

2014-06-2510:37:47,140 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.allocate(DirectMemoryUtils.java:47)]Direct Memory Allocation:  Allocation =1048576, Allocated = 0, MaxDirectMemorySize = 18874368, Remaining = 18874368

2014-06-2510:37:47,195 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$Writer.<init>(LogFile.java:214)]Opened /usr/local/yting/flume/datadirs/tddirs/example_agent1_001/log-1

2014-06-2510:37:47,208 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.roll(Log.java:948)] Roll end

2014-06-2510:37:47,208 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:214)]Start checkpoint for/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001/checkpoint,elements to sync = 0

2014-06-2510:37:47,211 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:239)]Updating checkpoint metadata: logWriteOrderID: 1403663867120, queueSize: 0,queueHead: 0

2014-06-2510:37:47,235 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1005)] Updatedcheckpoint for file:/usr/local/yting/flume/datadirs/tddirs/example_agent1_001/log-1 position: 0logWriteOrderID: 1403663867120

2014-06-2510:37:47,235 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.channel.file.FileChannel.start(FileChannel.java:285)] QueueSize after replay: 0 [channel=channel1]

2014-06-2510:37:47,296 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]Monitored counter group for type: CHANNEL, name: channel1: Successfullyregistered new MBean.

2014-06-2510:37:47,296 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]Component type: CHANNEL, name: channel1 started

2014-06-2510:37:47,297 (conf-file-poller-0) [INFO -org.apache.flume.node.Application.startAllComponents(Application.java:173)] StartingSink sink1

2014-06-2510:37:47,297 (conf-file-poller-0) [INFO -org.apache.flume.node.Application.startAllComponents(Application.java:184)]Starting Source source1

2014-06-2510:37:47,298 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)]SpoolDirectorySource source starting with directory:/usr/local/yting/flume/tdata/tdir1

2014-06-2510:37:47,300 (lifecycleSupervisor-1-1) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]Monitored counter group for type: SINK, name: sink1: Successfully registerednew MBean.

2014-06-2510:37:47,300 (lifecycleSupervisor-1-1) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]Component type: SINK, name: sink1 started

2014-06-2510:37:47,330 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)]Monitored counter group for type: SOURCE, name: source1: Successfullyregistered new MBean.

2014-06-2510:37:47,330 (lifecycleSupervisor-1-0) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]Component type: SOURCE, name: source1 started

2014-06-2510:37:47,331 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-2510:37:47,831 (pool-6-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-2510:37:48,332 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

到了這裡說明你的程式執行正常了,但是你的監視目錄 /usr/local/yting/flume/tdata/tdir1下沒有新檔案的產生,所以會一直出現上面的那條資訊

# 在/usr/local/yting/flume/tdata/tdir1這個flume監視目錄下新增一個新檔案yting_flume_example_agent1_00001.log

[[email protected] hadoop-2.2.0]# ./bin/hadoop fs -ls /yting

14/06/25 10:48:00 WARN util.NativeCodeLoader: Unableto load native-hadoop library for your platform... using builtin-java classeswhere applicable

Found 1 items

-rw-r--r--   3root supergroup       4278 2014-06-1018:29 /yting/yarn-daemon.sh

[[email protected] tdir1]# ll

total 0

[[email protected] tdir1]# ll -a

total 12

drwxr-xr-x 3 root root 4096 Jun 25 10:37 .

drwxr-xr-x 3 root root 4096 Jun 24 22:25 ..

drwxr-xr-x 2 root root 4096 Jun 25 09:48 .flumespool(隱藏檔案)

[[email protected] tdir1]# viyting_flume_example_agent1_00001.log

The you smile until forever .....................

[[email protected] tdir1]# ll

total 4

# 檔名變成.COMPLETED結尾

說明該檔案yting_flume_example_agent1_00001.log已經被flume處理了,處理過後的檔名變成yting_flume_example_agent1_00001.log.COMPLETED,接下來看看flume那邊的資訊,應該發生變化了

# 檢視flume shell的資訊變化

2014-06-25 10:51:00,530 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-25 10:51:01,434 (pool-6-thread-1) [INFO -org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:332)]Preparing to move file/usr/local/yting/flume/tdata/tdir1/yting_flume_example_agent1_00001.log to/usr/local/yting/flume/tdata/tdir1/yting_flume_example_agent1_00001.log.COMPLETED

2014-06-25 10:51:02,436 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-25 10:51:02,473(SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO -org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)]Creatinghdfs://rs229:9000/yting/flumet/FlumeData.1403664662360.tmp

2014-06-25 10:51:07,440 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-25 10:51:07,519 (hdfs-sink1-roll-timer-0)[INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:409)]Closinghdfs://rs229:9000/yting/flumet/FlumeData.1403664662360.tmp

2014-06-25 10:51:07,521 (hdfs-sink1-call-runner-3)[INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)]Close tries incremented

2014-06-25 10:51:07,549 (hdfs-sink1-call-runner-4)[INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:669)]Renaminghdfs://rs229:9000/yting/flumet/FlumeData.1403664662360.tmp tohdfs://rs229:9000/yting/flumet/FlumeData.1403664662360

2014-06-25 10:51:07,557 (hdfs-sink1-roll-timer-0)[INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:402)]Writer callback called.

2014-06-25 10:51:16,448 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.

2014-06-25 10:51:16,626 (Log-BackgroundWorker-channel1)[INFO -org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:214)]Start checkpoint for/usr/local/yting/flume/checkpointdir/tcpdir/example_agent1_001/checkpoint,elements to sync = 1

2014-06-25 10:51:16,628(Log-BackgroundWorker-channel1) [INFO -org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:239)]Updating checkpoint metadata: logWriteOrderID: 1403663867125, queueSize: 0,queueHead: 0

2014-06-25 10:51:16,630(Log-BackgroundWorker-channel1) [INFO -org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1005)] Updatedcheckpoint for file:/usr/local/yting/flume/datadirs/tddirs/example_agent1_001/log-1 position: 206logWriteOrderID: 1403663867125

注意:這裡的分析請看下面的分析整個過程

# 檢視hdfs上flume是否上傳了資料

[[email protected] tdir1]# hadoop fs -ls /yting

Found 2 items

drwxr-xr-x   -root supergroup          0 2014-06-2510:51 /yting/flumet

-rw-r--r--   3root supergroup       4278 2014-06-1018:29 /yting/yarn-daemon.sh

[[email protected] tdir1]# hadoop fs -ls /yting/flumet

Found 1 items

-rw-r--r--   3root supergroup         50 2014-06-2510:51 /yting/flumet/FlumeData.1403664662360

[[email protected] tdir1]# hadoop fs -cat /yting/flumet/FlumeData.1403664662360

The you smile until forever .....................(日誌資訊以及被上傳了,OK、、、)

[[email protected] tdir1]#


# 分析整個過程

通過分析flume shell的日誌資訊可以發現當我們在監視目錄下新檔案被建立儲存的時候flume進行處理並且重新命名該檔案,在原檔案命後面新增.COMPLETE,然後將檔案中的資料上傳到hdfs中並建立一個臨時檔案filename.tmp,上傳成功後重命名hdfs上的臨時檔案,將檔案字尾.tmp去掉就ok了,最後flume將本次操作寫入自己的日誌資訊。

# 初學者注意的地方

# 配置檔案的檔名命

# 配置檔案中的agent1與flume-ng 的-n 引數一直

# 最好配置檔案的檔名與配置檔案內容的名字一樣,這樣-n引數就不會敲錯了


java有興趣的朋友可以加上面說的553175249這個群哦,一起學習,共同進步 .

雲端計算有興趣的朋友可以加上面說的214293307這個群哦,一起學習,共同進步 ...

# Flume介紹

Flume是Cloudera提供的一個高可用的高可靠的分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

系統功能

# 日誌收集

Flume最早是Cloudera提供的日誌收集系統,目前是Apache下的一個孵化專案,Flume支援在日誌系統中定製各類資料傳送方,用於收集資料。

# 資料處理

Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力Flume提供了從console(控制檯)、RPC(Thrift-RPC)、text(檔案)、tail(UNIX tail)、syslog(syslog日誌系統,支援TCP和UDP等2種模式),exec(命令執行)等資料來源上收集資料的能力。

# 工作方式 (Flume-NG舊版本的概念,新版本已經丟棄)

Flume採用了多Master的方式。為了保證配置資料的一致性,Flume引入了ZooKeeper,用於儲存配置資料,ZooKeeper本身可保證配置資料的一致性和高可用,另外,在配置資料發生變化時,ZooKeeper可以通知FlumeMaster節點。Flume Master間使用gossip協議同步資料。

# Flume的設計目標(百度百科)

# 可靠性 

當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(這也是scribe採用的策略,當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Besteffort(資料傳送到接收方後,不會進行確認)。

# 可擴充套件性(Flume-NG舊版本的概念,新版本已經丟棄)

Flume採用了三層架構,分別為agent,collector和storage,每一層均可以水平擴充套件。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。

# 可管理性(Flume-NG舊版本的概念,新版本已經丟棄)

所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置資料的一致性。使用者可以在master上檢視各個資料來源或者資料流執行情況,且可以對各個資料來源配置和動態載入。Flume提供了web和shell script command兩種形式對資料流進行管理。

# 功能可擴充套件性

使用者可以根據需要新增自己的agent,collector或者storage。此外,Flume自帶了很多元件,包括各種agent(file,syslog等),collector和storage(file,HDFS等)。(這裡看下面的Flume架構圖你就明白了)

# Flume架構

# Flume基礎架構,如下圖:

  

這是一個flume-ng 最簡單的圖。flume-ng 是由一個個agent組成的。一個agent就像一個細胞一樣。

# Flume的多agent架構,如下圖:

    

   上面是兩個agent連結在一起的,再看看更多的......

# Flume的合併(合作)架構,如下圖:


你是不是覺得這種設計是不是吊炸天了,可以隨意組合,跟搭積木一樣。跟Storm的設計思想是差不多的,何止吊炸天啊,簡直就是吊炸天 、、、

# Flume的多路複用架構,如下圖:

      

# agent的構造

每個agent裡都有三部分構成:source、channel和sink。

就相當於source接收資料,通過channel傳輸資料,sink把資料寫到下一端。這就完了,就這麼簡單。其中source有很多種可以選擇,channel有很多種可以選擇,sink也同樣有多種可以選擇,並且都支援自定義。餓靠!太靈活了。想怎麼玩就怎麼玩,這你妹的!

同時,如上上圖所示,agent還支援選擇器,就是一個source支援多個channel和多個sink,這樣就完成了資料的分發。

這就完了,flume-ng就這麼簡單........

從看到最後用,一天足可以搞定。剩下的就是怎麼組織你的agent的問題了。也就是搭積木的過程......

另外有一點需要強調的是,flume-ng提供了一種特殊的啟動方式(不同於agent),那就是client啟動。cilent是一個特殊的agent, 他的source是檔案,channel是記憶體,sink是arvo。實際上是為了方便大家用,直接來傳遞檔案的。具體可以看看官方使用手冊。

估計到這兒,應該對flume-ng有了解了吧 、、、on my god、、、


# Flume的安裝

# 下載 flume(使用wget下載)

[[email protected] flume]# wget -c -P /root http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz

# 安裝

[email protected] flume]# pwd

/usr/local/adsit/yting/apache/flume

[[email protected] flume]# ll

total 4

drwxr-xr-x 3 root root 4096 Jun 24 17:25mirrors.cnnic.cn

[[email protected] flume]# cpmirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz .

[[email protected] flume]# ll

total 25276

-rw-r--r-- 1 root root 25876246 Jun 24 17:27apache-flume-1.5.0-bin.tar.gz

drwxr-xr-x 3 root root     4096 Jun 24 17:25 mirrors.cnnic.cn

[[email protected] flume]# tar -zxvfapache-flume-1.5.0-bin.tar.gz

[[email protected] flume]# ll

total 25280

drwxr-xr-x 7 root root     4096 Jun 24 17:27 apache-flume-1.5.0-bin

-rw-r--r-- 1 root root 25876246 Jun 24 17:27apache-flume-1.5.0-bin.tar.gz

drwxr-xr-x 3 root root     4096 Jun 24 17:25 mirrors.cnnic.cn

[[email protected] flume]# rm -rfapache-flume-1.5.0-bin.tar.gz

[[email protected] flume]# rm -rf mirrors.cnnic.cn/

[[email protected] flume]# ll

total 4

drwxr-xr-x 7 root root 4096 Jun 24 17:27apache-flume-1.5.0-bin

[[email protected] flume]#

# 修改 flume-env.sh 配置檔案

[[email protected] conf]# pwd

/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf

[[email protected] conf]# ll

total 12

-rw-r--r-- 1 501 games 1661 Mar 29 06:15flume-conf.properties.template

相關推薦

Flume體系結構介紹以及Flume入門案例(HDFS資料)

# Flume的體系結構 對java有興趣的朋友可以加上面說的553175249這個群哦,一起學習,共同進步 . # Flume介紹 Flume是Cloudera提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各

OSI,TCP/IP,五層協議的體系結構以及各層協議

802.3 面向連接 udp 用戶 數據包 格式 ios mpeg 用戶數據 以下整理的是計算機網絡的OSI,TCP/IP,五層協議的體系結構,以及各層協議,便於以後查看: OSI分層,自上而下分別是:物理層,數據鏈路層,網絡層,傳輸層,會話層,表示層,應用層 TCP/IP

spring框架體系結構介紹

簡潔 之間 sta art 更多 multi 引入 nfa cti Spring框架學習(本人整理,非原創) 一、spring簡介 Spring是一個開源框架,Spring是於2003 年興起的一個輕量級的Java 開發框架,由Rod Johnson創建。簡單來說,Spri

Oracle數據庫基本操作 —— Oracle數據庫體系結構介紹、DDL、DCL、DML

back 存儲 rop oracle數據 none run 單元 修改表 多列 一、Oracle數據庫介紹 1、基本介紹   Oracle數據庫系統是美國ORACLE公司(甲骨文)提供的以分布式數據庫為核心的一組軟件產品,是目前最流行的客戶/服務器(CLIENT/SERVE

Hive 體系結構介紹

led 一個 base 可擴展性 ask back pan 二進制格式 bject 下面是Hive的架構圖。 圖1.1 Hive體系結構 Hive的體系結構可以分為以下幾部分: (1)用戶接口主要有三個:CLI,Client 和 WUI。其中最常用的是C

Java體系結構介紹

CA 但是 volcano 訪問控制 開發 kit 很好 優化 特性 Java技術的核心就是Java虛擬機——所有Java程序都在其上運行,需要Java虛擬機、Java API和Java,class文件的配合,Java程序才能夠運行 為什麽使用Java 通過網絡連接起來

Lucene介紹及簡單入門案例(集成ik分詞器)

chinese depend 創建索引 圖片 latest frame numeric id字段 div 介紹     Lucene是apache軟件基金會4 jakarta項目組的一個子項目,是一個開放源代碼的全文檢索引擎工具包,但它不是一個完整的全文檢索引擎,而是一個

redis學習(二) redis資料結構介紹以及常用命令

redis資料結構介紹   我們已經知道redis是一個基於key-value資料儲存的資料結構資料庫,這裡的key指的是string型別,而對應的value則可以是多樣的資料結構。其中包括下面五種型別:   1.string 字串    string字串型別是redis最基礎的資料儲存型別。

GDB體系結構介紹(二)

4.7 符號方面 GDB的符號端主要負責讀取可執行檔案,提取它找到的任何符號資訊,並將其構建到符號表中。 讀取過程從BFD庫開始。 BFD是一種用於處理二進位制檔案和目標檔案的通用庫;在任何主機上執行,​​它可以讀取和寫入原始的Unix a.out格式,COFF(用於System V Unix和

GDB體系結構介紹(一)

GNU偵錯程式GDB是最早為自由軟體基金會編寫的程式之一,從那以後它一直是免費和開源軟體系統的主要部分。它最初設計為普通的Unix原始碼級偵錯程式,後來擴充套件到廣泛的用途,包括與許多嵌入式系統一起使用,並且從幾千行C增加到超過五十萬。 本章將深入研究GDB的整體內部結構,展示隨著新使用者需求和新功

Eclipse體系結構介紹(四)

6.4 Eclipse 4.0 必須不斷檢查架構以評估它是否仍然合適。它能夠融入新技術嗎?它是否鼓勵社群的成長?吸引新的貢獻者是否容易?在2007年末,Eclipse專案提交者決定這些問題的答案是否定的,他們著手設計Eclipse的新願景。與此同時,他們意識到有數千個Eclipse應用程式依賴於

《深入Java虛擬機器 第二版》之第1章 Java體系結構介紹

1、Java技術的核心就是“Java虛擬機器”。 2、Java體系結構四個獨立的技術: ·Java程式設計語言; ·Java class檔案格式; ·Java應用程式設計介面(API); ·Java虛擬機器; 3、編寫並執行一個Java程式,對四種技術的體驗:

ServerSuperIO 3.5版本的體系結構以及未來規劃的幾點思考

一.特點 1.輕型高效能通訊框架,適用於多種應用場,輪詢模式、自控模式、併發模式和單例模式。 2.不僅是通訊框架,是裝置驅動、IO通道、控制模式場景的協調機制。 3.支援協議驅動器,可以按規範寫標準協議和自定義協議。 4.支援傳送資料快取器,支援命令快取重發和按優先級別傳送。 5.支援協議過

YUI介紹以及快速入門 Yahoo的JS框架

1、YUI介紹: YUI庫是一系列使用Javascript和CSS建立的的工具和控制元件集,用來建立富客戶端Web應用。使用到了DOM scripting,DHTML和AJAX。 2、在頁面中引入JS檔案: 可以從官網下載YUI http://yuilibrary.

android體系結構介紹

l  應用程式(application) l  應用程式框架(Application Framework) l  各種庫和android執行環境 l  操作層OS Ø  應用程式(application):android的應用程式通常涉及使用者介面和互動; Ø  應用

嵌入式晶片體系結構介紹

根據處理器的應用範圍及處理能力可以將處理器分為嵌入式微處理器、嵌入式微控制器、嵌入式DSP處理器、嵌入式片上系統。 1.嵌入式微處理器(Micro Processor Unit,MPU)    嵌入式微處理器是由通用計算機中的CPU演變而來的。它的特徵是具有32位以上的處理

OSI(7層)TCP/IP(4層)五層協議的體系結構以及各層的協議,作用

OSI模型(1)    物理層:IEEE802,IEEE802.2  作用:以二進位制的資料形式在物理媒體上傳輸資料(中繼器,集線器,閘道器)(2)    資料鏈路層:ARP,RARP,PPP,MTU 作用:傳輸有地址的幀,將位元組裝成幀和點到點的傳遞,以及錯誤檢測功能(網橋

JDK 6 目錄結構介紹以及JDK中的工具研究

要想深入瞭解Java必須對JDK的組成, 本文對JDK6裡的目錄做了基本的介紹,主要還是講解 了下JDK裡的各種可執行程式或工具的用途 Java(TM) 有兩個平臺 JRE 執行平臺,包括Java虛擬機器,執行類庫,java應用程式裝載器。 JRE不是開發環境,所以不包括編

OSI,TCP/IP,五層協議的體系結構以及各層協議簡介

OSI分層 (7層):物理層、資料鏈路層、網路層、傳輸層、會話層、表示層、應用層。 TCP/IP分層(4層):網路介面層、 網際層、運輸層、 應用層。 五層協議 (5層):物理層、資料鏈路層、網路層

Struts2體系結構以及詳解

Strut2的體系結構如圖所示: 橙色是Servlet Filters,過濾器鏈,所有的請求都要經過Filter鏈的處理。 淺藍色是Struts Core,Struts2的核心部分,Struts2中已經做好的功能,在實際開發中不需要動它們。 淺綠色是In