微服務MySQL分庫分表資料到MongoDB同步方案[轉]
需求背景
近年來,微服務概念持續火熱,網路上針對微服務和單體架構的討論也是越來越多,面對日益增長的業務需求是,很多公司做技術架構升級時優先選用微服務方式。我所在公司也是選的這個方向來升級技術架構,以支撐更大訪問量和更方便的業務擴充套件。
發現問題
微服務拆分主要分兩種方式:拆分業務系統不拆分資料庫,拆分業務系統拆分庫。如果資料規模小的話大可不必拆分資料庫,因為拆分資料看必將面對多維度資料查詢,跨程序之間的事務等問題。而我所在公司隨著業務發展單資料庫例項已經不能滿足業務需要,所以選擇了拆分業務系統同時拆分資料庫的模式,所以也面臨著以上的問題。本文主要介紹多維度資料實時查詢解決方案。當前系統架構和儲存結構如下:
image
解決思路
要對多資料庫資料進行查詢,首先就需要將資料庫同步到一起以方便查詢
為了滿足大資料量資料需求,所以優先選擇NOSQL資料庫做同步庫
NOSQL資料庫基本無法進行關聯查詢,所以需要將關係資料進行拼接操作,轉換成非關係型資料
業務多維度查詢需要實時性,所以需要選擇NOSQL中實時性相對比較好的資料庫:MongoDB
根據以上思路,總結資料整合架構如下圖所示:
image
解決方案
目前網上一些資料同步案例分兩種:MQ訊息同步和binlog資料讀取同步
先說MQ訊息同步,該同步方式我所在公司試用過一段時間,發現以下問題:
資料圍繞業務進行,對業務關鍵性資料操作傳送MQ訊息,對業務系統依賴性比較高 對於資料庫中存量資料需要單獨處理 對於工具表還需要單獨維護同步 每次新增資料表都需要重新新增MQ邏輯
考慮到以上問題,用MQ方式同步資料並不是最優解決辦法
使用binlog 資料讀取方式目前有一些成熟方案,比如tungsten replicator,但這些同步工具只能實現資料1:1複製,資料複製過程自定義邏輯新增比較麻煩,不支援分庫分表資料歸集操作。綜上所述,最優方案應該是讀取後binlog後自行處理後續資料邏輯。目前binlog讀取binlog工具中最成熟的方案應該就是alibaba開源的canal了。
canal
canal是阿里巴巴mysql資料庫binlog的增量訂閱&消費元件 。阿里雲DRDS、阿里巴巴TDDL 二級索引、小表複製. 都是基於canal做的,應用廣泛。
canal原理相對比較簡單:
canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議
mysql master收到dump請求,開始推送binary log給slave(也就是canal)
canal解析binary log物件(原始為byte流)
我使用的是canal的HA模式,由zookeeper選舉可用例項,每個資料庫一個instance,服務端配置如下:
目錄:
conf
database1
-instance.properties
database2
-instance.properties
canal.properties
instance.properties
canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .\..
canal.instance.filter.black.regex =
canal.properties
canal.id= 1
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.detecting.enable = true
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.destinations= example,p4-test
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
部署資料流如下:
image
tip:
雖然canal同時支援mixed和row型別的binlog日誌,但是獲取行資料時如果是mixed型別的日誌則獲取不到表名,所以本方案暫只支援row格式的binlog
資料同步
建立canal client應用訂閱canal讀取的binlog資料
1.開啟多instance 訂閱,訂閱多個instance
public void initCanalStart() {
List destinations = canalProperties.getDestination();
final List canalClientList = new ArrayList<>();
if (destinations != null && destinations.size() > 0) {
for (String destination : destinations) {
// 基於zookeeper動態獲取canal server的地址,建立連結,其中一臺server發生crash,可以支援failover
CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
CanalClient client = new CanalClient(destination, connector);
canalClientList.add(client);
client.start();
}
}
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal client");
for (CanalClient canalClient : canalClientList) {
canalClient.stop();
}
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:", e);
} finally {
logger.info("## canal client is down.");
}
}
});
}
訂閱訊息處理
private void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
connector.connect();
connector.subscribe();
while (running) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId != -1 && size > 0) {
saveEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾資料
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
根據資料庫事件處理訊息,過濾訊息列表,對資料變動進行處理,用到資訊為:
insert :schemaName,tableName,beforeColumnsList
update :schemaName,tableName,afterColumnsList
delete :schemaName,tableName,afterColumnsList
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
logger.info(row_format,
entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql());
continue;
}
DataService dataService = SpringUtil.getBean(DataService.class);
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
} else if (eventType == EventType.INSERT) {
dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
} else if (eventType == EventType.UPDATE) {
dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
} else {
logger.info("未知資料變動型別:{}", eventType);
}
}
}
ColumnsList轉換成MongoTemplate 可用的資料類:DBObject,順便做下資料型別轉換
public static DBObject columnToJson(List columns) {
DBObject obj = new BasicDBObject();
try {
for (CanalEntry.Column column : columns) {
String mysqlType = column.getMysqlType();
//int型別,長度11以下為Integer,以上為long
if (mysqlType.startsWith("int")) {
int lenBegin = mysqlType.indexOf('(');
int lenEnd = mysqlType.indexOf(')');
if (lenBegin > 0 && lenEnd > 0) {
int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
if (length > 10) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
continue;
}
}
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
} else if (mysqlType.startsWith("bigint")) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
} else if (mysqlType.startsWith("decimal")) {
int lenBegin = mysqlType.indexOf('(');
int lenCenter = mysqlType.indexOf(',');
int lenEnd = mysqlType.indexOf(')');
if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
if (length == 0) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
continue;
}
}
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
} else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
} else if (mysqlType.equals("date")) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
} else if (mysqlType.equals("time")) {
obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
} else {
obj.put(column.getName(), column.getValue());
}
}
} catch (ParseException e) {
e.printStackTrace();
}
return obj;
}
tip:
DBObject物件如果同時用於儲存原始資料和組合資料或其他資料,使用時應該深度拷貝物件生成副本,然後使用副本
資料拼接
我們獲取了資料庫資料後做拼接操作,比如兩張使用者表:
user_info:{id,user_no,user_name,user_password}
user_other_info:{id,user_no,idcard,realname}
拼接後mongo資料為:
user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})
接收到的資料資訊很多,如何才能簡單的觸發資料拼接操作呢?
先看我們能獲取的資訊:schemaName,tableName,DBObject,Event(insert,update,delete)
將這些資訊標識拼接起來看看:/schemaName/tableName/Event(DBObject),沒錯,就是一個標準的restful連結。只要我們實現一個簡單的springMVC 就能自動獲取需要的資料資訊進行拼接操作。
先實現@Controller,定義名稱為Schema,value對應schemaName
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Schema {
String value() default "";
}
然後實現@RequestMapping,定義名稱為Table,直接使用Canal中的EventType 對應RequestMethod
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Table {
String value() default "";
CanalEntry.EventType[] event() default {};
}
然後建立springUtil,實現介面ApplicationContextAware,應用啟動 載入的時候初始化兩個Map:intanceMap,handlerMap
private static ApplicationContext applicationContext = null;
//庫名和資料處理Bean對映Map
private static Map instanceMap = new HashMap();
//路勁和資料處理Method對映Map
private static Map handlerMap = new HashMap();
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
//初始化instanceMap資料
instanceMap();
//初始化handlerMap資料
handlerMap();
}
}
private void instanceMap() {
Map beans = applicationContext.getBeansWithAnnotation(Schema.class);
for (Object bean : beans.values()) {
Class clazz = bean.getClass();
Object instance = applicationContext.getBean(clazz);
Schema schema = clazz.getAnnotation(Schema.class);
String key = schema.value();
instanceMap.put(key, instance);
logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
}
}
private void handlerMap() {
if (instanceMap.size() <= 0)
return;
for (Map.Entry entry : instanceMap.entrySet()) {
if (entry.getValue().getClass().isAnnotationPresent(Schema.class)) {
Schema schema = entry.getValue().getClass().getAnnotation(Schema.class);
String schemeName = schema.value();
Method[] methods = entry.getValue().getClass().getMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(Table.class)) {
Table table = method.getAnnotation(Table.class);
String tName = table.value();
CanalEntry.EventType[] events = table.event();
//未標明資料事件型別的方法不做對映
if (events.length < 1) {
continue;
}
//同一個方法可以對映多張表
for (int i = 0; i < events.length; i++) {
String path = "/" + schemeName + "/" + tName + "/" + events[i].getNumber();
handlerMap.put(path, method);
logger.info("handlerMap [{}:{}]", path, method.getName());
}
} else {
continue;
}
}
} else {
continue;
}
}
}
呼叫方法:
public static void doEvent(String path, DBObject obj) throws Exception {
String[] pathArray = path.split("/");
if (pathArray.length != 4) {
logger.info("path 格式不正確:{}", path);
return;
}
Method method = handlerMap.get(path);
Object schema = instanceMap.get(pathArray[1]);
//查詢不到對映Bean和Method不做處理
if (method == null || schema == null) {
return;
}
try {
long begin = System.currentTimeMillis();
logger.info("integrate data:{},{}", path, obj);
method.invoke(schema, new Object[]{obj});
logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
} catch (Exception e) {
logger.error("呼叫組合邏輯異常", e);
throw new Exception(e.getCause());
}
}
資料拼接訊息處理:
@Schema("demo_user")
public class UserService {
@Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
public void saveUser_UserInfo(DBObject userInfo) {
String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
DBCollection collection = completeMongoTemplate.getCollection("user");
DBObject queryObject = new BasicDBObject("user_no", userNo);
DBObject user = collection.findOne(queryObject);
if (user == null) {
user = new BasicDBObject();
user.put("user_no", userNo);
user.put("userInfo", userInfo);
collection.insert(user);
} else {
DBObject updateObj = new BasicDBObject("userInfo", userInfo);
DBObject update = new BasicDBObject("$set", updateObj);
collection.update(queryObject, update);
}
}
}
示例原始碼