1. 程式人生 > >大資料專案之電信客服三

大資料專案之電信客服三

1.啟動Kafaka叢集

這裡的Kafka叢集搭建就不再說了,如果不會搭建可以看我之前的博文

首先啟動Zookeeper叢集,然後再啟動Kafka叢集

bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties

2.建立Kafka主題

bin/kafka-topics.sh --zookeeper cdh0:2181 --create --replication-factor 3 --partitions 3 --topic ctlog

3.檢視Kafka主題是否建立成功

bin/kafka-topics.sh --zookeeper cdh0:2181 --list

4.啟動一個Kafka的消費者,等待Flume的資訊的輸入

bin/kafka-console-consumer.sh --bootstrap-server cdh0:9092 --topic ctlog --from-beginning

5.配置Flume

建立ct_log.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/package/log.csv
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = cdh0:9092,cdh1:9092,cdh2:9092
a1.sinks.k1.topic = ctlog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6.執行Flume

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file testjob/ct_log.conf

 

到這裡基本就穩了,前面的生產資料程式碼在生產資料,Flume監控產生資料的檔案並將資料傳到Kafka,Kafka進行消費

這時候應該可以在Kafka的消費端看到資料的輸出