Big Data Series: Kafka, a Distributed Publish-Subscribe Messaging System (Part 4): Three Ways to Integrate Kafka with Flume

Earlier in this series we introduced Flume. Now we integrate Kafka with Flume.

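First, recall how a Flume agent is structured: a Source receives events, a Channel buffers them, and a Sink delivers them to their destination.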

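Given this structure, there are only three ways to integrate Flume with Kafka. First, Flume acts as a producer: a Sink writes to Kafka. Second, Flume acts as a consumer: a Source reads from Kafka. Third, Flume acts as both: a Source receives data, Kafka serves as the Flume Channel, and a Sink outputs the data. The first approach, with a Sink writing to Kafka, is the most common.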

1. Flume as a producer: the Sink writes to Kafka (the most widely used integration)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888

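# Kafka sink: publish each event to topic "test" on broker s10:9092
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = s10:9092
# Events per batch; the Flume property table lists this key without the "kafka." prefix
a1.sinks.k1.flumeBatchSize = 20
# acks = 1: wait for the partition leader only
a1.sinks.k1.kafka.producer.acks = 1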

a1.channels.c1.type=memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
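
The sink passes any Kafka producer property through to the underlying producer when it is prefixed with `kafka.producer.`, which is how `acks` is set above. As a minimal sketch, assuming you want stronger delivery guarantees and larger requests than the configuration above gives you, the lines below could be used in place of the `acks` line. The values are illustrative assumptions, not taken from the original setup.

```properties
# Illustrative producer tuning; the values are assumptions, adjust them to your cluster
# Wait for all in-sync replicas to acknowledge each write
a1.sinks.k1.kafka.producer.acks = all
# Let the producer wait briefly so it can group records into larger requests
a1.sinks.k1.kafka.producer.linger.ms = 5
# Compress record batches before sending them to the broker
a1.sinks.k1.kafka.producer.compression.type = snappy
```

Stricter acknowledgements generally cost some latency, so `acks = 1` remains a reasonable choice for a quick test like this one.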


2. Flume as a consumer: the Source reads from Kafka

a1.sources = r1
a1.sinks = k1
a1.channels = c1

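# Kafka source: consume topic "test" as consumer group g4
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Write to the channel once 5000 messages are collected or 2000 ms have passed
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = s10:9092
a1.sources.r1.kafka.topics = test
a1.sources.r1.kafka.consumer.group.id = g4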

a1.sinks.k1.type = logger

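a1.channels.c1.type=memory
# The source can put up to batchSize (5000) events in one transaction,
# so the channel must allow at least that many (the default is 100)
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 5000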

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
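
A consumer group with no committed offsets starts wherever the Kafka consumer's `auto.offset.reset` setting points. Kafka's own default for it is `latest`, so messages already sitting in the topic may never reach the logger sink. The Kafka source passes any property prefixed with `kafka.consumer.` through to the underlying consumer. A minimal sketch, assuming you want the group to start from the oldest retained messages:

```properties
# Assumption: a group without committed offsets should read the topic from the beginning
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
```

This setting only applies when the group has no stored offset; once offsets are committed, the source resumes from them.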

3. Flume as both producer and consumer (Kafka as the Channel)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sinks.k1.type = logger

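# Kafka channel: events are buffered in topic "test" instead of in memory
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = s10:9092
a1.channels.c1.kafka.topic = test
a1.channels.c1.kafka.consumer.group.id = g6
# false: store the raw event body rather than an Avro-wrapped Flume event (headers are not kept)
a1.channels.c1.parseAsFlumeEvent = false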
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
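
The Flume user guide also describes using a Kafka channel without a source, so that a sink drains a topic that other producers write to. The fragment below is a sketch of that variant under stated assumptions: the agent name `a2` is hypothetical, and the broker, topic, and group are simply reused from the configuration above.

```properties
# Hypothetical agent "a2": Kafka channel plus sink, with no Flume source
a2.sinks = k1
a2.channels = c1

# The channel reads whatever is published to topic "test"
a2.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a2.channels.c1.kafka.bootstrap.servers = s10:9092
a2.channels.c1.kafka.topic = test
a2.channels.c1.kafka.consumer.group.id = g6
# Messages come from non-Flume producers, so they are not Avro-wrapped Flume events
a2.channels.c1.parseAsFlumeEvent = false

a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
```

In this layout `parseAsFlumeEvent = false` matters, because the messages in the topic are plain payloads rather than events serialized by a Flume source.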