Flume: sorting data into date-based HDFS directories using the date at the start of each line

All that is needed here is a new Flume configuration file. I named mine access_hdfs.properties; its contents are below:

#-----------------access_hdfs.properties---------------#
#--------------------Edit by Cheengvho-----------------#
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1

agent1.sources.src1.interceptors = i1
agent1.sources.src1.interceptors.i1.type = regex_extractor
agent1.sources.src1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
agent1.sources.src1.interceptors.i1.serializers = s1
agent1.sources.src1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent1.sources.src1.interceptors.i1.serializers.s1.name = timestamp
agent1.sources.src1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm


agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /var/log/flume
agent1.sources.src1.channels = ch1

agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /flume/events/cheengvho/%Y%m%d
agent1.sinks.sink1.hdfs.filePrefix = %Y-%m-%d
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.channel = ch1

agent1.sinks.sink1.hdfs.rollInterval = 0
agent1.sinks.sink1.hdfs.rollSize = 4000000
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 1000
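To see what the interceptor and sink do together: the regex_extractor pulls the leading "yyyy-MM-dd HH:mm" date from each line, the MillisSerializer converts it to epoch milliseconds and stores it in the event's timestamp header, and the HDFS sink expands %Y%m%d in hdfs.path from that header. The Python sketch below simulates this pipeline to show how a log line maps to an HDFS directory; it is an illustration only (the function names are mine, and it uses UTC, whereas Flume uses the JVM's default timezone):

```python
import re
from datetime import datetime, timezone

# Same pattern as the interceptor's regex (Python needs single backslashes).
LINE_PATTERN = re.compile(r'^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)')

def extract_timestamp_millis(line):
    """Mimic regex_extractor + RegexExtractorInterceptorMillisSerializer:
    capture the leading date and convert it to epoch milliseconds."""
    m = LINE_PATTERN.match(line)
    if m is None:
        return None  # no leading date: the event gets no timestamp header
    dt = datetime.strptime(m.group(1), '%Y-%m-%d %H:%M')
    return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)

def hdfs_path(millis, template='/flume/events/cheengvho/%Y%m%d'):
    """Mimic the HDFS sink expanding date escapes from the timestamp header."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).strftime(template)
```

So a line beginning with "2017-03-15 08:30" lands under /flume/events/cheengvho/20170315.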

Start Flume with this configuration file:

$ flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/access_hdfs.properties \
--name agent1 -Dflume.root.logger=INFO,console
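Before pointing Flume at the spooling directory, it can be worth confirming that your log lines actually match the interceptor's regex; events whose lines lack a leading date get no timestamp header and the sink cannot expand %Y%m%d for them. A quick check with grep (the sample log line is hypothetical):

```shell
# Verify a sample line starts with a 'yyyy-MM-dd HH:mm' date, as the
# regex_extractor interceptor expects.
echo '2017-03-15 08:30 GET /index.html 200' \
  | grep -E '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}' \
  && echo 'regex matches'
```

Also remember that spooldir sources require files to be immutable once placed in /var/log/flume; move completed log files in rather than writing to them in place.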