Flume: Tailing File Content into Kafka for Consumption

1 ZooKeeper Installation and Configuration

1.1 ZK configuration (installation steps omitted)
1.1.1 After installation, change into the conf directory under the ZK installation directory

[[email protected] ~]$ cd /home/install/zookeeper/conf
[[email protected] conf]$ 

1.1.2 Rename zoo_sample.cfg to zoo.cfg

[[email protected] conf]$ mv zoo_sample.cfg zoo.cfg

1.1.3 Edit the zoo.cfg configuration file

[[email protected] conf]$ vi zoo.cfg
# example sakes.
# set the zookeeper data directory
dataDir=/home/hadoop/install/zookeeper/data

..............
# zookeeper ensemble addresses; the first extra port (2888) carries
# follower-to-leader traffic, the second (3888) is used to elect a new leader
server.1=192.168.13.128:2888:3888
server.2=192.168.13.129:2888:3888
server.3=192.168.13.131:2888:3888

1.1.4 Create the ZooKeeper data directory and write the node id into the myid file (myid must be created by hand)

[[email protected] conf]$ cd ../
[[email protected] zookeeper]$ mkdir data
[[email protected] zookeeper]$ echo 1 >> data/myid
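The id written to myid must match the N in that node's server.N line of zoo.cfg. A minimal local sketch of that mapping (the /tmp paths are hypothetical; on a real cluster you would run the echo on each node):

```shell
# Hypothetical local simulation: each node's data/myid must contain
# the N from its server.N line in zoo.cfg.
base=/tmp/zk-myid-demo
rm -rf "$base"
id=1
for host in mini01 mini02 mini03; do
    mkdir -p "$base/$host/data"
    echo "$id" > "$base/$host/data/myid"   # on a real cluster: run on $host
    id=$((id + 1))
done
cat "$base/mini02/data/myid"   # → 2
```

If myid and server.N disagree, the node cannot join the ensemble.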

1.2 Sync the zookeeper directory to the whole cluster (all files in it are synced)

[[email protected] zookeeper]$ cd ../
[[email protected] install]$ xsync zookeeper/

xsync is a distribution (sync) script; its contents are described at
https://blog.csdn.net/huoliangwu/article/details/84591893
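The linked post has the full script; at its core an xsync-style tool is just an rsync loop over the other nodes. A minimal sketch (the function name and the XSYNC_RUN dry-run hook are my own, not taken from the linked script):

```shell
# Minimal xsync-style sketch: rsync a directory to the same path on
# every other node. Set XSYNC_RUN=echo to dry-run without a cluster.
xsync_demo() {
    local dir=$1
    local run=${XSYNC_RUN:-rsync}
    for host in mini02 mini03; do
        "$run" -av "$dir" "$host:$dir"
    done
}
XSYNC_RUN=echo xsync_demo /home/install/zookeeper > /tmp/xsync-demo.out
cat /tmp/xsync-demo.out
```

The dry run prints one rsync invocation per target host, which is a cheap way to check the loop before touching real machines.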

1.3 Start ZooKeeper on every node in the cluster and check its status

[[email protected] zookeeper]$ ./bin/zkServer.sh start
[[email protected] zookeeper]$ ./bin/zkServer.sh status

Next time I'll write a script that starts the whole ZooKeeper cluster in one go.
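Such a script is essentially an ssh loop. A hedged sketch (the function name and the ZK_RUN dry-run hook are hypothetical; it assumes passwordless ssh and the install path used above):

```shell
# Hypothetical cluster-wide zkServer.sh wrapper.
# Set ZK_RUN=echo to print the commands instead of ssh-ing them.
zk_all() {
    local cmd=${1:-status}
    local run=${ZK_RUN:-ssh}
    for host in mini01 mini02 mini03; do
        "$run" "$host" "/home/hadoop/install/zookeeper/bin/zkServer.sh $cmd"
    done
}
ZK_RUN=echo zk_all start > /tmp/zk-all-demo.out
cat /tmp/zk-all-demo.out
```

With passwordless ssh in place, `zk_all start` / `zk_all status` replaces logging in to each node by hand.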

2 Kafka Installation and Configuration

2.1 Kafka configuration (installation steps omitted)
2.1.1 Create a logs directory under the Kafka installation directory

[[email protected] kafka]$ mkdir logs

2.1.2 Edit the configuration file

[[email protected] kafka]$ cd config/
[[email protected] config]$ vi server.properties
#globally unique broker id; must not repeat across the cluster
broker.id=0
#whether topics may be deleted
delete.topic.enable=true
#number of threads handling network requests
num.network.threads=3
#number of threads handling disk I/O
num.io.threads=8
#send buffer size of the socket
socket.send.buffer.bytes=102400
#receive buffer size of the socket
socket.receive.buffer.bytes=102400
#maximum size of a socket request
socket.request.max.bytes=104857600
#directory where kafka stores its data (log segments)
log.dirs=/home/hadoop/install/kafka/logs
#default number of partitions per topic on this broker
num.partitions=1
#threads used to recover and clean up data under log.dirs
num.recovery.threads.per.data.dir=1
#maximum time a segment file is retained before deletion
log.retention.hours=168
#Zookeeper ensemble connection string
zookeeper.connect=mini01:2181,mini02:2181,mini03:2181
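The buffer sizes above are plain byte counts; for instance socket.request.max.bytes is exactly 100 MB:

```shell
# 102400 bytes = 100 KB, 104857600 bytes = 100 MB
echo $((100 * 1024))          # → 102400
echo $((100 * 1024 * 1024))   # → 104857600
```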

2.2 Distribute the Kafka directory after installation

[[email protected] config]$ cd ../../
[[email protected] install]$ xsync kafka/

2.3 On each of the other nodes, edit the configuration and change broker.id; broker.id must not repeat

mini02       broker.id=1
mini03       broker.id=2
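After xsync every node still has broker.id=0, so this per-node edit is required. A small local sketch of the edit (the /tmp copy is hypothetical; on the real nodes you would run the sed against kafka/config/server.properties):

```shell
# Simulate patching broker.id for mini02 using a throwaway copy.
props=/tmp/server.properties.demo
printf 'broker.id=0\ndelete.topic.enable=true\n' > "$props"
sed -i 's/^broker\.id=.*/broker.id=1/' "$props"
grep '^broker.id=' "$props"   # → broker.id=1
```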

2.4 Start the cluster

[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &
[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &
[[email protected] kafka]$ bin/kafka-server-start.sh config/server.properties &

3 Flume Installation and Configuration

3.1 Flume configuration (installation steps omitted)
Edit the flume-env.sh configuration file; the main thing to set is the JAVA_HOME variable

# during Flume startup.

# Enviroment variables can be set here.

export JAVA_HOME=/home/hadoop/install/jdk1.8.0_111.jdk/

3.2 Verify the installation

[[email protected] flume]$ bin/flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

If you see output like the above, the installation succeeded.

Pulling file data with Flume and consuming the messages in Kafka

Create a new Flume configuration file, flume2kafka.conf

[[email protected] flume]$ vi conf/flume2kafka.conf
# name the components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec        
a1.sources.r1.command = tail -F /home/hadoop/logs.tsv
a1.sources.r1.shell=/bin/sh -c

# Describe the sink (the actual Kafka sink type is set below)

# Use a channel which buffers events in memory
a1.channels.c1.type = memory                
a1.channels.c1.capacity = 1000            
a1.channels.c1.transactionCapacity = 100    

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Kafka sink configuration (swap back to the logger sink for console debugging)
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = mini01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

Generate data on the fly

[[email protected] ~]$ test/getlog.sh 
16991082028	餘建堂	18401456522	楊佔昊	46-08-04 15:07:46	1182
19641660102	劉洋	13059125383	郭振君	75-10-14 04:42:31	3926
14361606522	劉優	14692570569	陳猛	33-07-01 13:57:10	1700
17755364600	霍風浩	13059125383	郭振君	90-04-12 14:32:53	5587
15093813308	賈明燦	15060932038	閔強	90-02-11 04:42:22	1416
19641660102	劉洋	18506948961	冀纓菲	25-06-24 06:19:43	2622
15060932038	閔強	13305040991	高永斌	05-05-13 21:10:10	5015
13113007783	孫良明	14692570569	陳猛	94-08-12 03:35:48	3909
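The contents of getlog.sh are not shown here; a hypothetical stand-in that produces the same six tab-separated fields (caller number, caller name, callee number, callee name, timestamp, duration) might look like:

```shell
# Hypothetical stand-in for getlog.sh: append tab-separated call records.
log=/tmp/logs.tsv.demo
: > "$log"
for i in 1 2 3; do
    printf '1699108202%s\t餘建堂\t1840145652%s\t楊佔昊\t%s\t%s\n' \
        "$i" "$i" "$(date '+%y-%m-%d %H:%M:%S')" "$((RANDOM % 5000 + 1000))" >> "$log"
done
awk -F'\t' '{print NF}' "$log"   # → 6 on every line
```

Pointing the exec source's `tail -F` at the real logs.tsv then turns each appended line into a Flume event.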

Start Flume

[[email protected] flume]$ bin/flume-ng agent -c conf -f conf/flume2kafka.conf --name a1 -Dflume.root.logger=INFO,console

Start a Kafka console consumer

[[email protected] kafka]$ bin/kafka-console-consumer.sh --zookeeper mini01:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
13658626467	劉海濤	13288940364	賈鑫瑜	61-12-23 17:35:42	2615
19920594188	段雪鵬	17755364600	霍風浩	74-11-18 17:42:22	6740
17533432302	張文舉	14865818526	常天罡	05-05-12 06:43:29	2569
15142556083	趙曉露	18491428393	張苗	51-02-03 20:39:20	2719
13305040991	高永斌	14692570569	陳猛	75-08-08 18:50:56	5506
19641660102	劉洋	14385342683	陳凱	49-09-22 04:50:07	3719