1. 程式人生 > >微服務MySQL分庫分表資料到MongoDB同步方案[轉]

微服務MySQL分庫分表資料到MongoDB同步方案[轉]

需求背景

近年來,微服務概念持續火熱,網路上針對微服務和單體架構的討論也是越來越多,面對日益增長的業務需求是,很多公司做技術架構升級時優先選用微服務方式。我所在公司也是選的這個方向來升級技術架構,以支撐更大訪問量和更方便的業務擴充套件。
發現問題

微服務拆分主要分兩種方式:拆分業務系統不拆分資料庫,拆分業務系統拆分庫。如果資料規模小的話大可不必拆分資料庫,因為拆分資料看必將面對多維度資料查詢,跨程序之間的事務等問題。而我所在公司隨著業務發展單資料庫例項已經不能滿足業務需要,所以選擇了拆分業務系統同時拆分資料庫的模式,所以也面臨著以上的問題。本文主要介紹多維度資料實時查詢解決方案。當前系統架構和儲存結構如下:

image
解決思路

要對多資料庫資料進行查詢,首先就需要將資料庫同步到一起以方便查詢

為了滿足大資料量資料需求,所以優先選擇NOSQL資料庫做同步庫

NOSQL資料庫基本無法進行關聯查詢,所以需要將關係資料進行拼接操作,轉換成非關係型資料

業務多維度查詢需要實時性,所以需要選擇NOSQL中實時性相對比較好的資料庫:MongoDB

根據以上思路,總結資料整合架構如下圖所示:

image
解決方案

目前網上一些資料同步案例分兩種:MQ訊息同步和binlog資料讀取同步

先說MQ訊息同步,該同步方式我所在公司試用過一段時間,發現以下問題:

資料圍繞業務進行,對業務關鍵性資料操作傳送MQ訊息,對業務系統依賴性比較高

對於資料庫中存量資料需要單獨處理

對於工具表還需要單獨維護同步

每次新增資料表都需要重新新增MQ邏輯

考慮到以上問題,用MQ方式同步資料並不是最優解決辦法

使用binlog 資料讀取方式目前有一些成熟方案,比如tungsten replicator,但這些同步工具只能實現資料1:1複製,資料複製過程自定義邏輯新增比較麻煩,不支援分庫分表資料歸集操作。綜上所述,最優方案應該是讀取後binlog後自行處理後續資料邏輯。目前binlog讀取binlog工具中最成熟的方案應該就是alibaba開源的canal了。
canal

canal是阿里巴巴mysql資料庫binlog的增量訂閱&消費元件 。阿里雲DRDS、阿里巴巴TDDL 二級索引、小表複製. 都是基於canal做的,應用廣泛。
canal原理相對比較簡單:

canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議

mysql master收到dump請求,開始推送binary log給slave(也就是canal)

canal解析binary log物件(原始為byte流)

我使用的是canal的HA模式,由zookeeper選舉可用例項,每個資料庫一個instance,服務端配置如下:

目錄:

conf
database1
-instance.properties
database2
-instance.properties
canal.properties

instance.properties

canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .\..
canal.instance.filter.black.regex =

canal.properties

canal.id= 1
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.detecting.enable = true
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.destinations= example,p4-test
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署資料流如下:

image

tip:
雖然canal同時支援mixed和row型別的binlog日誌,但是獲取行資料時如果是mixed型別的日誌則獲取不到表名,所以本方案暫只支援row格式的binlog
資料同步

建立canal client應用訂閱canal讀取的binlog資料

1.開啟多instance 訂閱,訂閱多個instance

