1. 程式人生 > >ES 多channel、多sink

ES 多channel、多sink

1.配置檔案

# Components of agent a1: one source (r1), three sinks (k1-k3) and three channels (c1-c3).
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

# Describe/configure the source
# r1 is implemented by a project-specific class (package jsyh.forward), given by its fully-qualified name.
a1.sources.r1.type = jsyh.forward.source.kafka.KafkaSource
#a1.sources.r1.statServiceType = application
# ZooKeeper connection string: three host:port entries on the same host.
a1.sources.r1.zookeeperConnect = 192.168.1.33:2180,192.168.1.33:2181,192.168.1.33:2182
a1.sources.r1.topic = jsbank123
a1.sources.r1.groupId = forward
a1.sources.r1.kafka.consumer.timeout.ms = 100
# The source is attached to all three channels; the selector below picks one per event.
a1.sources.r1.channels = c1 c2 c3
# Multiplexing selector: route each event by the value of its "ck" header.
#   ck = 0 -> c1, ck = 1 -> c2, ck = 2 -> c3; any other (or missing) value -> c3.
# NOTE(review): the "ck" header is presumably the one set by the code in section 2
# (KafkaSourceConstants.FLUME_CHANNEL_SELECTOR_KEY) - confirm against that constant.
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = ck
a1.sources.r1.selector.mapping.0 = c1
a1.sources.r1.selector.mapping.1 = c2
a1.sources.r1.selector.mapping.2 = c3
a1.sources.r1.selector.default = c3

# Describe the sinks
# k1, k2 and k3 are configured identically (same hosts, cluster, index name, batch size,
# serializer and index-name builder); the only per-sink difference is sinkId (1, 2, 3).
# The sink, serializer and index-name builder are project-specific classes (package jsyh.forward).
# NOTE(review): batchSize is presumably the number of events per write and ttl the document
# time-to-live - confirm against the AppElasticSearchSink implementation.
a1.sinks.k1.type = jsyh.forward.sink.elasticsearch2.AppElasticSearchSink
a1.sinks.k1.hostNames = 192.168.1.35:9300,192.168.1.35:9301,192.168.1.35:9302
a1.sinks.k1.indexName = jsbank_app_prim
#a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = els5-35
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = jsyh.forward.sink.elasticsearch2.AppElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = jsyh.forward.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.sinkId = 1

# Sink k2: same settings as k1 except sinkId.
a1.sinks.k2.type = jsyh.forward.sink.elasticsearch2.AppElasticSearchSink
a1.sinks.k2.hostNames = 192.168.1.35:9300,192.168.1.35:9301,192.168.1.35:9302
a1.sinks.k2.indexName = jsbank_app_prim
#a1.sinks.k2.indexType = bar_type
a1.sinks.k2.clusterName = els5-35
a1.sinks.k2.batchSize = 500
a1.sinks.k2.ttl = 5d
a1.sinks.k2.serializer = jsyh.forward.sink.elasticsearch2.AppElasticSearchDynamicSerializer
a1.sinks.k2.indexNameBuilder = jsyh.forward.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k2.sinkId = 2

# Sink k3: same settings as k1 except sinkId.
a1.sinks.k3.type = jsyh.forward.sink.elasticsearch2.AppElasticSearchSink
a1.sinks.k3.hostNames = 192.168.1.35:9300,192.168.1.35:9301,192.168.1.35:9302
a1.sinks.k3.indexName = jsbank_app_prim
#a1.sinks.k3.indexType = bar_type
a1.sinks.k3.clusterName = els5-35
a1.sinks.k3.batchSize = 500
a1.sinks.k3.ttl = 5d
a1.sinks.k3.serializer = jsyh.forward.sink.elasticsearch2.AppElasticSearchDynamicSerializer
a1.sinks.k3.indexNameBuilder = jsyh.forward.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k3.sinkId = 3

# Use file channels (type = file), which buffer events on disk rather than in memory.
# Each channel has its own checkpoint and data directory (.../1, .../2, .../3) so the
# three channels do not share on-disk state.
# maxFileSize = 104857600 bytes (100 MiB) per data file.
a1.channels.c1.transactionCapacity = 100000
a1.channels.c1.maxFileSize = 104857600
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/ssd/dmw-apm/app-forward-agent2-channel/1/checkpoint
a1.channels.c1.dataDirs = /mnt/ssd/dmw-apm/app-forward-agent2-channel/1/data

# Channel c2: same settings as c1, with its own directories.
a1.channels.c2.transactionCapacity = 100000
a1.channels.c2.maxFileSize = 104857600
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/ssd/dmw-apm/app-forward-agent2-channel/2/checkpoint
a1.channels.c2.dataDirs = /mnt/ssd/dmw-apm/app-forward-agent2-channel/2/data

# Channel c3: same settings as c1, with its own directories.
a1.channels.c3.transactionCapacity = 100000
a1.channels.c3.maxFileSize = 104857600
a1.channels.c3.type = file
a1.channels.c3.checkpointDir = /mnt/ssd/dmw-apm/app-forward-agent2-channel/3/checkpoint
a1.channels.c3.dataDirs = /mnt/ssd/dmw-apm/app-forward-agent2-channel/3/data

# Bind each sink to its own channel, one-to-one: k1 <- c1, k2 <- c2, k3 <- c3.
# (The source is bound to all three channels by a1.sources.r1.channels.)
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3

2.程式碼中新增:

// Cycling counter; its current value is written into each event's channel-selector header.
private int channelSelectorKey = 0;
// Tag the event with the current counter value (as a string), then advance the counter.
// NOTE(review): FLUME_CHANNEL_SELECTOR_KEY is presumably "ck", matching
// a1.sources.r1.selector.header in the agent configuration - confirm.
headers.put(KafkaSourceConstants.FLUME_CHANNEL_SELECTOR_KEY, "" + this.channelSelectorKey++);
// Wrap back to 0 once the counter reaches FLUME_CHANNEL_SELECTOR_KEY_MAX, so emitted
// values cycle through 0 .. MAX-1.
// NOTE(review): MAX presumably equals the number of channels (3) - confirm.
if(this.channelSelectorKey >= KafkaSourceConstants.FLUME_CHANNEL_SELECTOR_KEY_MAX) {
     this.channelSelectorKey = 0;
}