Using Flume to Extract MySQL Table Data into HDFS, MySQL, and Kafka in Near Real Time

Software versions: jdk1.8, apache-flume-1.6.0-bin, kafka_2.8.0-0.8.0, zookeeper-3.4.5

Install the cluster environment and verify that it works before you start.

This post draws on the work of the following authors, with thanks:

http://blog.csdn.net/wzy0623/article/details/73650053

https://www.cnblogs.com/sunyaxue/p/6645415.html

Three jar files need to be placed in /usr/local/bigdata/apache-flume-1.6.0-bin/lib:

flume-ng-sql-source-1.3.7.jar --> the SQL (MySQL) source for Flume; download: https://github.com/keedio/flume-ng-sql-source/

mysql-connector-java-5.1.35.jar --> the MySQL JDBC driver, which you probably already have.

flume-mysql-sink-1.0-SNAPSHOT.jar --> a custom MySQL sink, packaged as a jar from the code in the second link above.

Download link for all three jars: http://download.csdn.net/download/kongfanyu/10271848
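Before starting the agent it is worth confirming that the jars actually landed in Flume's lib directory. The following is a small Python sketch of such a check. It assumes the install path used in this post and the exact file names listed above (a different connector version would have a different file name); `missing_jars` is a hypothetical helper, not part of Flume.

```python
from pathlib import Path
from typing import Iterable, List

# Install path used in this post; adjust to your own Flume home.
FLUME_LIB = Path("/usr/local/bigdata/apache-flume-1.6.0-bin/lib")

# The three jars listed above (exact names assumed; versions may differ on your system).
REQUIRED_JARS = (
    "flume-ng-sql-source-1.3.7.jar",
    "mysql-connector-java-5.1.35.jar",
    "flume-mysql-sink-1.0-SNAPSHOT.jar",
)


def missing_jars(lib_dir: Path, required: Iterable[str] = REQUIRED_JARS) -> List[str]:
    """Return the required jar names that are not present as regular files in lib_dir."""
    return [name for name in required if not (lib_dir / name).is_file()]


if __name__ == "__main__":
    missing = missing_jars(FLUME_LIB)
    if missing:
        print(f"Missing from {FLUME_LIB}: {', '.join(missing)}")
    else:
        print("All required jars are present.")
```

Run it on the machine that hosts the Flume agent; an empty result means all three files are in place.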

The most important part is the Flume configuration file, shown below:

agent.channels = ch1 ch2 ch3
agent.sinks = HDFS mysqlSink kfk
agent.sources = sql-source


agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.channels = ch1 ch2 ch3
agent.sources.sql-source.connection.url = jdbc:mysql://192.168.2.164:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = root
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay=5000
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
#ch1
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 1000
agent.channels.ch1.transactionCapacity = 100
#ch2
agent.channels.ch2.type = memory
agent.channels.ch2.capacity = 1000
agent.channels.ch2.transactionCapacity = 100
#ch3
agent.channels.ch3.type = memory
agent.channels.ch3.capacity = 1000
agent.channels.ch3.transactionCapacity = 100


agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://dbserver:9000/flume/mysql3
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0


# custom mysqlSink
agent.sinks.mysqlSink.type = org.flume.mysql.sink.MysqlSink
agent.sinks.mysqlSink.url=jdbc:mysql://192.168.2.171:3306/test
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
agent.sinks.mysqlSink.tableName=wlslog
agent.sinks.mysqlSink.column_name=id,time_stamp,category,type,servername,code,msg
agent.sinks.mysqlSink.channel = ch2


# Kafka sink configuration
agent.sinks.kfk.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kfk.brokerList=192.168.2.171:9092,192.168.2.172:9092,192.168.2.173:9092,192.168.2.174:9092,192.168.2.175:9092
agent.sinks.kfk.topic=mytopic
agent.sinks.kfk.requiredAcks = 1
agent.sinks.kfk.batchSize = 2
agent.sinks.kfk.channel = ch3
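In this configuration one source fans out to three channels, and each sink drains exactly one of them, so a typo in a channel name is easy to make. The following is a hypothetical offline checker, not part of Flume. It reads the properties file with a deliberately minimal key=value parser (no line continuations or escapes) and reports any component that points at an undeclared channel or is missing a type. It assumes the agent name `agent`, as used above.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value parser: skips blank lines and '#' comments, splits on the first '='."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props: Dict[str, str], agent: str = "agent") -> List[str]:
    """Return human-readable problems with source/channel/sink wiring (empty list if none)."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()
    for ch in channels:
        if f"{agent}.channels.{ch}.type" not in props:
            problems.append(f"channel {ch} has no type")
    for src in props.get(f"{agent}.sources", "").split():
        if f"{agent}.sources.{src}.type" not in props:
            problems.append(f"source {src} has no type")
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")
    for sink in props.get(f"{agent}.sinks", "").split():
        if f"{agent}.sinks.{sink}.type" not in props:
            problems.append(f"sink {sink} has no type")
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    return problems
```

Usage: `print(check_wiring(parse_properties(open("your-agent.conf").read())))`, where the file name is a placeholder. This only checks the wiring between components. It does not validate whether individual property names are correct for a given source or sink type.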
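As I understand it, the source settings `incremental.column.name = id`, `incremental.value = 0` and `run.query.delay = 5000` amount to polling the table every 5000 ms for rows whose `id` is larger than the last value seen, with that last value persisted in the status file (`/var/lib/flume/sql-source.status`) so that a restart resumes where it stopped. The Python sketch below models that behaviour over an in-memory list of rows. It is a simplified illustration under that assumption, not the code of flume-ng-sql-source, and `poll` and `build_query` are hypothetical names.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def build_query(table: str, columns: str, inc_col: str, last_value: int) -> str:
    """Illustrative shape of an incremental query (not copied from flume-ng-sql-source)."""
    return f"SELECT {columns} FROM {table} WHERE {inc_col} > {last_value} ORDER BY {inc_col}"


def poll(rows: List[Row], inc_col: str, last_value: int) -> Tuple[List[Row], int]:
    """Return rows with inc_col > last_value (ascending) and the new high-water mark."""
    new_rows = sorted((r for r in rows if r[inc_col] > last_value), key=lambda r: r[inc_col])
    if new_rows:
        # This is the value a real source would write to its status file.
        last_value = new_rows[-1][inc_col]
    return new_rows, last_value
```

Starting from `incremental.value = 0`, a first poll over rows with ids 1 and 2 emits both and moves the mark to 2; after a row with id 3 is inserted, the next poll emits only that row. One consequence of this model is that rows updated in place keep their `id` and are not captured again, so the incremental column needs to grow monotonically for new rows to be picked up.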
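The custom `mysqlSink` receives each row as a Flume event and writes it into the columns given by `column_name`. The following Python sketch shows one way to turn an event into an insert. It assumes that the event body is a single CSV line with fields in the same order as `column_name`, which is how I expect flume-ng-sql-source to serialize rows. The real sink's parsing lives in the code at the second link above. `to_insert` is a hypothetical helper. It builds a parameterized statement with `%s` placeholders, the DB-API paramstyle used by Python MySQL drivers such as PyMySQL, instead of pasting values into the SQL text.

```python
import csv
import io
from typing import List, Tuple


def to_insert(table: str, column_names: str, body: str) -> Tuple[str, List[str]]:
    """Map one CSV event body onto a parameterized INSERT for the configured columns."""
    columns = [c.strip() for c in column_names.split(",")]
    parsed = list(csv.reader(io.StringIO(body)))  # csv handles quoted fields and embedded commas
    if not parsed:
        raise ValueError("empty event body")
    values = parsed[0]
    if len(values) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(values)}")
    placeholders = ", ".join(["%s"] * len(columns))
    # Table and column names come from the agent config, not from event data.
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    return sql, values
```

With `tableName=wlslog` and the seven columns configured above, this produces `INSERT INTO wlslog (id, time_stamp, category, type, servername, code, msg) VALUES (%s, %s, %s, %s, %s, %s, %s)` plus the list of field values. Passing the values separately lets the driver handle quoting, so a `msg` containing a comma or quote does not break the statement.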

If you have questions, leave a comment.