public void initCanalStart() {
    List destinations = canalProperties.getDestination();
    final List canalClientList = new ArrayList<>();
    if (destinations != null && destinations.size() > 0) {
        for (String destination : destinations) {
            // 基於zookeeper動態獲取canal server的地址,建立連結,其中一臺server發生crash,可以支援failover
            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("##something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }
    });
}

訂閱訊息處理

private void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            while (running) {
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    saveEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交確認
                // connector.rollback(batchId); // 處理失敗, 回滾資料
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}

根據資料庫事件處理訊息,過濾訊息列表,對資料變動進行處理,用到資訊為:

insert :schemaName,tableName,beforeColumnsList

update :schemaName,tableName,afterColumnsList

delete :schemaName,tableName,afterColumnsList

RowChange rowChage = null;
    try {
        rowChage = RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    EventType eventType = rowChage.getEventType();
    logger.info(row_format,
            entry.getHeader().getLogfileName(),
            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
            entry.getHeader().getTableName(), eventType,
            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
        logger.info(" sql ----> " + rowChage.getSql());
        continue;
    }
    DataService dataService = SpringUtil.getBean(DataService.class);
    for (RowData rowData : rowChage.getRowDatasList()) {
        if (eventType == EventType.DELETE) {
            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.INSERT) {
            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else if (eventType == EventType.UPDATE) {
            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
        } else {
            logger.info("未知資料變動型別:{}", eventType);
        }
    }
}

ColumnsList轉換成MongoTemplate 可用的資料類:DBObject,順便做下資料型別轉換

public static DBObject columnToJson(List columns) {
    DBObject obj = new BasicDBObject();
    try {
        for (CanalEntry.Column column : columns) {
            String mysqlType = column.getMysqlType();
            //int型別,長度11以下為Integer,以上為long
            if (mysqlType.startsWith("int")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                    if (length > 10) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
            } else if (mysqlType.startsWith("bigint")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
            } else if (mysqlType.startsWith("decimal")) {
                int lenBegin = mysqlType.indexOf('(');
                int lenCenter = mysqlType.indexOf(',');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                    if (length == 0) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("date")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("time")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
            } else {
                obj.put(column.getName(), column.getValue());
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return obj;
}

tip:
DBObject物件如果同時用於儲存原始資料和組合資料或其他資料,使用時應該深度拷貝物件生成副本,然後使用副本
資料拼接

我們獲取了資料庫資料後做拼接操作,比如兩張使用者表:

user_info:{id,user_no,user_name,user_password}
user_other_info:{id,user_no,idcard,realname}

拼接後mongo資料為:

user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的資料資訊很多,如何才能簡單的觸發資料拼接操作呢?

先看我們能獲取的資訊:schemaName,tableName,DBObject,Event(insert,update,delete)

將這些資訊標識拼接起來看看:/schemaName/tableName/Event(DBObject),沒錯,就是一個標準的restful連結。只要我們實現一個簡單的springMVC 就能自動獲取需要的資料資訊進行拼接操作。

先實現@Controller,定義名稱為Schema,value對應schemaName

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public  @interface Schema {
 String value() default "";
}

然後實現@RequestMapping,定義名稱為Table,直接使用Canal中的EventType 對應RequestMethod

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public  @interface Table {
    String value() default "";
    CanalEntry.EventType[] event() default {};
}

然後建立springUtil,實現介面ApplicationContextAware,應用啟動 載入的時候初始化兩個Map:intanceMap,handlerMap

private static ApplicationContext applicationContext = null;
//庫名和資料處理Bean對映Map
private static Map instanceMap = new HashMap();
//路勁和資料處理Method對映Map
private static Map handlerMap = new HashMap();
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    if (SpringUtil.applicationContext == null) {
        SpringUtil.applicationContext = applicationContext;
        //初始化instanceMap資料
        instanceMap();
        //初始化handlerMap資料
        handlerMap();
    }
}
private void instanceMap() {
    Map beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class clazz = bean.getClass();
        Object instance = applicationContext.getBean(clazz);
        Schema schema = clazz.getAnnotation(Schema.class);
        String key = schema.value();
        instanceMap.put(key, instance);
        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
    }
}
private void handlerMap() {
    if (instanceMap.size() <= 0)
        return;
    for (Map.Entry entry : instanceMap.entrySet()) {
        if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) {
            Schema schema = entry.getValue().getClass().getAnnotation(Schema.class);
            String schemeName = schema.value();
            Method[] methods = entry.getValue().getClass().getMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(Table.class)) {
                    Table table = method.getAnnotation(Table.class);
                    String tName = table.value();
                    CanalEntry.EventType[] events = table.event();
                    //未標明資料事件型別的方法不做對映
                    if (events.length < 1) {
                        continue;
                    }
                    //同一個方法可以對映多張表
                    for (int i = 0; i < events.length; i++) {
                        String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber();
                        handlerMap.put(path, method);
                        logger.info("handlerMap [{}:{}]", path, method.getName());
                    }
                } else {
                    continue;
                }
            }
        } else {
            continue;
        }
    }
}

呼叫方法:

public static void doEvent(String path, DBObject obj) throws Exception {
    String[] pathArray = path.split("/");
    if (pathArray.length != 4) {
        logger.info("path 格式不正確:{}", path);
        return;
    }
    Method method = handlerMap.get(path);
    Object schema = instanceMap.get(pathArray[1]);
    //查詢不到對映Bean和Method不做處理
    if (method == null || schema == null) {
        return;
    }
    try {
        long begin = System.currentTimeMillis();
        logger.info("integrate data:{},{}", path, obj);
        method.invoke(schema, new Object[]{obj});
        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
    } catch (Exception e) {
        logger.error("呼叫組合邏輯異常", e);
        throw new Exception(e.getCause());
    }
}

資料拼接訊息處理:

@Schema("demo_user")
public class UserService {
    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
    public void saveUser_UserInfo(DBObject userInfo) {
        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
        DBCollection collection = completeMongoTemplate.getCollection("user");
        DBObject queryObject = new BasicDBObject("user_no", userNo);
        DBObject user = collection.findOne(queryObject);
        if (user == null) {
            user = new BasicDBObject();
            user.put("user_no", userNo);
            user.put("userInfo", userInfo);
            collection.insert(user);
        } else {
            DBObject updateObj = new BasicDBObject("userInfo", userInfo);
            DBObject update = new BasicDBObject("$set", updateObj);
            collection.update(queryObject, update);
        }
    }
}

示例原始碼