1. 程式人生 > >基於canal的實時資料同步

基於canal的實時資料同步

適用場景

使用canal做資料備份而不用mysql自帶的主從備份的場景主要為:

  1. 跨資料庫的資料備份,例如mysql => oracle
  2. 資料異構,即對同一份資料做不同的分庫分表查詢。例如賣家和買家各自分庫索引

maven

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.2</version> </dependency>

java

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.
client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import org.apache.commons.lang.StringUtils; public class SimpleCanalClient { public static void main(String[] args) throws Exception { String destination = "example"; CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), destination, "", ""); connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int batchSize = 5 * 1024; while (true) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // } } else { synchronizedData(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾資料 } } /** * 同步資料 * @param entries * @throws Exception */ private static void synchronizedData(List<Entry> entries) throws Exception { for (Entry entry : entries) { if (entry.getEntryType() != EntryType.ROWDATA) { continue; } RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); String tableName = entry.getHeader().getTableName(); for (RowData rowData : rowChange.getRowDatasList()) { String sql = getSql(rowChange.getEventType(),tableName,rowData); System.out.println(sql); // TODO 執行sql語句 } } } /** * 獲取增刪改的sql * @param eventType * @param tableName * @param rowData * @return */ private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){ String sql = null; switch (eventType) { case INSERT: sql = getInsertSql(tableName,rowData.getAfterColumnsList()); break; case UPDATE: sql = getUpdateSql(tableName,rowData.getAfterColumnsList()); break; case DELETE: sql = getDeleteSql(tableName,rowData.getBeforeColumnsList()); break; default: break; } return sql; } private static String getInsertSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String keys = ""; String values = ""; for(int i=0;i<columns.size();i++){ if(i != 0) { keys += ","; values += ","; } keys += columns.get(i).getName(); values += getValue(columns.get(i)); } String format = "INSERT INTO %s (%s) VALUES (%s)"; return String.format(format,tableName,keys,values); } private static String getUpdateSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String sets = ""; String where = ""; for(Column column : columns){ if(column.getIsKey()){ where = column.getName() + "=" + getValue(column); continue; } if(!StringUtils.isBlank(sets)) { sets += ","; } sets += column.getName() + "=" + getValue(column); } String format = "UPDATE %s SET %s WHERE %s"; return String.format(format,tableName,sets,where); } private static String getDeleteSql(String tableName,List<Column> columns){ if(columns.size() == 0 || StringUtils.isBlank(tableName)){ return null; } String where = ""; for(Column column : columns){ if(column.getIsKey()){ where = column.getName() + "=" + getValue(column); continue; } } String format = "DELETE FROM %s WHERE %s"; return String.format(format,tableName,where); } private static String getValue(Column column){ if(column.getIsNull()){ return "null"; } return String.format("'%s'",column.getValue()); } }

資料一致性

單機單點消費mysql的log-bin後直接更新到備份資料庫中,資料一致性沒有問題。但是如果變成分散式環境以及消費mysql的log-bin後將更新資料推到MQ中由多節點消費更新到多個備份資料庫中,則會出現資料更新時序和資料一致性的問題。

而以上程式碼在update sql中除了獲取值變化了的欄位,也反查資料庫獲取了未變化的欄位。因此每次update的sql實際上是該條記錄的全量資料。

通過在表中加上時間戳欄位作為記錄的版本號,用邏輯刪除取代物理刪除delete,修改以上程式碼的sql拼接,insert操作時忽略主鍵衝突、update操作時僅更新版本號(時間戳)舊的記錄,可以極大避免資料不一致的現象,也解決了MQ重複消費的問題。

`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

再通過定時任務,每天一次增量資料更新,每週一次全量資料更新,保證資料的最終一致性。