Syncing MySQL incremental data to Kafka with Flume on 64-bit Windows
I. Software prerequisites
1. JDK 1.7
2. Kafka
3. Flume 1.6.0
4. Maven (download link)
II. Install and start Kafka
1. Install Kafka
For this step, follow the linked article; it is detailed enough that it should go smoothly.
2. Start Kafka components in order (in a Windows cmd window)
2.1 Start the ZooKeeper daemon
zookeeper-server-start.bat ../../config/zookeeper.properties
2.2 Start the Kafka daemon
kafka-server-start.bat ../../config/server.properties
2.3 Create a Kafka topic (mysqltest is the topic name)
kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest
2.4 Send messages
kafka-console-producer.bat --broker-list localhost:9092 --topic mysqltest
2.5 Receive messages
kafka-console-consumer.bat --zookeeper localhost:2181 --topic mysqltest --from-beginning
2.6 Verify that Kafka works
Type any string in the producer cmd window from step 2.4 and check whether the consumer cmd window from step 2.5 receives it. If the message shows up, the Kafka message queue is installed correctly.
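Conceptually, this smoke test checks one produce/consume round trip through the topic. A toy sketch of that idea, with an in-process queue standing in for the Kafka broker (illustration only, not the Kafka API):

```python
# Toy model of the smoke test: a producer pushes a string onto a
# "topic" and a consumer reads it back. queue.Queue stands in for
# the broker; the real test uses the Kafka console clients.
from queue import Queue

topic = Queue()  # stands in for topic "mysqltest"

def produce(msg):
    """Analogue of typing a line into kafka-console-producer."""
    topic.put(msg)

def consume():
    """Analogue of kafka-console-consumer printing a message."""
    return topic.get(timeout=1)

produce("hello kafka")
assert consume() == "hello kafka"  # consumer sees the produced message
print("round trip ok")
```

If the console consumer does not echo what the producer sends, check that ZooKeeper and the broker from steps 2.1 and 2.2 are both still running.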
3. Install and start Flume
Flume does not ship with a source that syncs database data to Kafka, but a third-party plugin (flume-ng-sql-source) provides one.
3.1 Compile the plugin
In cmd, run mvn compile and then mvn package to compile and package the plugin into the target directory (add -Dmaven.test.skip=true to skip the tests).
3.2 Copy the jars
Create a new folder named libExt under the Flume root directory.
Copy the flume-ng-sql-source-1.5.1-SNAPSHOT.jar built in step 3.1 and the MySQL driver jar mysql-connector-java-5.0.5-bin.jar into libExt.
3.3 Write the .properties file
Create a config.properties file in the conf directory under the Flume root directory:
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/test
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = admin
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = D://mylab//flume//apache-flume-1.6.0-bin
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,name from user_table where id > $@$ order by id asc
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
##############################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mysqltest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
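The key line in this config is custom.query: the $@$ placeholder is replaced on every poll with the last index recorded in the status file, which is how the source reads only new rows. The polling logic can be sketched as follows (an illustrative simulation, not the plugin's actual code; SQLite stands in for MySQL and a plain variable stands in for the status file):

```python
# Sketch of flume-ng-sql-source's incremental polling: re-run the
# custom query with the remembered last index substituted for $@$,
# then advance the index past the rows just read.
import sqlite3

# The query from config.properties, with $@$ as a bind parameter.
QUERY = "SELECT id, name FROM user_table WHERE id > ? ORDER BY id ASC"

def poll(conn, last_index):
    """One polling cycle: return (new_rows, updated_last_index)."""
    rows = conn.execute(QUERY, (last_index,)).fetchall()
    if rows:
        last_index = rows[-1][0]  # highest id seen so far
    return rows, last_index

# In-memory stand-in for the MySQL user_table from the article.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_table (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO user_table VALUES (?, ?)",
                 [(1, "alice"), (2, "bob")])

last_index = 0  # corresponds to start.from = 0
rows, last_index = poll(conn, last_index)
print(rows, last_index)   # first poll sees all existing rows

conn.execute("INSERT INTO user_table VALUES (3, 'carol')")
rows, last_index = poll(conn, last_index)
print(rows, last_index)   # second poll sees only the new row
```

In the real plugin the last index is persisted in status.file.path/status.file.name, so incremental syncing survives a Flume restart; deleting that file makes the source re-read from start.from.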
3.4 Start flume-ng
Startup command in a Windows cmd window:
java.exe -Xmx100m -Dlog4j.configuration=file:///D:\mylab\flume\apache-flume-1.6.0-bin\conf\log4j.properties -cp "D:\mylab\flume\apache-flume-1.6.0-bin\lib\*;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\flume-ng-sql-source-1.5.1-SNAPSHOT.jar;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\mysql-connector-java-5.0.5-bin.jar" org.apache.flume.node.Application -f D:\mylab\flume\apache-flume-1.6.0-bin\conf\config.properties -n a1
4. Test the sync
Insert a few rows into user_table in MySQL and watch the consumer window from step 2.5: the new rows arrive as Kafka messages. Success!
PS: if Flume fails to start, check the log files in the directory configured in log4j.properties under the conf directory of the Flume root.