1. 程式人生 > >【技術分享】mysql資料庫binlog的增量訂閱 消費元件 canal

【技術分享】mysql資料庫binlog的增量訂閱 消費元件 canal

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow

也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!

               


來源:https://github.com/AlibabaTech/canal 

背景

早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支援mysql5.x和oracle部分版本的日誌解析

基於日誌增量訂閱&消費支援的業務:

  1. 資料庫映象
  2. 資料庫實時備份
  3. 多級索引 (賣家和買家各自分庫索引)
  4. search build
  5. 業務cache重新整理
  6. 價格變化等重要業務訊息

專案介紹

名稱:canal [kə'næl]

譯意: 水道/管道/溝渠

語言: 純java開發

定位: 基於資料庫增量日誌解析,提供增量資料訂閱&消費,目前主要支援了mysql

工作原理

mysql主備複製實現


從上層來看,複製分成三步:

  1. master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
  2. slave將master的binary log events拷貝到它的中繼日誌(relay log);
  3. slave重做中繼日誌中的事件,將改變反映它自己的資料。

canal的工作原理:

原理相對比較簡單:

  1. canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log物件(原始為byte流)

架構

說明:

  • server代表一個canal執行例項,對應於一個jvm
  • instance對應於一個數據佇列 (1個server對應1..n個instance)

instance模組:

  • eventParser (資料來源接入,模擬slave協議和master進行互動,協議解析)
  • eventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)
  • eventStore (資料儲存)
  • metaManager (增量訂閱&消費資訊管理器)

資料物件格式:EntryProtocol.proto

Entry    Header        logfileName [binlog檔名]        logfileOffset [binlog position]        executeTime [發生的變更]        schemaName         tableName        eventType [insert/update/delete型別]    entryType   [事務頭BEGIN/事務尾END/資料ROWDATA]    storeValue  [byte資料,可展開,對應的型別為RowChange]    RowChange    isDdl       [是否是ddl變更操作,比如create table/drop table]    sql     [具體的ddl sql]    rowDatas    [具體insert/update/delete的變更資料,可為多條,1個binlog event事件可對應多條變更,比如批處理]        beforeColumns [Column型別的陣列]        afterColumns [Column型別的陣列]        Column     index           sqlType     [jdbc type]    name        [column name]    isKey       [是否為主鍵]    updated     [是否發生過變更]    isNull      [值是否為null]    value       [具體的內容,注意為文字]

說明:

  • 可以提供資料庫變更前和變更後的欄位內容,針對binlog中沒有的name,isKey等資訊進行補全
  • 可以提供ddl的變更語句

QuickStart

幾點說明:(mysql初始化)

a. canal的原理是基於mysql binlog技術,所以這裡一定需要開啟mysql的binlog寫入功能,並且配置binlog模式為row.

[mysqld]log-bin=mysql-bin #新增這一行就okbinlog-format=ROW #選擇row模式server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複
b. canal的原理是模擬自己為mysql slave,所以這裡一定需要做為mysql slave的相關許可權.
CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;

針對已有的賬戶可通過grant

啟動步驟:

1. 下載canal

下載部署包

wget http://canal4mysql.googlecode.com/files/canal.deployer-1.0.1.tar.gz

or

自己編譯

git clone [email protected]:otter-projects/canal.gitcd canal; mvn clean install -Dmaven.test.skip -Denv=release

編譯完成後,會在根目錄下產生target/canal.deployer-$version.tar.gz

2. 解壓縮

mkdir /tmp/canaltar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解壓完成後,進入/tmp/canal目錄,可以看到如下結構:

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bindrwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 confdrwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 libdrwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

3. 配置修改

應用引數:

vi conf/example/instance.properties
################################################### mysql serverIdcanal.instance.mysql.slaveId = 1234# position infocanal.instance.master.address = 127.0.0.1:3306 #改成自己的資料庫地址canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = # username/passwordcanal.instance.dbUsername = canal  #改成自己的資料庫資訊canal.instance.dbPassword = canal  #改成自己的資料庫資訊canal.instance.defaultDatabaseName =   #改成自己的資料庫資訊canal.instance.connectionCharset = UTF-8  #改成自己的資料庫資訊# table regexcanal.instance.filter.regex = .*\\..*#################################################

