1. 程式人生 > >flume-ng資料複用案列配置詳解

flume-ng資料複用案列配置詳解

#型別 AVRO spooldir netcat HTTP
a1.sources = sourceMqtt
a1.sinks = sinkMqtt1 sinkMqtt2
a1.channels = channelMqtt1 channelMqtt2

# Describe/configure the source
a1.sources.sourceMqtt.type = com.leapmotor.emqflumesource.MQTTSource
a1.sources.sourceMqtt.selector.type = multiplexing
a1.sources.sourceMqtt.selector.header = biztype
a1.sources.sourceMqtt.selector.mapping.cardata = channelMqtt2
a1.sources.sourceMqtt.selector.default = channelMqtt1

# Describe the sinkMqtt1
#型別 file_roll avro HDFS
a1.sinks.sinkMqtt1.type = hdfs  
#寫入hdfs的路徑,需要包含檔案系統標識
a1.sinks.sinkMqtt1.hdfs.path = hdfs://cluster/clusterfolder/data1/%Y-%m-%d/%H
a1.sinks.sinkMqtt1.hdfs.fileType = DataStream
a1.sinks.sinkMqtt1.hdfs.writeFormat=TEXT
a1.sinks.sinkMqtt1.hdfs.filePrefix = gbt32960Hdfs
a1.sinks.sinkMqtt1.hdfs.batchSize = 1000
a1.sinks.sinkMqtt1.hdfs.rollSize = 0
a1.sinks.sinkMqtt1.hdfs.rollCount = 10
a1.sinks.sinkMqtt1.hdfs.rollInterval = 0
a1.sinks.sinkMqtt1.hdfs.useLocalTimeStamp = true

# Describe the sinkMqtt1
#型別 file_roll avro HDFS
a1.sinks.sinkMqtt2.type = hdfs  
#寫入hdfs的路徑,需要包含檔案系統標識
a1.sinks.sinkMqtt2.hdfs.path = hdfs://cluster/clusterfolder/data2/%Y-%m-%d/%H
a1.sinks.sinkMqtt2.hdfs.fileType = DataStream
a1.sinks.sinkMqtt2.hdfs.writeFormat=TEXT
a1.sinks.sinkMqtt2.hdfs.filePrefix = cardataHdfs
a1.sinks.sinkMqtt2.hdfs.batchSize = 1000
a1.sinks.sinkMqtt2.hdfs.rollSize = 0
a1.sinks.sinkMqtt2.hdfs.rollCount = 10
a1.sinks.sinkMqtt2.hdfs.rollInterval = 0
a1.sinks.sinkMqtt2.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
a1.channels.channelMqtt1.type = memory
a1.channels.channelMqtt1.capacity = 5000
a1.channels.channelMqtt1.transactionCapacity = 1000

a1.channels.channelMqtt2.type = memory
a1.channels.channelMqtt2.capacity = 5000
a1.channels.channelMqtt2.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.sourceMqtt.channels = channelMqtt1 channelMqtt2
a1.sinks.sinkMqtt1.channel = channelMqtt1
a1.sinks.sinkMqtt2.channel = channelMqtt2