1. 程式人生 > >實時抽取mysql的資料工具----canal(一)

實時抽取mysql的資料工具----canal(一)

1、準備:

github:https://github.com/alibaba/canal

裡面有包括canal的文件,server端 client端的 例子 原始碼包等等。

2、canal概述:

canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。

早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支援mysql5.x和oracle部分版本的日誌解析


基於日誌增量訂閱&消費支援的業務:


資料庫映象
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務訊息

keyword:資料庫同步,增量訂閱&消費。

3、canal工作原理:

 

從上層來看,複製分成三步:


master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重做中繼日誌中的事件,將改變反映它自己的資料。

4、部署canal:

部署canal-server:

(1)開啟mysql的binlog功能,並配置binlog模式為row。

在my.cnf 加入如下:

    [mysqld]
    log-bin=mysql-bin #新增這一行就ok
    binlog-format=ROW #選擇row模式
    server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複

(2)在mysql中 配置canal資料庫管理使用者,配置相應許可權(repication許可權)

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

(3)下載canal https://github.com/alibaba/canal/releases

 解壓到相應資料夾

tar -zxvf canal canal

canal 檔案目錄結構

    drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
    drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
    drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
    drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs


修改配置 instance.properties

vim canal/conf/example/instance.properties


    #################################################
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
     
    # position info,需要改成自己的資料庫資訊
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
     
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
     
    # username/password,需要改成自己的資料庫資訊
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName = canal_test
    canal.instance.connectionCharset = UTF-8
     
    # table regex
    canal.instance.filter.regex = .*\\..*
     
    #################################################


然後cd到bin目錄  啟動和停止canal-server

啟動  

./startup.sh

停止

./stop.sh


驗證啟動狀態,檢視log檔案

vim canal/log/canal/canal.log


    2014-07-18 10:21:08.525 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2014-07-18 10:21:08.609 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]
    2014-07-18 10:21:09.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......


上述日誌資訊顯示啟動canal成功

執行canal-client例項:

(1)建立例項maven工程

mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample


(2)新增pom依賴:

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.12</version>
    </dependency>

(3)更新依賴 mvn install

(4)canal-client.java  例項程式碼

    /**
     * Created by hp on 14-7-17.
     */
    import java.net.InetSocketAddress;
    import java.util.List;
     
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.alibaba.otter.canal.client.*;
    import org.jetbrains.annotations.NotNull;
     
    public class ClientSample {
     
        public static void main(String args[]) {
            // 建立連結
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                    11111), "example", "", "");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmtryCount = 1200;
                while (emptyCount < totalEmtryCount) {
                    Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
     
                    connector.ack(batchId); // 提交確認
                    // connector.rollback(batchId); // 處理失敗, 回滾資料
                }
     
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
     
        private static void printEntry(@NotNull List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
     
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
     
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
     
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
     
        private static void printColumn(@NotNull List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    }


(5)執行java例項

啟動後看到控制端資訊:

    empty count : 1
    empty count : 2
    empty count : 3
    empty count : 4


(6)觸發資料庫變更

    create table test (
    uid int (4) primary key not null auto_increment,
    name varchar(10) not null);
     
    insert into test (name) values('10');


(7)client 抓取mysql資訊:

    ================> binlog[mysql-bin.000016:3281] , name[canal_test,test] , eventType : INSERT
    uid : 7    update=false
    name : 10    update=false
    empty count : 1
    empty count : 2


5、部署過程中產生問題:

(1)啟動失敗,log日誌中地址正在使用

1、11111埠正在被佔用 可以用 ls -i:11111 檢視監聽程序誰佔用埠 或者 用 ps -ef | grep 11111 檢視哪個程序佔用埠號  然後 kill -9 程序號  殺掉佔用程序

2、可以編輯 canal/conf/canal.properties 中的埠號 ,改為不佔用的埠

(2)canal無法抓取mysql觸發資料庫改變的資訊

1、檢查mysql是否開啟binlog寫入功能  檢查binlog 是否為行模式。

show variables like "binlog_format"

2、檢查my.cnf 和 instance.properties 等配置檔案填寫資訊是否正確。

3、檢查client 程式碼 除錯例項程式碼

4、版本相容問題,canal 1.8 換成 canal 1.7 繼續測試

5、檢視所有日誌檔案 分析日誌