
Collecting logs with Flume and shipping them to Kafka

Real, working configuration: copy it and it just works!
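One Flume agent (tier1) tails ten log streams, audit and runtime logs for HBase, HDFS, Hive, Storm, and YARN, buffers each in its own memory channel, and ships each to a dedicated Kafka topic. Save the properties below to a file such as flume-kafka.properties (the file name is yours to choose) and start the agent with: flume-ng agent --conf conf --conf-file flume-kafka.properties --name tier1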

tier1.sources  = HbaseAuditSources HbaseRunSources HdfsAuditSources HdfsRunSources HiveAuditSources HiveRunSources StormWorkerSources StormRunSources YarnAuditSources YarnRunSources
tier1.channels = HbaseAuditChannel HbaseRunChannel HdfsAuditChannel HdfsRunChannel HiveAuditChannel HiveRunChannel StormWorkerChannel StormRunChannel YarnAuditChannel YarnRunChannel
tier1.sinks    = HbaseAuditSink HbaseRunSink HdfsAuditSink HdfsRunSink HiveAuditSink HiveRunSink StormWorkerSink StormRunSink YarnAuditSink YarnRunSink


#HBASE
tier1.sources.HbaseAuditSources.type = TAILDIR
tier1.sources.HbaseAuditSources.channels = HbaseAuditChannel
tier1.sources.HbaseAuditSources.filegroups = HbaseAuditGroup
tier1.sources.HbaseAuditSources.filegroups.HbaseAuditGroup = /home/data/flume-ng/hbase/^audit.*.log.*
tier1.sources.HbaseAuditSources.positionFile  = /var/data/flume-ng/audit_hbase_position.json
tier1.sources.HbaseAuditSources.interceptors = i2 i3 i4 i1
tier1.sources.HbaseAuditSources.interceptors.i1.type = regex_filter
tier1.sources.HbaseAuditSources.interceptors.i1.regex = scope\\=hbase\\:meta
tier1.sources.HbaseAuditSources.interceptors.i1.excludeEvents = true
tier1.sources.HbaseAuditSources.interceptors.i2.type = static
tier1.sources.HbaseAuditSources.interceptors.i2.key = datatype
tier1.sources.HbaseAuditSources.interceptors.i2.value = hbase
tier1.sources.HbaseAuditSources.interceptors.i3.type = host
tier1.sources.HbaseAuditSources.interceptors.i3.hostHeader = serverip
tier1.sources.HbaseAuditSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder
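A note on the interceptor chain order i2 i3 i4 i1: the static datatype header and the host serverip header are set first, i4 folds them into the event body, and only then does the regex filter run (here dropping accesses to hbase:meta from the audit stream). The i4 class, com.travelsky.flume.interceptor.AppendHeaderToBody, is custom and its source is not included in this post; the sketch below shows what such an interceptor plausibly looks like against the standard Flume Interceptor API. The key=value| body-prefix format is my assumption, not the published behavior.

package com.travelsky.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Hypothetical reconstruction: the real AppendHeaderToBody is not published in the post.
public class AppendHeaderToBody implements Interceptor {

  @Override
  public void initialize() {
    // no state to set up
  }

  @Override
  public Event intercept(Event event) {
    // Prepend each header (e.g. datatype, serverip) to the event body so
    // downstream Kafka consumers see them without access to Flume headers.
    StringBuilder prefix = new StringBuilder();
    for (Map.Entry<String, String> h : event.getHeaders().entrySet()) {
      prefix.append(h.getKey()).append('=').append(h.getValue()).append('|');
    }
    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    event.setBody((prefix + body).getBytes(StandardCharsets.UTF_8));
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to release
  }

  // Referenced from the config as AppendHeaderToBody$Builder.
  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new AppendHeaderToBody();
    }

    @Override
    public void configure(Context context) {
      // no configurable properties in this sketch
    }
  }
}

Build the class into a jar and place it under the agent's plugins.d directory (or lib/) so Flume can load it by the fully qualified name used in the config.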






tier1.sources.HbaseRunSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.HbaseRunSources.channels = HbaseRunChannel
tier1.sources.HbaseRunSources.filegroups = HbaseRunGroup
tier1.sources.HbaseRunSources.filegroups.HbaseRunGroup.parentDir=/home/data/flume-ng/hbase/
tier1.sources.HbaseRunSources.filegroups.HbaseRunGroup.filePattern = hbase*.log.*
tier1.sources.HbaseRunSources.filegroups.HbaseRunGroup.cachePatternMatching=false
tier1.sources.HbaseRunSources.multiline = true
tier1.sources.HbaseRunSources.multilinePattern = [\\d\\-\\s,\\:]{24}
tier1.sources.HbaseRunSources.multilinePatternBelong = previous
tier1.sources.HbaseRunSources.multilineMatched = false
tier1.sources.HbaseRunSources.multilineEventTimeoutSeconds = 1
tier1.sources.HbaseRunSources.multilineMaxBytes = 3145728
tier1.sources.HbaseRunSources.multilineMaxLines = 3000
tier1.sources.HbaseRunSources.positionFile  = /var/data/flume-ng/running_hbase_position.json
tier1.sources.HbaseRunSources.interceptors = i2 i3 i4 i1
tier1.sources.HbaseRunSources.interceptors.i1.type = regex_filter
tier1.sources.HbaseRunSources.interceptors.i1.regex = (DEBUG|INFO|WARN|TRACE)
tier1.sources.HbaseRunSources.interceptors.i1.excludeEvents = true
tier1.sources.HbaseRunSources.interceptors.i2.type = static
tier1.sources.HbaseRunSources.interceptors.i2.key = datatype
tier1.sources.HbaseRunSources.interceptors.i2.value = hbase
tier1.sources.HbaseRunSources.interceptors.i3.type = host
tier1.sources.HbaseRunSources.interceptors.i3.hostHeader = serverip
tier1.sources.HbaseRunSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder
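The Run sources use com.travelsky.flume.source.taildir.TaildirSource rather than the stock TAILDIR type because the multiline* options are not part of Apache Flume's built-in Taildir source; they come from a customized fork. The pattern [\\d\\-\\s,\\:]{24} matches a 24-character log4j timestamp prefix such as 2019-01-01 12:00:00,123, so lines that do not match it (multilineMatched = false) are appended to the previous timestamped event (multilinePatternBelong = previous), which glues stack traces onto their log line, capped at 3 MB or 3000 lines per event. The regex filter then discards DEBUG/INFO/WARN/TRACE events, leaving only ERROR and FATAL in the stream.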


#HDFS
tier1.sources.HdfsAuditSources.type = TAILDIR
tier1.sources.HdfsAuditSources.channels = HdfsAuditChannel
tier1.sources.HdfsAuditSources.filegroups = HdfsAuditGroup
tier1.sources.HdfsAuditSources.filegroups.HdfsAuditGroup = /home/data/flume-ng/hadoop-hdfs/^audit.*.log.*
tier1.sources.HdfsAuditSources.positionFile  = /var/data/flume-ng/audit_hdfs_position.json
tier1.sources.HdfsAuditSources.interceptors = i1 i2 i3 
tier1.sources.HdfsAuditSources.interceptors.i1.type = static
tier1.sources.HdfsAuditSources.interceptors.i1.key = datatype
tier1.sources.HdfsAuditSources.interceptors.i1.value = hdfs
tier1.sources.HdfsAuditSources.interceptors.i2.type = host
tier1.sources.HdfsAuditSources.interceptors.i2.hostHeader = serverip
tier1.sources.HdfsAuditSources.interceptors.i3.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder






tier1.sources.HdfsRunSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.HdfsRunSources.channels = HdfsRunChannel
tier1.sources.HdfsRunSources.filegroups = HdfsRunGroup
tier1.sources.HdfsRunSources.filegroups.HdfsRunGroup.parentDir=/home/data/flume-ng/hadoop-hdfs/
tier1.sources.HdfsRunSources.filegroups.HdfsRunGroup.filePattern = hadoop*.log.*
tier1.sources.HdfsRunSources.filegroups.HdfsRunGroup.cachePatternMatching=false
tier1.sources.HdfsRunSources.multiline = true
tier1.sources.HdfsRunSources.multilinePattern = [\\d\\-\\s,\\:]{24}
tier1.sources.HdfsRunSources.multilinePatternBelong = previous
tier1.sources.HdfsRunSources.multilineMatched = false
tier1.sources.HdfsRunSources.multilineEventTimeoutSeconds = 1
tier1.sources.HdfsRunSources.multilineMaxBytes = 3145728
tier1.sources.HdfsRunSources.multilineMaxLines = 3000
tier1.sources.HdfsRunSources.positionFile  = /var/data/flume-ng/running_hdfs_position.json
tier1.sources.HdfsRunSources.interceptors = i2 i3 i4 i1
tier1.sources.HdfsRunSources.interceptors.i1.type = regex_filter
tier1.sources.HdfsRunSources.interceptors.i1.regex = (DEBUG|INFO|WARN|TRACE)
tier1.sources.HdfsRunSources.interceptors.i1.excludeEvents = true
tier1.sources.HdfsRunSources.interceptors.i2.type = static
tier1.sources.HdfsRunSources.interceptors.i2.key = datatype
tier1.sources.HdfsRunSources.interceptors.i2.value = hdfs
tier1.sources.HdfsRunSources.interceptors.i3.type = host
tier1.sources.HdfsRunSources.interceptors.i3.hostHeader = serverip
tier1.sources.HdfsRunSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




#HIVE
tier1.sources.HiveAuditSources.type = TAILDIR
tier1.sources.HiveAuditSources.channels = HiveAuditChannel
tier1.sources.HiveAuditSources.filegroups = HiveAuditGroup
tier1.sources.HiveAuditSources.filegroups.HiveAuditGroup = /home/data/flume-ng/hive/^audit.*.log.*
tier1.sources.HiveAuditSources.positionFile  = /var/data/flume-ng/audit_hive_position.json
tier1.sources.HiveAuditSources.interceptors = i1 i2 i3
tier1.sources.HiveAuditSources.interceptors.i1.type = static
tier1.sources.HiveAuditSources.interceptors.i1.key = datatype
tier1.sources.HiveAuditSources.interceptors.i1.value = hive
tier1.sources.HiveAuditSources.interceptors.i2.type = host
tier1.sources.HiveAuditSources.interceptors.i2.hostHeader = serverip
tier1.sources.HiveAuditSources.interceptors.i3.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




tier1.sources.HiveRunSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.HiveRunSources.channels = HiveRunChannel
tier1.sources.HiveRunSources.filegroups = HiveRunGroup
tier1.sources.HiveRunSources.filegroups.HiveRunGroup.parentDir=/home/data/flume-ng/hive/
tier1.sources.HiveRunSources.filegroups.HiveRunGroup.filePattern = hadoop*.log.*
tier1.sources.HiveRunSources.filegroups.HiveRunGroup.cachePatternMatching=false
tier1.sources.HiveRunSources.multiline = true
tier1.sources.HiveRunSources.multilinePattern = [\\d\\-\\s,\\:]{24}
tier1.sources.HiveRunSources.multilinePatternBelong = previous
tier1.sources.HiveRunSources.multilineMatched = false
tier1.sources.HiveRunSources.multilineEventTimeoutSeconds = 1
tier1.sources.HiveRunSources.multilineMaxBytes = 3145728
tier1.sources.HiveRunSources.multilineMaxLines = 3000
tier1.sources.HiveRunSources.positionFile  = /var/data/flume-ng/running_hive_position.json
tier1.sources.HiveRunSources.interceptors = i2 i3 i4 i1
tier1.sources.HiveRunSources.interceptors.i1.type = regex_filter
tier1.sources.HiveRunSources.interceptors.i1.regex = (DEBUG|INFO|WARN|TRACE)
tier1.sources.HiveRunSources.interceptors.i1.excludeEvents = true
tier1.sources.HiveRunSources.interceptors.i2.type = static
tier1.sources.HiveRunSources.interceptors.i2.key = datatype
tier1.sources.HiveRunSources.interceptors.i2.value = hive
tier1.sources.HiveRunSources.interceptors.i3.type = host
tier1.sources.HiveRunSources.interceptors.i3.hostHeader = serverip
tier1.sources.HiveRunSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




#STORM
tier1.sources.StormWorkerSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.StormWorkerSources.channels = StormWorkerChannel
tier1.sources.StormWorkerSources.filegroups = StormWorkerGroup          
tier1.sources.StormWorkerSources.filegroups.StormWorkerGroup.parentDir=/home/data/flume-ng/storm/logs/workers-artifacts
tier1.sources.StormWorkerSources.filegroups.StormWorkerGroup.filePattern = **/worker.log* 
tier1.sources.StormWorkerSources.filegroups.StormWorkerGroup.cachePatternMatching=false
tier1.sources.StormWorkerSources.positionFile  = /var/data/flume-ng/worker_position.json
tier1.sources.StormWorkerSources.interceptors = i1 i2 i3
tier1.sources.StormWorkerSources.interceptors.i1.type = static
tier1.sources.StormWorkerSources.interceptors.i1.key = datatype
tier1.sources.StormWorkerSources.interceptors.i1.value = storm
tier1.sources.StormWorkerSources.interceptors.i2.type = host
tier1.sources.StormWorkerSources.interceptors.i2.hostHeader = serverip
tier1.sources.StormWorkerSources.interceptors.i3.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder


tier1.sources.StormRunSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.StormRunSources.channels = StormRunChannel
tier1.sources.StormRunSources.filegroups = StormRunGroup
tier1.sources.StormRunSources.filegroups.StormRunGroup.parentDir=/home/data/flume-ng/storm/logs/
tier1.sources.StormRunSources.filegroups.StormRunGroup.filePattern = {nimbus,ui,logviewer,supervisor}.log
tier1.sources.StormRunSources.filegroups.StormRunGroup.cachePatternMatching=false
tier1.sources.StormRunSources.multiline = true
tier1.sources.StormRunSources.multilinePattern = [\\d\\-\\s:.]{24}
tier1.sources.StormRunSources.multilinePatternBelong = previous
tier1.sources.StormRunSources.multilineMatched = false
tier1.sources.StormRunSources.multilineEventTimeoutSeconds = 1
tier1.sources.StormRunSources.multilineMaxBytes = 3145728
tier1.sources.StormRunSources.multilineMaxLines = 3000
tier1.sources.StormRunSources.positionFile  = /var/data/flume-ng/running_storm_position.json
tier1.sources.StormRunSources.interceptors = i2 i3 i4 i1
tier1.sources.StormRunSources.interceptors.i1.type = regex_filter
tier1.sources.StormRunSources.interceptors.i1.regex = (DEBUG|INFO|WARN|TRACE)
tier1.sources.StormRunSources.interceptors.i1.excludeEvents = true
tier1.sources.StormRunSources.interceptors.i2.type = static
tier1.sources.StormRunSources.interceptors.i2.key = datatype
tier1.sources.StormRunSources.interceptors.i2.value = storm
tier1.sources.StormRunSources.interceptors.i3.type = host
tier1.sources.StormRunSources.interceptors.i3.hostHeader = serverip
tier1.sources.StormRunSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




#YARN
tier1.sources.YarnAuditSources.type = TAILDIR
tier1.sources.YarnAuditSources.channels = YarnAuditChannel
tier1.sources.YarnAuditSources.filegroups = YarnAuditGroup
tier1.sources.YarnAuditSources.filegroups.YarnAuditGroup = /home/data/flume-ng/hadoop-yarn/^audit.*.log.*
tier1.sources.YarnAuditSources.positionFile  = /var/data/flume-ng/audit_yarn_position.json
tier1.sources.YarnAuditSources.interceptors = i1 i2 i3
tier1.sources.YarnAuditSources.interceptors.i1.type = static
tier1.sources.YarnAuditSources.interceptors.i1.key = datatype
tier1.sources.YarnAuditSources.interceptors.i1.value = yarn
tier1.sources.YarnAuditSources.interceptors.i2.type = host
tier1.sources.YarnAuditSources.interceptors.i2.hostHeader = serverip
tier1.sources.YarnAuditSources.interceptors.i3.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




tier1.sources.YarnRunSources.type = com.travelsky.flume.source.taildir.TaildirSource
tier1.sources.YarnRunSources.channels = YarnRunChannel
tier1.sources.YarnRunSources.filegroups = YarnRunGroup
tier1.sources.YarnRunSources.filegroups.YarnRunGroup.parentDir=/home/data/flume-ng/hadoop-yarn/
tier1.sources.YarnRunSources.filegroups.YarnRunGroup.filePattern = hadoop*.log.*
tier1.sources.YarnRunSources.filegroups.YarnRunGroup.cachePatternMatching=false
tier1.sources.YarnRunSources.multiline = true
tier1.sources.YarnRunSources.multilinePattern = [\\d\\-\\s,\\:]{24}
tier1.sources.YarnRunSources.multilinePatternBelong = previous
tier1.sources.YarnRunSources.multilineMatched = false
tier1.sources.YarnRunSources.multilineEventTimeoutSeconds = 1
tier1.sources.YarnRunSources.multilineMaxBytes = 3145728
tier1.sources.YarnRunSources.multilineMaxLines = 3000
tier1.sources.YarnRunSources.positionFile  = /var/data/flume-ng/running_yarn_position.json
tier1.sources.YarnRunSources.interceptors = i2 i3 i4 i1
tier1.sources.YarnRunSources.interceptors.i1.type = regex_filter
tier1.sources.YarnRunSources.interceptors.i1.regex = (DEBUG|INFO|WARN|TRACE)
tier1.sources.YarnRunSources.interceptors.i1.excludeEvents = true
tier1.sources.YarnRunSources.interceptors.i2.type = static
tier1.sources.YarnRunSources.interceptors.i2.key = datatype
tier1.sources.YarnRunSources.interceptors.i2.value = yarn
tier1.sources.YarnRunSources.interceptors.i3.type = host
tier1.sources.YarnRunSources.interceptors.i3.hostHeader = serverip
tier1.sources.YarnRunSources.interceptors.i4.type = com.travelsky.flume.interceptor.AppendHeaderToBody$Builder




tier1.sinks.HbaseAuditSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HbaseAuditSink.topic = AUDIT_HBASE_WC
tier1.sinks.HbaseAuditSink.brokerList  = *.*.*.*:9093
tier1.sinks.HbaseAuditSink.batchSize = 1
tier1.sinks.HbaseAuditSink.requiredAcks = 1
tier1.sinks.HbaseAuditSink.channel = HbaseAuditChannel
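All ten sinks follow this same template. topic, brokerList, batchSize, and requiredAcks are the Flume 1.6-era Kafka sink property names; from Flume 1.7 on the preferred spellings are kafka.topic, kafka.bootstrap.servers, flumeBatchSize, and kafka.producer.acks (the old names are still translated for backward compatibility, with a warning in the log). batchSize = 1 sends every event as its own produce request: lowest latency, lowest throughput; raise it for busy streams. The broker address is masked here (*.*.*.*:9093); substitute your own host:port list.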


tier1.sinks.HbaseRunSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HbaseRunSink.topic = RUN_HBASE_WC
tier1.sinks.HbaseRunSink.brokerList  = *.*.*.*:9093
tier1.sinks.HbaseRunSink.batchSize = 1
tier1.sinks.HbaseRunSink.requiredAcks = 1
tier1.sinks.HbaseRunSink.channel = HbaseRunChannel


tier1.sinks.HdfsAuditSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HdfsAuditSink.topic = AUDIT_HDFS_WC
tier1.sinks.HdfsAuditSink.brokerList  = *.*.*.*:9093
tier1.sinks.HdfsAuditSink.batchSize = 1
tier1.sinks.HdfsAuditSink.requiredAcks = 1
tier1.sinks.HdfsAuditSink.channel = HdfsAuditChannel


tier1.sinks.HdfsRunSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HdfsRunSink.topic = RUN_HDFS_WC
tier1.sinks.HdfsRunSink.brokerList  = *.*.*.*:9093
tier1.sinks.HdfsRunSink.batchSize = 1
tier1.sinks.HdfsRunSink.requiredAcks = 1
tier1.sinks.HdfsRunSink.channel = HdfsRunChannel




tier1.sinks.HiveAuditSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HiveAuditSink.topic = AUDIT_HIVE_WC
tier1.sinks.HiveAuditSink.brokerList  = *.*.*.*:9093
tier1.sinks.HiveAuditSink.batchSize = 1
tier1.sinks.HiveAuditSink.requiredAcks = 1
tier1.sinks.HiveAuditSink.channel = HiveAuditChannel




tier1.sinks.HiveRunSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.HiveRunSink.topic = RUN_HIVE_WC
tier1.sinks.HiveRunSink.brokerList  = *.*.*.*:9093
tier1.sinks.HiveRunSink.batchSize = 1
tier1.sinks.HiveRunSink.requiredAcks = 1
tier1.sinks.HiveRunSink.channel = HiveRunChannel




tier1.sinks.StormWorkerSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.StormWorkerSink.topic = AUDIT_STORM_WC   
tier1.sinks.StormWorkerSink.brokerList  = *.*.*.*:9093
tier1.sinks.StormWorkerSink.batchSize = 1
tier1.sinks.StormWorkerSink.requiredAcks = 1
tier1.sinks.StormWorkerSink.channel = StormWorkerChannel




tier1.sinks.StormRunSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.StormRunSink.topic = RUN_STORM_WC 
tier1.sinks.StormRunSink.brokerList  = *.*.*.*:9093
tier1.sinks.StormRunSink.batchSize = 1
tier1.sinks.StormRunSink.requiredAcks = 1
tier1.sinks.StormRunSink.channel = StormRunChannel




tier1.sinks.YarnAuditSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.YarnAuditSink.topic = AUDIT_YARN_WC
tier1.sinks.YarnAuditSink.brokerList  = *.*.*.*:9093
tier1.sinks.YarnAuditSink.batchSize = 1
tier1.sinks.YarnAuditSink.requiredAcks = 1
tier1.sinks.YarnAuditSink.channel = YarnAuditChannel


tier1.sinks.YarnRunSink.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.YarnRunSink.topic = RUN_YARN_WC
tier1.sinks.YarnRunSink.brokerList  = *.*.*.*:9093
tier1.sinks.YarnRunSink.batchSize = 1
tier1.sinks.YarnRunSink.requiredAcks = 1
tier1.sinks.YarnRunSink.channel = YarnRunChannel




tier1.channels.HbaseAuditChannel.type   = memory
tier1.channels.HbaseAuditChannel.capacity = 1000
tier1.channels.HbaseAuditChannel.transactionCapacity=100
tier1.channels.HbaseAuditChannel.byteCapacityBufferPercentage=20 
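Every channel uses the same memory-channel sizing, so one note covers all ten: memory channels are fast but volatile, and events still in the channel are lost if the agent process dies; use a file channel where delivery matters more than speed. transactionCapacity (100) must be at least as large as the batch a source or sink commits at once, which holds comfortably against the sinks' batchSize = 1. byteCapacityBufferPercentage = 20 reserves 20% of the channel's byte capacity as headroom for event headers.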


tier1.channels.HbaseRunChannel.type   = memory
tier1.channels.HbaseRunChannel.capacity = 1000
tier1.channels.HbaseRunChannel.transactionCapacity=100
tier1.channels.HbaseRunChannel.byteCapacityBufferPercentage=20 


tier1.channels.HdfsAuditChannel.type   = memory
tier1.channels.HdfsAuditChannel.capacity = 1000
tier1.channels.HdfsAuditChannel.transactionCapacity=100
tier1.channels.HdfsAuditChannel.byteCapacityBufferPercentage=20 


tier1.channels.HdfsRunChannel.type   = memory
tier1.channels.HdfsRunChannel.capacity = 1000
tier1.channels.HdfsRunChannel.transactionCapacity=100
tier1.channels.HdfsRunChannel.byteCapacityBufferPercentage=20


tier1.channels.HiveAuditChannel.type   = memory
tier1.channels.HiveAuditChannel.capacity = 1000
tier1.channels.HiveAuditChannel.transactionCapacity=100
tier1.channels.HiveAuditChannel.byteCapacityBufferPercentage=20 


tier1.channels.HiveRunChannel.type   = memory
tier1.channels.HiveRunChannel.capacity = 1000
tier1.channels.HiveRunChannel.transactionCapacity=100
tier1.channels.HiveRunChannel.byteCapacityBufferPercentage=20 


tier1.channels.StormWorkerChannel.type   = memory
tier1.channels.StormWorkerChannel.capacity = 1000
tier1.channels.StormWorkerChannel.transactionCapacity=100
tier1.channels.StormWorkerChannel.byteCapacityBufferPercentage=20 


tier1.channels.StormRunChannel.type   = memory
tier1.channels.StormRunChannel.capacity = 1000
tier1.channels.StormRunChannel.transactionCapacity=100
tier1.channels.StormRunChannel.byteCapacityBufferPercentage=20


tier1.channels.YarnAuditChannel.type   = memory
tier1.channels.YarnAuditChannel.capacity = 1000
tier1.channels.YarnAuditChannel.transactionCapacity=100
tier1.channels.YarnAuditChannel.byteCapacityBufferPercentage=20 


tier1.channels.YarnRunChannel.type   = memory
tier1.channels.YarnRunChannel.capacity = 1000
tier1.channels.YarnRunChannel.transactionCapacity=100
tier1.channels.YarnRunChannel.byteCapacityBufferPercentage=20
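Once the agent is running, a quick consumer confirms events are arriving. Below is a minimal sketch using the standard Kafka Java client (kafka-clients 2.x, for poll(Duration)); the broker address and group id are placeholders, since the real brokers are masked above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AuditTopicCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Replace with the real broker list (masked as *.*.*.*:9093 in the config above).
    props.put("bootstrap.servers", "broker-host:9093");
    props.put("group.id", "flume-audit-check"); // hypothetical group id
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // Any of the ten topics works; AUDIT_HBASE_WC is used as an example.
      consumer.subscribe(Collections.singletonList("AUDIT_HBASE_WC"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
      }
    }
  }
}

On the RUN_* topics you should see only ERROR and FATAL events, since the regex filter drops everything below that level before it reaches the channel.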