Mysql 流增量寫入 Hdfs(一) --從 mysql 到 kafka
一. 概述
在大資料的靜態資料處理中,目前普遍採用的是用 Spark + Hdfs (Hive / Hbase) 的技術架構來對資料進行處理。
但有時候有其他的需求,需要從其他不同資料來源不間斷得采集資料,然後儲存到 Hdfs 中進行處理。而追加(append)這種操作在 Hdfs 裡面明顯是比較麻煩的一件事。所幸有了 Storm 這麼個流資料處理這樣的東西問世,可以幫我們解決這些問題。
不過光有 Storm 還不夠,我們還需要其他中介軟體來協助我們,讓所有其他資料來源都歸於一個通道。這樣就能實現不同資料來源以及 Hhdfs 之間的解耦。而這個中介軟體 Kafka 無疑是一個很好的選擇。
這樣我們就可以讓 Mysql 的增量資料不停得丟擲到 Kafka ,而後再讓 storm 不停得從 Kafka 對應的 Topic 讀取資料並寫入到 Hdfs 中。
二. 基本知識
2.1 Mysql binlog 介紹
binlog 即 Mysql 的二進位制日誌。它可以說是 Mysql 最重要的日誌了, 它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄 ,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。
上面所說的提到了 DDL 和 DML ,可能有些同學不瞭解,這裡順便說一下:
- DDL:資料定義語言DDL用來建立資料庫中的各種物件-----表、檢視、索引、同義詞、聚簇等如:CREATE TABLE/VIEW/INDEX/SYN/CLUSTER...
- DML:資料操縱語言DML主要有三種形式:插入(INSERT), 更新(UPDATE),以及刪除(DELETE)。
在 Mysql 中,binlog 預設是不開啟的,因為有大約 1% (官方說法)的效能損耗,如果要手動開啟,流程如下:
- vi編輯開啟mysql配置檔案:
vi /usr/local/mysql/etc/my.cnf
在[mysqld] 區塊設定/新增如下,
log-bin=mysql-bin
注意一定要在 [mysqld] 下。
- 重啟 Mysql
pkill mysqld /usr/local/mysql/bin/mysqld_safe --user=mysql &
2.2 kafka
這裡只對 Kafka 做一個基本的介紹,更多的內容可以度娘一波。
上面的圖片是 kafka 官方的一個圖片,我們目前只需要關注 Producers 和 Consumers 就行了。
Kafka 是一個分散式釋出-訂閱訊息系統。分散式方面由 Zookeeper 進行協同處理。訊息訂閱其實說白了吧,就是一個佇列,分為消費者和生產者,就像上圖中的內容,有資料來源充當 Producer 生產資料到 kafka 中,而有資料充當 Consumers ,消費 kafka 中的資料。
上圖中的 offset 指的是資料的寫入以及消費的位置的資訊,這是由 Zookeeper 管理的。也就是說,當 Consumers 重啟或是怎樣,需要重新從 kafka 讀取訊息時,總不能讓它從頭開始消費資料吧,這時候就需要有個記錄能告訴你從哪裡開始重新讀取。這就是 offset 。
kafka 中還有一個至關重要的概念,那就是 topic 。不過這個其實還是很好理解的,比如你要訂閱一些訊息,你肯定是不會訂閱所有訊息的吧,你只需要訂閱你感興趣的主題,比如攝影,程式設計,搞笑這些主題。而這裡主題的概念其實和 topic 是一樣的。 總之,可以將 topic 歸結為通道,kafka 中有很多個通道,不同的 Producer 向其中一個通道生產資料,也就是拋資料進去這個通道,Comsumers 不停得消費通道中的資料。
而我們要做的就是將 Mysql binlog 產生的資料拋到 kafka 中充當作生產者,然後由 storm 充當消費者,不停得消費資料並寫入到 Hdfs 中。
至於怎麼將 binlog 的資料拋到 kafka ,別急,下面我們就來介紹。
2.3 maxwell
maxwell 這個工具可以很方便得監聽 Mysql 的 binlog , 然後每當 binlog 發生變化時,就會以 json 格式丟擲對應的變化資料到 Kafka 中。 比如當向 mysql 一張表中插入一條語句的時候,maxwell 就會立刻監聽到 binlog 中有對應的記錄增加,然後將一些資訊包括插入的資料都轉化成 json 格式,然後拋到 kafka 指定的 topic 中。
ofollow,noindex" target="_blank">下載地址在這裡可以找到。
除了 Kafka 外,其實 maxwell 還支援寫入到其他各種中介軟體,比如 redis。
同時 maxwell 是比較輕量級的工具,只需要在 mysql 中新建一個數據庫供它記錄一些資訊,然後就可以直接執行。
三. 使用 maxwell 監聽 binlog
接下來我們將的是如果使用 maxwell ,讓它監聽 mysql 的 binlog 並拋到 kafka 中。maxwell 主要有兩種執行方式。一種是使用配置檔案,另一種則是在命令列中新增引數的方式執行。這裡追求方便,只使用命令列的方式進行演示。
這裡介紹一下簡單的將資料拋到 kafka 的命令列指令碼吧。
bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --port=3306
各項引數說明如下:
- user:mysql 使用者名稱
- password:mysql 密碼
- host:Mysql 地址
- producer:指定寫入的中介軟體型別,比如還有 redies
- kafka.bootstrap.servers:kafka 的地址
- kafka_topic:指明寫入到 kafka 哪個 topic
- port:mysql 埠
啟動之後,maxwell 便開始工作了,當然如果你想要讓這條命令可以在後臺執行的話,可以使用 Linux 的 nohup 命令,這裡就不多贅述,有需要百度即可。
這樣配置的話通常會將整個資料庫的增刪改都給拋到 kafka ,但這樣的需求顯然不常見,更常見的應該是具體監聽對某個庫的操作,或是某個表的操作。
在升級到 1.9.2(最新版本)後,maxwell 為我們提供這樣一個引數,讓我們可以輕鬆實現上述需求: --filter 。
這個引數通常包含兩個配置項,exclude 和 include。意思就是讓你指定排除哪些和包含哪些。比如我只想監聽 Adatabase 庫下的 Atable 表的變化。我可以這樣。
--filter='exclude: *.*, include: Adatabase.Atable'
這樣我們就可以輕鬆實現監聽 mysql binlog 的變化,並可以定製自己的需求。
OK,這一章我們介紹了 mysql binlog ,kafka 以及 maxwell 的一些內容,下一篇我們將會看到 storm 如何寫入 hdfs 以及定製一些策略。see you~~