1. 程式人生 > Flume採集一個數據源對應多個channel,多個sink

Flume採集一個數據源對應多個channel,多個sink

Flume採集一個數據源對應多個channel,多個mysqlsink(自定義sink)

# Flume agent "a1": one source (r1) fanned out to three memory channels
# (c1, c2, c3), each drained by its own custom MySQL sink (s1, s2, s3).
# With no selector configured, Flume uses the default replicating channel
# selector, so every event from r1 is written to all three channels.

a1.sources = r1
# Every channel referenced below must be declared here, otherwise the
# source/sink bindings to c2 and c3 cannot be resolved.
a1.channels = c1 c2 c3
a1.sinks = s1 s2 s3

# ---------------------------------------------------------------------------
# Alternative source for plain flume 1.6.0 (no TAILDIR): exec + tail, with awk
# recording the current line number in position.log so a restart can resume.
# awk reads position.log as its first input (ARGIND==1) and stdin ("-") second.
# ---------------------------------------------------------------------------
#a1.sources.r1.type = exec
#a1.sources.r1.shell = /bin/bash -c
#a1.sources.r1.command = tail -n +$(tail -n1 /var/log/flume/position.log) -F /data/test/test.log | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/var/log/flume/position.log";fflush("")}' /var/log/flume/position.log -

# ---------------------------------------------------------------------------
# Source for flume-ng-1.6.0-cdh5.11.0: TAILDIR tracks read offsets itself in
# the JSON position file.
# ---------------------------------------------------------------------------
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/test/test.log
a1.sources.r1.channels = c1 c2 c3
# Custom interceptor; its behavior is defined in the project's own class.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yimen.data.flume.sink.Interceptor$Builder

# ---------------------------------------------------------------------------
# Channels: three identically sized in-memory channels.
# ---------------------------------------------------------------------------
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 1000

a1.channels.c2.type = memory
a1.channels.c2.capacity = 5000
a1.channels.c2.transactionCapacity = 1000

a1.channels.c3.type = memory
a1.channels.c3.capacity = 5000
a1.channels.c3.transactionCapacity = 1000

# ---------------------------------------------------------------------------
# Sinks: one custom MysqlSink per channel, each writing to its own table.
# NOTE(review): "testtype" is a property of the custom sink; presumably it
# selects which events this sink persists - confirm against MysqlSink.
# NOTE(review): database credentials are stored in plain text here.
# ---------------------------------------------------------------------------
a1.sinks.s1.channel = c1
#a1.sinks.s1.type = logger
a1.sinks.s1.type = com.yimen.data.flume.sink.MysqlSink
a1.sinks.s1.hostname = 192.168.10.190
a1.sinks.s1.port = 3306
a1.sinks.s1.username = root
a1.sinks.s1.password = 123456
a1.sinks.s1.testtype = type1
a1.sinks.s1.databaseName = test
a1.sinks.s1.tableName = table1
a1.sinks.s1.batchSize = 10

a1.sinks.s2.channel = c2
a1.sinks.s2.type = com.yimen.data.flume.sink.MysqlSink
a1.sinks.s2.hostname = 192.168.10.190
a1.sinks.s2.port = 3306
a1.sinks.s2.username = root
a1.sinks.s2.password = 123456
a1.sinks.s2.testtype = type2
a1.sinks.s2.databaseName = test
a1.sinks.s2.tableName = table2
a1.sinks.s2.batchSize = 10

a1.sinks.s3.channel = c3
a1.sinks.s3.type = com.yimen.data.flume.sink.MysqlSink
a1.sinks.s3.hostname = 192.168.10.190
a1.sinks.s3.port = 3306
a1.sinks.s3.username = root
a1.sinks.s3.password = 123456
a1.sinks.s3.testtype = type3
a1.sinks.s3.databaseName = test
a1.sinks.s3.tableName = table3
a1.sinks.s3.batchSize = 10

# Start agent a1; the conf directory (-c) and the agent file (-f) both resolve under ../conf.
../bin/flume-ng agent -n a1 -c ../conf -f ../conf/test_mysql.conf -Dflume.root.logger=DEBUG,console