1. 程式人生 > >flume讀取binlog與kafka整合

flume讀取binlog與kafka整合

一、現將kafka調通

 

檢視zookeeper的topic

cd /usr/software/zookeeper/zookeeper/bin

./zkCli.sh start

ls /brokers/topics

 

先來說一下,刪除kafka無用topic

./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test

 

如果按照上述走是可以看到訊息的消費的。

 

 

啟動kafka

bin/kafka-server-start.sh config/server.properties

 

建立話題

./kafka-create-topic.sh -partition 1 -replica 1 -zookeeper centos1:2181 -topic test

 

 

看一下話題

./kafka-list-topic.sh -zookeeper centos1:2181

 

開啟producer話題

./kafka-console-producer.sh -broker-list centos1:9092 -topic test

 

開啟consumer

./kafka-console-consumer.sh -zookeeper centos1:2181 -topic test

 

 

 

 

二、再將flume調通

bin/flume-ng agent -c conf -f conf/flume-conf.properties -n sync &

 -c:表示配置檔案的目錄,在此我們配置了flume-env.sh,也在conf目錄下;

 -f:指定配置檔案,這個配置檔案必須在全域性選項的--conf引數定義的目錄下,就是說這個配置檔案要在前面配置的conf目錄下面;

 -n:表示要啟動的agent的名稱,也就是我們flume.properties配置檔案裡面,配置項的字首,這裡我們配的字首是【sync】;

 

結果遇到報錯:

[[email protected] flume]# Info: Sourcing environment configuration script /usr/software/flume/conf/flume-env.sh
Info: Including Hive libraries found via () for Hive access
+ exec /opt/jdk1.8.0_181/bin/java -Xms100m -Xmx200m -Dcom.sun.management.jmxremote -cp '/usr/software/flume/conf:/usr/local/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume-conf.properties -n sync
錯誤: 找不到或無法載入主類 org.apache.flume.node.Application

 

flume啟動Could not find or load main class org.apache.flume.node.Application 

 

修改flume的資料夾名稱後,啟動flume可能會失敗,錯誤資訊如下:
Error: Could not find or load main class org.apache.flume.node.Application
這個是因為環境變數的問題。 export看一下是不是有個FLUME_HOME的環境變數指向原來的資料夾,
如果是的話:


果然是環境變數配置錯了,然後修改過後

 source /etc/profile

 

 

此時我們向mysql表中開始插入資料發現consumer的客戶端中,沒有消費記錄。有以下截圖為準。

原因是日誌報錯。我們可以看一下flume日誌。

flume的日誌檔案配置不用我說了吧。

原因是缺少hibernate的配置項,看一下配置檔案,果然。

所以現在需要修改,參考文件修改吧。

 

 

 

我們先來看看配置檔案的寫法


a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1

###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://centos1:3306/hr

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.table = ef_arap
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = /usr/software/flume/logs
a1.sources.src-1.status.file.name = sqlSource.status

# Custom query
a1.sources.src-1.columns.to.select = *

a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

##############################

a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = centos1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1


a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1

 

 

 

然後我們首先jps殺死以前的flume程序。

緊接著我們啟動。

./bin/flume-ng agent -c conf -f conf/flume-conf.properties -n a1 &

 

現在可以看到日誌是成功的。

可以看到consumer端已經收到了。

超級激動啊!但是很快日誌就有報錯了。

我們看一下到底是什麼報錯吧。

 

又有報錯。

Cannot commit transaction. Byte capacity allocated to store event body 640000.0reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
 

這個錯誤應該是調優的問題了。

沒有問題了。

 

參考文章:

kafka

https://www.cnblogs.com/xiaodf/p/6093261.html#4

 

flume

https://blog.csdn.net/qaz1qaz1qaz2/article/details/52825459