1. 程式人生 > >阿里Canal框架(資料同步中介軟體)初步實踐

阿里Canal框架(資料同步中介軟體)初步實踐

最近在工作中需要處理一些大資料量同步的場景,正好運用到了canal這款資料庫中介軟體,因此特意花了點時間來進行該中介軟體的的學習和總結。

背景介紹

早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。

適用版本

支援mysql5.7及以下版本

傳統的主從同步原理

master將資料記錄到了binlog日誌裡面,然後slave會通過一個io執行緒去讀取master那邊指定位置點開始的binlog日誌內容,並將相應的資訊寫會到slave這邊的relay日誌裡面,最後slave會有單獨的sql執行緒來讀取這些master那邊執行的sql語句記錄,達成兩端的資料同步。

傳統的mysql主從同步實現的原理圖如下所示:

Canal中介軟體功能

基於純java語言開發,可以用於做增量資料訂閱和消費功能。

相比於傳統的資料同步,我們通常需要進行先搭建主從架構,然後使用binlog日誌進行讀取,然後指定需要同步的資料庫,資料庫表等資訊。但是隨著我們業務的不斷複雜,這種傳統的資料同步方式以及開始變得較為繁瑣,不夠靈活。

canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議mysql master收到dump請求,開始推送binary log給slave(也就是canal),canal解析binary log物件(原始為byte流),通過對binlog資料進行解析即可獲取需要同步的資料,在進行同步資料的過程中還可以加入開發人員的一些額外邏輯處理,比較開放。

Binlog的三種基本型別分別為:

STATEMENT模式只記錄了sql語句,但是沒有記錄上下文資訊,在進行資料恢復的時候可能會導致資料的丟失情況

ROW模式除了記錄sql語句之外,還會記錄每個欄位的變化情況,能夠清楚的記錄每行資料的變化歷史,但是會佔用較多的空間,需要使用mysqlbinlog工具進行檢視。

MIX模式比較靈活的記錄,例如說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了資料更新或者刪除情況下就會變為row模式

Canal環境搭建

需要先登入mysql資料庫,檢查binlog功能是否有開啟。

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF    |
+---------------+-------+
1 row in set (0.00 sec)

 

如果顯示狀態為OFF表示該功能未開啟,那麼這個時候就需要到my.ini裡面進行相關配置了,在原來的my.ini配置底部插入以下內容:

server-id=192
log-bin=mysql-bin
binlog_format = ROW

 

當再次通過客戶端檢視log_bin狀態為ON的時候,就表示binlog已經開啟:

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

 

然後在mysql裡面新增以下的相關使用者和許可權:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

 

開啟之後,我們可以前往canal的官方地址進行相應版本的安裝包進行下載:
https://github.com/alibaba/canal/releases

下載好指定的版本之後,找到裡面的bin目錄底下的startup指令碼,啟動。

啟動之後會發現黑窗停止在這樣一行的內容上,然後就不動了

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Listening for transport dt_socket at address: 9099

 

這時候需要前往日誌資料夾底下canallogs,檢視canal日誌檔案是否已經開啟,如果顯示以下內容,就表示啟動已經成功

2019-05-06 10:41:56.116 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-06 10:41:56.144 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-06 10:41:56.145 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2019-05-06 10:41:56.233 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.164.1:11111]
2019-05-06 10:41:58.179 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now .....

 

canal server的預設埠號為:11111,如果需要調整的話,可以去到conf目錄底下的canal.properties檔案中進行修改。

啟動了canal的server之後,便是基於java的客戶端搭建了。

首先在canalconf目錄底下建立一個獨立的資料夾(檔案命名 idea_user_data),用於做額外的資料來源配置:


然後建立一份特定的properties檔案:(名稱最好為:instance.properties),這裡面只需要建立properties檔案即可,其餘幾份檔案會自動生成,instance.properties可以直接從example資料夾裡面進行copy。

 

首先是匯入相應的依賴檔案:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

 

單機版本的canal連線案例

單機版本的環境比較好搭建,相應的程式碼如下:

首先是canal客戶端的配置類

/**
 * @author idea
 * @date 2019/5/6
 * @Version V1.0
 */
public class CanalConfig {

    public static String CANAL_ADDRESS="127.0.0.1";

    public static int PORT=11111;

    public static String DESTINATION="idea_user_data";

    public static String FILTER=".*\..*";
}

 

客戶端程式碼:

package com.sise.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static com.sise.config.CanalConfig.*;

/**
 * @author idea
 * @date 2019/5/6
 * @Version V1.0
 */
public class CanalClient {


    private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    public static void main(String args[]) {

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_ADDRESS,
                PORT), DESTINATION, "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(FILTER);
            connector.rollback();
            try {
                while (true) {
                    //嘗試從master那邊拉去資料batchSize條記錄,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);

                    //當佇列裡面堆積的sql大於一定數值的時候就模擬執行
                    if (SQL_QUEUE.size() >= 10) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }


    /**
     * 模擬執行佇列裡面的sql語句
     */
    public static void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
        }
    }

    /**
     * 資料處理
     *
     * @param entrys
     */
    private static void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 儲存更新語句
     *
     * @param entry
     */
    private static void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        //暫時只支援單一主鍵
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 儲存刪除語句
     *
     * @param entry
     */
    private static void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        //暫時只支援單一主鍵
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 儲存插入語句
     *
     * @param entry
     */
    private static void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

}

 

