1. 程式人生 > >Flume + HDFS Sink採集資料及如何新增第三方JAR

Flume + HDFS Sink採集資料及如何新增第三方JAR

Flume預設情況下是沒有引入HDFS,Kafka,Elasticsearch,Hbase等sink的相關jar包,如果使用,需要自行新增相關jar包。

下面我以使用HDFS Sink為例,在Flume中加入第三方JAR包。

Flume支援一種特殊的目錄結構:plugins.d,它有特殊的格式,可以很方面的管理第三方JAR。當然我們可以直接把第三方JAR丟掉$FLUME_HOME/lib目錄,但是這樣不利於除錯和排除故障,特別是處理JAR包衝突的問題。

plugins.d目錄:
plugins.d目錄位於$FLUME_HOME/plugins.d。在啟動的時候,flume-ng啟動指令碼會檢查 plugins.d 目錄的外掛確保符合下面的格式,並且包含了正確的路徑。

外掛目錄佈局:
每個在 plugins.d 內的外掛,最多包含三個子目錄。
1,lib - 外掛的JAR。
2,libext  - 外掛依賴JAR(S)
3,native - 任何所需的本地庫,例如:.so檔案
下面是兩個外掛在 plugins.d 目錄中的位置(以下以使用HDFS Sink為例):

    plugins.d/  
    plugins.d/hdfs-sink/
    #flume-hdfs-sink-1.5.1.jar是Flume自帶的,所有lib目錄為空
    plugins.d/hdfs-sink/lib/    
    #flume-hdfs-sink-1.5.1.jar依賴以下四個包
    plugins.d/hdfs-sink/libext/commons-configuration-1.6.jar
    plugins.d/hdfs-sink/libext/hadoop-annotations-2.4.1.jar
    plugins.d/hdfs-sink/libext/hadoop-auth-2.4.1.jar
    plugins.d/hdfs-sink/libext/hadoop-common-2.4.1.jar
    plugins.d/hdfs-sink/libext/hadoop-hdfs-2.4.1.jar
    #沒有本地庫
    plugins.d/hdfs-sink/native/
    #HDFS配置檔案
    plugins.d/hdfs-sink/conf/hdfs-site.xml
    plugins.d/hdfs-sink/conf/core-site.xml
以上是HDFS所需的配置檔案和依賴的JAR。

配置Flume環境變數:

JAVA_HOME=/usr/local/jdk1.8.0_45

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
#將hdfs-site.xml和core-site.xml放入Flume的環境變數中
FLUME_CLASSPATH="/data/apache-flume-1.5.1-bin/plugins.d/hdfs-sink/conf/"

OK,完事具備,最後採集資料到HDFS中

a1.sources = source1
a1.sinks = sink1
a1.channels = channel1
#resources
a1.sources.source1.type = spooldir
a1.sources.source1.channels = channel1
a1.sources.source1.spoolDir = /data/logs

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 500

a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.hdfs.path = /flume/events/%Y-%M-%d
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.hdfs.rollCount = 0
a1.sinks.sink1.hdfs.rollSize = 0
a1.sinks.sink1.hdfs.rollInterval = 0
a1.sinks.sink1.hdfs.rollSize = 1073741824
a1.sinks.sink1.hdfs.filePrefix = nginx-%H-%M
a1.sinks.sink1.hdfs.batchSize = 200
a1.sinks.sink1.hdfs.round = true
a1.sinks.sink1.hdfs.roundValue = 10
a1.sinks.sink1.hdfs.roundUnit = minute
a1.sinks.sink1.hdfs.useLocalTimeStamp = true
該示例是通過SpoolingDirectorySource獲取資料放入HDFS中