簡述

CloudCanal 近期實現了 MySQL(RDS) 到 ClickHouse 實時同步的能力,功能包含全量資料遷移、增量資料遷移、結構遷移能力,以及附帶的監控、告警、HA等能力(平臺自帶)。

ClickHouse 本身並不直接支援 Update 和 Delete 能力,但是他自帶的 MergeTree 系列表中 CollapsingMergeTreeVersionedCollapsingMergeTree 可變相實現實時增量的目的,並且效能完全夠用,能夠比較輕鬆達到 1k RPS 以上的能力。

接下來的文章,簡要介紹 CloudCanal 是如何實現這個能力,以及作為使用者我們怎麼比較好的使用這個能力。

技術點

結構遷移

CloudCanal 預設提供結構遷移,預設選擇 CollapsingMergeTree 作為表引擎,並增加一個預設欄位 __cc_ck_sign,源主鍵作為 sortKey,如下示例:

 CREATE TABLE console.worker_stats
(
`id` Int64,
`gmt_create` DateTime,
`worker_id` Int64,
`cpu_stat` String,
`mem_stat` String,
`disk_stat` String,
`__cc_ck_sign` Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(__cc_ck_sign)
ORDER BY id
SETTINGS index_granularity = 8192

ClickHouse 表引擎中,CollapsingMergeTree 和 VersionedCollapsingMergeTree 都能通過標記位按規則摺疊資料,從而達到更新和刪除的效果。VersionedCollapsingMergeTree 相比 CollapsingMergeTree 優勢在於同一條資料的不同變更可以亂序寫入,但是 CloudCanal 選擇 CollapsingMergeTree 主要原因在於2點

    1. CloudCanal 中同一條記錄必定是按源庫變更順序寫入,不存在亂序情況
    1. 不需要維護 VersionedCollapsingMergeTree 中的 Version 欄位(版本,也可以起其他名字)

所以 CloudCanal 選擇了 CollapsingMergeTree 作為預設表引擎。

寫資料

CloudCanal 寫資料主要包含全量和增量兩種,即單次搬遷存量資料和長期同步,兩者寫入略有不同。全量寫入對端主要工作是批量和多執行緒,因為 CloudCanal 結構遷移預設設定了標記位欄位 __cc_ck_sign default 值為 1, 所以就不需要做特殊處理。

對於增量, CloudCanal 則需要做 3 件事情。

  • 轉換 Update、Delete 操作為 Insert

    這一步有兩件事情要做,第一件是按照操作型別,填充標記欄位值,其中 Insert 和 Update 為 1 ,Delete 為 -1 ,第二件是將對應增量資料的前映象或者後鏡像填充到結果記錄中,以便後續 insert 寫入。
 for (CanalRowChange rowChange : rowChanges) {
switch (rowChange.getEventType()) {
case INSERT: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getAfterColumnsList().add(nonDeleteCol);
records.add(rowData.getAfterColumnsList());
} break;
}
case UPDATE: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getBeforeColumnsList().add(deleteCol);
records.add(rowData.getBeforeColumnsList()); rowData.getAfterColumnsList().add(nonDeleteCol);
records.add(rowData.getAfterColumnsList());
} break;
}
case DELETE: {
for (CanalRowData rowData : rowChange.getRowDatasList()) {
rowData.getBeforeColumnsList().add(deleteCol);
records.add(rowData.getBeforeColumnsList());
} break;
}
default:
throw new CanalException("not supported event type,eventType:" + rowChange.getEventType());
}
}
  • 按表歸組

    因為 IUD 操作已全部轉換為 Insert, 且為全映象(所有欄位都填充了值),所以可以按表歸組,然後批量寫入。即使單執行緒也能滿足大部分場景的同步效能要求。
protected Map<TableUnit, List<CanalRowChange>> groupByTable(IncrementMessage message) {
Map<TableUnit, List<CanalRowChange>> data = new HashMap<>();
for (ParsedEntry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntryType.ROWDATA) {
CanalRowChange rowChange = entry.getRowChange();
if (!rowChange.isDdl()) {
List<CanalRowChange> changes = data.computeIfAbsent(new TableUnit(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()), k -> new ArrayList<>());
changes.add(rowChange);
}
}
} return data;
}
  • 並行寫入

    將按表歸組的資料使用並行執行框架執行,具體不詳述。

舉個"栗子"

  • 新增資料來源

  • 建立任務,選擇資料來源和庫,並連線成功,點選下一步

  • 選擇資料同步,建議規格至少選擇 1 GB.目前 MySQL->ClickHouse 結構遷移自動過濾,所以選擇無效。點選下一步

  • 選擇表,預設 ClickHouse 上建立 CollapsingMergeTree 表引擎,並自動新增 __cc_ck_sign 摺疊標記欄位。點選下一步

  • 選擇欄位,點選下一步

  • 建立任務

  • 等待任務自動結構遷移、全量遷移、資料同步追上

  • 造點 Insert、Update、Delete 負載

  • 延遲追平狀態,停止負載

  • 檢查源端 MySQL 表資料,以其中一張表為例

  • 檢查對端 ClickHouse 表資料,不一致?!!

  • 手動優化下表,資料一致。雖然可以等待 ClickHouse 自動優化,但是如果需要直接得到準確結果,可手動優化(注意:手動優化可能導致資料庫機器壓力過大)

常見問題

我在ClickHouse上已經建立了表怎麼辦?

目前比較建議直接使用 CloudCanal 自動結構遷移的方式來建立任務。

如果已建表為 CollapsingMergeTree 表引擎,請將標記位欄位改成 __cc_ck_sign Int8 DEFAULT 1`,再建立任務(此時就不再自動結構遷移,而是使用已存在表)。

如果為其他表引擎,暫時不支援(主要是不支援增量能力,需要 CloudCanal 進一步探索)。

同步過去的資料什麼時候合併?

當 CloudCanal 同步資料到 ClickHouse 時,ClickHouse 並不會實時合併資料,也沒有一致性可言,所以一般情況是等待合併,或者直接手動合併(造成機器高負載、高IO),optimize table worker_stats FINAL

DDL 怎麼做?

目前 CloudCanal 還未支援到 ClickHouse 的 DDL 同步,產品實現上,目前是忽略的。所以如果做 DDL ,加欄位建議對端先加,再加源端,減欄位反之。

總結

本文簡要介紹了 CloudCanal 實現 MySQL(RDS) 到 ClickHouse 資料遷移同步的能力,具備一站式、資料實時特點,從技術點、例子、以及常見問題角度展開。文章如有錯誤,煩請大家勘誤,後續也歡迎大家試用,提供寶貴的意見和建議。

CloudCanal-免費好用的企業級資料同步工具,歡迎品鑑。

瞭解產品可以檢視官方網站: http://www.clougence.com

CloudCanal社群:https://www.askcug.com/