啟動程式之後,我們對資料庫表進行10次左右的修改操作之後,便可以從控制檯中看到sql的列印資訊。

關於canal叢集搭建的一些坑

在實際開發中,如果只有一臺canal機器作為server,當該臺機器掛掉之後,服務就會終止,那麼這個時候我們便需要引入叢集部署的方式了。

搭建canal叢集的環境需要先搭建好相應的zk叢集模式。zk的叢集搭建網上資料很多,這裡就不進行講解了。

canal搭建叢集的一些資料可以參考以下連結:
https://github.com/alibaba/canal/wiki/AdminGuide

canal在搭建HA模式的時候有幾個容易掉坑的步驟:
canal.properties配置裡面需要新增zk的地址,同時canal.instance.global.spring.xml

需要修改為classpath:spring/default-instance.xml


每臺機子的canal裡面的具體instance所在目錄的名稱需要統一,每個例項都有對應的slaveId,他們的id需要保證不重複。搭建好了canal叢集環境之後,然後程式碼部分需要在連結的那個模組進行稍微的調整:

CanalConnector connector = CanalConnectors.newClusterConnector(CLUSTER_ADDRESS, DESTINATION, "", "");

 

為了保證master在某些特殊場景下掛掉,mysql需要搭建為雙M模式,那麼我們這個時候可以在每個canal機器的instance配置檔案中加入master的地址和standby的地址:

canal.instance.master.address=******
canal.instance.standby.address = ******

 

同時對於detecing也需要進行配置修改

canal.instance.detecting.enable = true ## 需要開啟心跳檢查
canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳檢查sql
canal.instance.detecting.interval.time = 3 ##心跳檢查頻率
canal.instance.detecting.retry.threshold = 3  ## 心跳檢查失敗次數閥值,當超過這個次數之後,就會自動切換到standby上邊的機器進行binlog的訂閱讀取
canal.instance.detecting.heartbeatHaEnable = true  ## 是否開啟master和standby的主動切換

 

ps: master和standby進行切換機器的時候可能會有時間延遲。

啟動2臺canal機器,可以在zk裡面檢視到canal註冊的節點資訊:


通過模擬測試,關閉當前埠為11111的canal機器,節點資訊會自動更換為第二臺canal進行替換:

ClusterCanalConnector和SimpleCanalConnector類發現了username和password的引數,但是似乎具體配置中並沒有做具體的設定,這是為什麼呢?

後來也在github上邊檢視到了一些網友的相關討論:

canal結合kafka傳送sql資料案例

pom依賴:

     <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.1</version>
        </dependency>

 

kafka的配置類:

public class KafkaProperties
{
    public final static String ZK_CONNECTION = "XXX.XXX.XXX.XXX:2181";
    public final static String BROKER_LIST_ADDRESS = "XXX.XXX.XXX.XXX:9092";
    public final static String GROUP_ID = "group1";
    public final static String TOPIC = "USER-DATA";
}

 

關於kafka的環境搭建步驟比較簡單,網上有很多的資料,這裡就不多一一介紹了。
首先是kafka的producer部分程式碼:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;

import java.util.Properties;

import static com.sise.kafka.KafkaProperties.TOPIC;

/**
 * @author idea
 * @date 2019/5/7
 * @Version V1.0
 */
public class KafkaProducerDemo extends Thread {

    public static Logger log = Logger.getLogger(KafkaProducerDemo.class);

    //kafka的連結地址要使用hostname 預設9092埠
    private static final String BROKER_LIST = BROKER_LIST_ADDRESS;

    private static KafkaProducer<String, String> producer = null;


    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    /*
    初始化配置
     */
    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void sendMsg(String msg) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != e) {
                    log.info("send error" + e.getMessage());
                } else {
                    System.out.println("send success");
                }
            }
        });
    }

}

 

接著是consumer部分的程式碼:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * @author idea
 * @date 2019/5/7
 * @Version V1.0
 */
public class KafkaConsumerDemo extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumerDemo(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK_CONNECTION);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("【receive】" + new String(it.next().message()));
        }
    }


}

 

然後需要在CanalClient 的executeQueueSql函數出進行部分功能的修改:

 /**
     * 給kafka傳送sql語句
     */
    public static void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            //傳送sql給kafka
            KafkaProducerDemo.sendMsg(sql);
        }
    }

 

為了驗證程式是否正常,啟動canal和kafka之後,對canal監聽的資料庫裡面的表進行資料資訊的修改,然後canal會將修改的binlog裡面的sql放入佇列中,當佇列滿了之後便向kafka中進行傳送:


consumer端接受到資料之後控制檯便打印出相應內容:

 

&n