.

作者:張學程

本文為 DM 原始碼閱讀系列文章的第九篇,在 上篇文章 中我們詳細介紹了 DM 對 online schema change 方案的同步支援,對 online schema change 同步方案以及實現細節等邏輯進行了分析。

在本篇文章中,我們將對 shard DDL 同步機制以及 checkpoint 機制等進行詳細的介紹,內容包括 shard group 的定義、shard DDL 的同步協調處理流程、checkpoint 機制以及與之相關的 safe mode 機制。

shard DDL 機制的實現

DM 中通過 庫表路由與列值轉換 功能,實現了對分庫分表合併場景下 DML 的同步支援。但當需要同步的各分表存在 DDL 變更時,還需要對 DDL 的同步進行更多額外的處理。有關分表合併時 shard DDL 同步需要處理的問題以及 DM 中的同步支援原理,請先閱讀 TiDB Ecosystem Tools 原理解讀系列(三)TiDB-DM 架構設計與實現原理

shard group

這篇文章 中,我們介紹了 DM 在處理 shard DDL 同步時引入了兩級 shard group 的概念,即用於執行分表合併同步任務的各 DM-worker 組成的 shard group、每個 DM-worker 內需要進行合表同步的各上游分表組成的 shard group。

DM-worker 組成的 shard group

由 DM-worker 組成的 shard group 是由叢集部署拓撲及同步任務配置決定的,即任務配置檔案中定義的需要進行合表同步的所有上游 MySQL 例項對應的所有 DM-worker 例項即組成了一個 shard group。為了表示同步過程中的相關動態資訊,DM-master 內部引入了兩個概念:

  • Lock:對於每組需要進行合併的表,其中每一條需要進行同步協調的 shard DDL,由一個 Lock 例項進行表示;每個 Lock 例項在有 shard DDL 需要協調同步時被建立、在協調同步完成後被銷燬;在 dmctl 中使用 show-ddl-locks 命令檢視到的每一個 Lock 資訊即對應一個該例項

  • LockKeeper:維護所有的 Lock 例項資訊並提供相關的操作介面

Lock 中各主要成員變數的作用如下:

成員變數作用
ID用於標識一個 lock,由同步任務名、合併後同步到的目標表對應的 schema 及 table 名構造得到
Task該 lock 所對應的同步任務名
Owner該 lock 的 owner 對應的 ID,即第一個向 DM-master 上報 shard DDL 資訊的 DM-worker 對應的 ID
remain尚未上報待同步 shard DDL 資訊的 DM-worker 數量
ready標識各 DM-worker 是否已上報過待同步 shard DDL 資訊
ddls該 lock 對應的需要進行協調同步到下游的 DDL statements(shard DDL 通過 TiDB parser 轉換後可能會被分拆為多條 DDL

DM-worker 內分表組成的 shard group

每個 DM-worker 內的 shard group 是由對應上游 MySQL 例項內分表及同步任務配置決定的,即任務配置檔案中定義的對應 MySQL 例項內需要進行合併同步到同一個下游目標表的所有分表組成一個 shard group。在 DM-worker 內部,我們維護了下面兩個物件:

  • ShardingGroup:對於每一組需要進行合併的表,由一個 ShardingGroup 例項進行表示;每個 ShardGroup 例項在同步任務啟動階段被建立,在任務停止時被銷燬

  • ShardingGroupKeeper:維護所有的 ShardingGroup 例項資訊並提供相關的操作介面

ShardingGroup 中各主要成員變數的作用如下:

成員變數作用
sourceID當前 DM-worker 對應於上游 MySQL 的 source-id
remain尚未收到對應 shard DDL 的分表數量
sources標識是否已收到各上游分表對應的 shard DDL 資訊
meta當前 shard group 內各分表收到的 DDL 相關資訊

shard DDL 同步流程

對於兩級 shard group,DM 內部在依次完成兩個級別的 相應的 shard DDL 同步協調。

  1. 對於 DM-worker 內由各分表組成的 shard group,其 shard DDL 的同步在對應 DM-worker 內部進行協調

  2. 對於由各 DM-worker 組成的 shard group,其 shard DDL 的同步由 DM-master 進行協調

DM-worker 間 shard DDL 協調流程

我們基於在 這篇文章 中展示過的僅包含兩個 DM-worker 的 shard DDL 協調流程示例(如下圖)來了解 DM 內部的具體實現。

  1. DM-worker-1 將 shard DDL 資訊傳送給 DM-master

    a. 當 DM-worker-1 內部 shard DDL 協調完成時,DM-worker-1 將對應的 shard DDL 資訊儲存在 channel 中供 DM-master 通過 gRPC 獲取

    b. DM-master 在 fetchWorkerDDLInfo 方法中以 gRPC streaming 的方式讀取到 DM-worker-1 的 shard DDL 資訊

    c. DM-master 呼叫 ShardingGroupKeeper 的 TrySync 方法建立對應的 lock 資訊並在 lock 中標記已收到 DM-worker-1 的 shard DDL 資訊

  2. DM-master 將 lock 資訊發回給 DM-worker-1

    a. DM-master 以 gRPC streaming 的方式將 lock 資訊傳送給 DM-worker-1

    b. DM-worker-1 將來自 DM-master 的 lock 資訊儲存在記憶體中用於在 DM-master 請求 DM-worker 執行/跳過 shard DDL 時進行驗證

  3. DM-worker-2 將 shard DDL 資訊傳送給 DM-master(流程與 step.1 一致)

  4. DM-master 將 lock 資訊發回給 DM-worker-2(流程與 step.2 一致)

  5. DM-master 協調 DM-worker-1 向下遊同步 shard DDL

    a. DM-master 根據 step.1 與 step.3 時收到的 shard DDL 資訊判定已經收到 shard group 內所有 DM-worker 的 shard DDL 資訊

    b. DM-master 在 resolveDDLLock 方法中向 DM-worker-1 傳送向下遊同步 shard DDL 的請求Exec 引數為 true

  6. DM-worker-1 向下遊同步 shard DDL

    a. DM-worker-1 接收到來自 DM-master 的向下遊執行 shard DDL 的請求

    b. DM-worker-1 構造 DDL job 並新增到 DDL 執行佇列中

    c. DM-worker-1 將 shard DDL 執行結果儲存在 channel 中供 DM-master 通過 gRPC 獲取

  7. DM-worker-2 忽略向下遊同步 shard DDL

    a. DM-master 獲取 DM-worker-1 向下遊同步 shard DDL 的結果判斷得知 DM-worker-1 同步 shard DDL 成功

    b. DM-master 向 DM-worker-2 傳送忽略向下遊同步 shard DDL 的請求Exec 引數為 false

    c. DM-worker-2 根據 DM-master 請求忽略向下遊同步 shard DDL

DM-worker 內 shard DDL 同步流程

我們基於在 實現原理文章 中展示過的一個 DM-worker 內僅包含兩個分表 (table_1,table_2) 的 shard DDL(僅一條 DDL)協調處理流程示例來了解 DM 內部的具體實現。

  1. DM-worker 收到 table_1 的 DDL

    a. 根據 DDL 及 binlog event position 等資訊更新對應的 shard group

    b. 確保 binlog replication 過程已進入 safe mode(後文介紹 checkpoint 機制時會再介紹 safe mode)

    c. 更新 table_1 的 checkpoint(後文會詳細介紹 checkpoint 機制)

  2. DM-worker 繼續解析後續的 binlog event

    根據 step.1 時返回的更新後的 shard group 資訊得知還未收到 shard group 內所有分表對應的 shard DDL,不向下游同步 shard DDL 並繼續後續解析

  3. 忽略 table_1 的 DML 並同步 table_2 的 DML

    由於 table_1 已收到 shard DDL 但 shard DDL 自身還未完成同步忽略對 table_1 相關 DML 的同步

  4. DM-worker 收到 table_2 的 DDL(流程與 step.1 一致)

  5. DM-worker 向下遊同步 shard DDL

    a. 根據 step.4 時返回的更新後的 shard group 資訊得知已經收到 shard group 內所有分表對應的 shard DDL

    b. 嘗試讓 binlog replication 過程退出 safe mode

    c. 將當前 shard DDL 同步完成後 re-sync 時重新同步 step.3 忽略的 DML 所需的相關資訊儲存在 channel 中

    d. 等待已分發的所有 DML 同步完成(確保等待併發同步的 DML 都同步到下游後再對下游 schema 進行變更)

    e. 將 shard DDL 相關資訊儲存在 channel 中以進行 DM-worker 間的同步(見前文 DM-worker 間 shard DDL 協調流程

    f. 待 DM-worker 間協調完成後,向下遊同步 shard DDL

  6. 將 binlog 的解析位置重定向回 step.1 對應 DDL 後的 binlog event position 進入 re-sync 階段

    根據 step.5 中儲存的資訊將 binlog 的解析位置重定向回 step.1 對應的 DDL 後的 binlog event position

  7. 重新解析 binlog event

  8. 對於不同表的 DML 做不同的處理

    a. 對於 table_1 在 step.3 時忽略的 DML,解析後向下游同步

    b. 對於 table_2 的 DML,根據 checkpoint 資訊忽略向下遊同步

  9. 解析到達 step.4 時 DDL 對應的 binlog position,re-sync 階段完成

    a. 解析 binlog position 到達 step.4 的 DDL

    b. 結束 re-sync 過程

  10. 繼續進行後續的 DDL 與 DML 的同步

需要注意的是,在上述 step.1 與 step.4 之間,如果有收到 table_1 的其他 DDL,則對於該 shard group,需要協調同步由一組 shard DDL 組成的 ShardingSequence。當在 step.9 對其中某一條 shard DDL 同步完成後,如果有更多的未同步的 shard DDL 需要協調處理,則會重定向到待處理的下一條 shard DDL 對應的位置重新開始解析 binlog event

checkpoint 機制的實現

DM 中通過 checkpoint 機制來實現同步任務中斷後恢復時的續傳功能。對於 load 階段,其 checkpoint 機制的實現在 DM 原始碼閱讀系列文章(四)dump/load 全量同步的實現 文章中我們已經進行了介紹,本文不再贅述。在本文中,我們將介紹 binlog replication 增量同步階段的 checkpoint 機制的實現及與之相關的 safe mode 機制的實現。

checkpoint 機制

DM 在 binlog replication 階段以 binlog event 對應的 position 為 checkpoint,包括兩類:

  1. 全域性 checkpiont:對應已成功解析並同步到下游的 binlog event 的 position,同步任務中斷恢復後將從該位置重新進行解析與同步

  2. 每個需要同步 table 的 checkpoint:對應該 table 已成功解析並同步到下游的 binlog event 的 position,主要用於在 re-sync 過程中避免對已同步的資料進行重複同步

DM 的 checkpoint 資訊儲存在下游資料庫中,通過 RemoteCheckPoint 物件進行讀寫,其主要成員變數包括:

  • globalPoint:用於儲存全域性 checkpoint

  • points:用於儲存各 table 的 checkpoint

checkpoint 資訊在下游資料庫中對應的 schema 通過 createTable 方法進行建立,其中各主要欄位的含義為:

欄位含義
id標識待同步資料對應的上游資料來源,當前該欄位值對應為 source-id
cp_schemacheckpoint 資訊所屬 table 對應的 schema 名稱,對於全域性 checkpoint 該欄位值為空字串
cp_tablecheckpoint 資訊所屬 table 的名稱,對於全域性 checkpoint 該欄位值為空字串
binlog_namecheckpoint 資訊的 binlog filename
binlog_poscheckpoint 資訊的 binlog event position
is_global標識該條 checkpoint 資訊是否是全域性 checkpoint

對於全域性 checkpoint,在以下情況下會更新記憶體中的資訊:

對於各 table checkpoint,在以下情況下會更新記憶體中的資訊:

對於全域性與 table 的 checkpoint,會在以下情況下 flush 到下游資料庫中:

值得注意的是,在 shard DDL 未同步到下游之前,為確保中斷恢復後仍能繼續整個 shard DDL 的協調過程,DM 不會將全域性 checkpoint 更新為比 shard DDL 起始 position 更大的 positionDM 也不會將 shard DDL 協調過程中對應 table 的 checkpoint flush 到下游

safe mode 機制

當同步任務中斷恢復後,DM 在 binlog replication 階段通過 checkpoint 機制保證了重新開始同步的起始點前的資料都已經成功同步到了下游資料庫中,即保證了 at-least-once 語義。但由於 flush checkpoint 與同步 DDL、DML 到下游不是在同一個事務中完成的,因此從 checkpoint 開始重新同步時,可能存在部分資料被重複同步的可能,即不能保證 at-most-once 。

在 DM 的 binlog replication 階段,通過增加 safe mode 機制確保了重複同步資料時的可重入,即:

目前,safe mode 會在以下情況時啟用:

小結

本篇文章詳細地介紹了 shard DDL 機制與 checkpoint 機制的實現,內容包括了兩級 shard group 的定義與 DM-worker 間及 DM-worker 內的 shard DDL 同步協調處理流程、checkpoint 機制及與之相關的 safe mode 機制。下一篇文章中,我們將介紹用於保證 DM 正確性與穩定性的測試框架的實現,敬請期待。

相關文章