
Syncing MySQL Incremental Data to Kafka with Flume on 64-bit Windows

I. Software Preparation

1. JDK 1.7

4. Maven

II. Install and Start Kafka

1. Install Kafka

Refer to the linked article for this step; it is fairly detailed, so there should be no problems.

2. Start the Kafka components in order (in a Windows cmd window)

    2.1 Start the ZooKeeper daemon

    zookeeper-server-start.bat ../../config/zookeeper.properties

    2.2 Start the Kafka daemon

    kafka-server-start.bat ../../config/server.properties

    2.3 Create a Kafka topic (mysqltest is the topic name)

    kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest

    2.4 Send messages

    kafka-console-producer.bat --broker-list localhost:9092 --topic mysqltest

    2.5 Receive messages

    kafka-console-consumer.bat --zookeeper localhost:2181 --topic mysqltest --from-beginning

    2.6 Verify that Kafka works

    In the producer cmd window from step 2.4, type any string and check whether the consumer cmd window from step 2.5 receives it. If the message comes through, the Kafka message queue is installed correctly.
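What steps 2.4–2.6 verify is simply that messages published to the topic come back out of the consumer, complete and in publish order. A minimal sketch of that round-trip, with an in-memory queue standing in for the single-partition mysqltest topic (no real broker involved; the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class KafkaLoopbackSketch {
    public static void main(String[] args) throws InterruptedException {
        // In-memory stand-in for the single-partition mysqltest topic.
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();

        // Step 2.4, producer side: publish a few test strings.
        List<String> sent = Arrays.asList("hello", "kafka", "works");
        for (String msg : sent) topic.put(msg);

        // Step 2.5, consumer side: read everything back from the beginning.
        List<String> received = new ArrayList<>();
        while (!topic.isEmpty()) received.add(topic.take());

        // Step 2.6: messages come back complete and in publish order,
        // just as a single Kafka partition preserves ordering.
        if (!received.equals(sent)) throw new AssertionError("mismatch: " + received);
        System.out.println("loopback ok: " + received);
    }
}
```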

3. Install and Start Flume

Flume does not yet ship with a source that syncs database data to Kafka, but a third-party plugin (flume-ng-sql-source) fills the gap.

    3.1 Build the plugin

    In cmd, run mvn compile and then mvn package to compile the plugin and package the jar into the target directory (add -Dmaven.test.skip=true to skip the tests).

    

    3.2 Copy the jars

    Create a new folder named libExt under the Flume root directory.

    Copy the flume-ng-sql-source-1.5.1-SNAPSHOT.jar built in 3.1, together with the MySQL driver jar mysql-connector-java-5.0.5-bin.jar, into libExt.

    3.3 Write the .properties file

    Create a file named config.properties under the conf directory of the Flume root:

    a1.channels = ch-1
    a1.sources = src-1
    a1.sinks = k1
    ###########sql source#################
    # For each one of the sources, the type is defined
    a1.sources.src-1.type = org.keedio.flume.source.SQLSource
    a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/test

    # Hibernate Database connection properties
    a1.sources.src-1.hibernate.connection.user = root
    a1.sources.src-1.hibernate.connection.password = admin
    a1.sources.src-1.hibernate.connection.autocommit = true
    a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    a1.sources.src-1.run.query.delay=10000
    a1.sources.src-1.status.file.path = D://mylab//flume//apache-flume-1.6.0-bin
    a1.sources.src-1.status.file.name = sqlSource.status


    # Custom query
    a1.sources.src-1.start.from = 0
    a1.sources.src-1.custom.query = select id,name from user_table where id > $@$ order by id asc

    a1.sources.src-1.batch.size = 1000
    a1.sources.src-1.max.rows = 1000


    a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    a1.sources.src-1.hibernate.c3p0.min_size=1
    a1.sources.src-1.hibernate.c3p0.max_size=10

    ##############################

    a1.channels.ch-1.type = memory
    a1.channels.ch-1.capacity = 10000
    a1.channels.ch-1.transactionCapacity = 10000
    a1.channels.ch-1.byteCapacityBufferPercentage = 20
    a1.channels.ch-1.byteCapacity = 800000

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = mysqltest
    a1.sinks.k1.brokerList = localhost:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20

    a1.sinks.k1.channel = ch-1

    a1.sources.src-1.channels=ch-1
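The key line in this file is custom.query: on each poll (every run.query.delay milliseconds) flume-ng-sql-source substitutes the last index it persisted in the status file into the query's $@$ placeholder, so only rows with a larger id are fetched. A minimal sketch of that incremental loop, with an in-memory row list standing in for the MySQL table (the class and method names are illustrative, not the plugin's actual API):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class IncrementalPollSketch {
    // One (id, name) row of the user_table named in custom.query.
    static class Row {
        final long id;
        final String name;
        Row(long id, String name) { this.id = id; this.name = name; }
    }

    // Stand-in for "select id,name from user_table where id > $@$ order by id asc":
    // only rows newer than the last persisted index come back, in id order.
    static List<Row> poll(List<Row> table, long lastIndex) {
        List<Row> batch = new ArrayList<>();
        for (Row r : table) {
            if (r.id > lastIndex) batch.add(r);
        }
        batch.sort(Comparator.comparingLong(r -> r.id));
        return batch;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, "alice"));
        table.add(new Row(2, "bob"));

        long lastIndex = 0; // start.from = 0; the plugin persists this in sqlSource.status

        // First poll: everything newer than id 0 is emitted.
        List<Row> batch = poll(table, lastIndex);
        System.out.println("poll 1: " + batch.size() + " rows");
        lastIndex = batch.get(batch.size() - 1).id; // advance the status index

        // A new row is inserted between polls.
        table.add(new Row(3, "carol"));

        // Second poll: only the new row (id > 2) is emitted, so nothing repeats.
        batch = poll(table, lastIndex);
        System.out.println("poll 2: " + batch.size() + " row, id=" + batch.get(0).id);
    }
}
```

Because the index survives in the status file, restarting Flume resumes from the last synced id instead of re-reading the whole table.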

    3.4 Start flume-ng

    Startup command under Windows cmd:

    java.exe -Xmx100m -Dlog4j.configuration=file:///D:\mylab\flume\apache-flume-1.6.0-bin\conf\log4j.properties -cp "D:\mylab\flume\apache-flume-1.6.0-bin\lib\*;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\flume-ng-sql-source-1.5.1-SNAPSHOT.jar;D:\mylab\flume\apache-flume-1.6.0-bin\libExt\mysql-connector-java-5.0.5-bin.jar" org.apache.flume.node.Application -f D:\mylab\flume\apache-flume-1.6.0-bin\conf\config.properties -n a1

4. Test the Sync

    Insert a few rows into user_table in MySQL and check that the console consumer from step 2.5 prints them.

    Success!

PS: If Flume fails to start, check the log files in the directory configured in log4j.properties, found under conf in the Flume root.
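When diagnosing a failed start, usually only the last few log lines matter. A small sketch of reading just the tail of a log file (the temp file here is only a demo stand-in for whatever file log4j.properties points at):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class TailLog {
    // Return the last n lines of a log file (fine for logs that fit in memory).
    static List<String> tail(Path log, int n) throws IOException {
        List<String> lines = Files.readAllLines(log);
        return lines.subList(Math.max(0, lines.size() - n), lines.size());
    }

    public static void main(String[] args) throws IOException {
        // Demo stand-in for the real Flume log configured in log4j.properties.
        Path log = Files.createTempFile("flume", ".log");
        Files.write(log, Arrays.asList("INFO start", "ERROR bind failed", "INFO stop"));
        for (String line : tail(log, 2)) System.out.println(line);
    }
}
```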