說明:

  • canal.instance.connectionCharset 代表資料庫的編碼方式對應到java中的編碼型別,比如UTF-8,GBK , ISO-8859-1

4. 準備啟動

sh bin/startup.sh

5. 檢視日誌

vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

具體instance的日誌:

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

6. 關閉

sh bin/stop.sh

it's over.

ClientExample

依賴配置:(目前暫未正式釋出到mvn倉庫,所以需要各位下載canal原始碼後手工執行下mvn clean install -Dmaven.test.skip)

<dependency>    <groupId>com.alibaba.otter</groupId>    <artifactId>canal.client</artifactId>    <version>1.0.0</version></dependency>

1. 建立mvn標準工程:

mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

2. 修改pom.xml,新增依賴

3. ClientSample程式碼

package com.alibaba.otter.canal.sample;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {    public static void main(String args[]) {        // 建立連結        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),                                                                                            11111), "example", "", "");        int batchSize = 1000;        int emptyCount = 0;        try {            connector.connect();            connector.subscribe(".*\\..*");            connector.rollback();            int totalEmtryCount = 120;            while (emptyCount < totalEmtryCount) {                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料                long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    emptyCount++;                    System.out.println("empty count : " + emptyCount);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                    }                } else {                    emptyCount = 0;                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);                    printEntry(message.getEntries());                }                connector.ack(batchId); // 提交確認                // connector.rollback(batchId); // 處理失敗, 回滾資料            }            System.out.println("empty too many times, exit");        } finally {            connector.disconnect();        }    }    private static void printEntry(List<Entry> entrys) {        for (Entry entry : entrys) {            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {                continue;            }            RowChange rowChage = null;            try {                rowChage = RowChange.parseFrom(entry.getStoreValue());            } catch (Exception e) {                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),                                           e);            }            EventType eventType = rowChage.getEventType();            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",                                             entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                                             entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                                             eventType));            for (RowData rowData : rowChage.getRowDatasList()) {                if (eventType == EventType.DELETE) {                    printColumn(rowData.getBeforeColumnsList());                } else if (eventType == EventType.INSERT) {                    printColumn(rowData.getAfterColumnsList());                } else {                    System.out.println("-------> before");                    printColumn(rowData.getBeforeColumnsList());                    System.out.println("-------> after");                    printColumn(rowData.getAfterColumnsList());                }            }        }    }    private static void printColumn(List<Column> columns) {        for (Column column : columns) {            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());        }    }}

4. 執行Client

首先啟動Canal Server,可參加QuickStart : http://agapple.iteye.com/blogs/1796070

啟動Canal Client後,可以從控制檯從看到類似訊息:

empty count : 1empty count : 2empty count : 3empty count : 4

此時代表當前資料庫無變更資料

5. 觸發資料庫變更

mysql> use test;Database changedmysql> CREATE TABLE `xdual` (    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,    ->   PRIMARY KEY (`ID`)    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;Query OK, 0 rows affected (0.06 sec)mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以從控制檯中看到:

empty count : 1empty count : 2empty count : 3empty count : 4================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERTID : 4    update=trueX : 2013-02-05 23:29:46    update=true

最後:

整個程式碼在附件中可以下載,如有問題可及時聯絡。

canal.sample.tar.gz (2.2 KB)


軟體官網:https://github.com/AlibabaTech/canal



-------------------------------------------------------------------------------------

黑夜路人,一個關注開源技術、喜歡分享、樂於學習的程式設計師


部落格:http://blog.csdn.net/heiyeshuwu

微博:http://weibo.com/heiyeluren

微信:heiyeluren2012  

想獲取更多IT技術相關資訊,歡迎關注微信!

二維碼掃描快速關注微信:



           

給我老師的人工智慧教程打call!http://blog.csdn.net/jiangjunshow

這裡寫圖片描述