1. 程式人生 > >TiDB EcoSystem Tools 原理解讀(一):TiDB-Binlog 架構演進與實現原理

TiDB EcoSystem Tools 原理解讀(一):TiDB-Binlog 架構演進與實現原理

簡介

TiDB-Binlog 元件用於收集 TiDB 的 binlog,並提供實時備份和同步功能。該元件在功能上類似於 MySQL 的主從複製,MySQL 的主從複製依賴於記錄的 binlog 檔案,TiDB-Binlog 元件也是如此,主要的不同點是 TiDB 是分散式的,因此需要收集各個 TiDB 例項產生的 binlog,並按照事務提交的時間排序後才能同步到下游。如果你需要部署 TiDB 叢集的從庫,或者想訂閱 TiDB 資料的變更輸出到其他的系統中,TiDB-Binlog 則是必不可少的工具。

架構演進

TiDB-Binlog 這個元件已經發布了 2 年多時間,經歷過幾次架構演進,去年十月到現在大規模使用的是 Kafka 版本,架構圖如下:

TiDB-Binlog 架構演進

Kafka 版本的 TiDB-Binlog 主要包括兩個元件:

Pump:一個守護程序,在每個 TiDB 主機的後臺執行。其主要功能是實時記錄 TiDB 產生的 binlog 並順序寫入 Kafka 中。

Drainer: 從 Kafka 中收集 binlog,並按照 TiDB 中事務的提交順序轉化為指定資料庫相容的 SQL 語句或者指定格式的資料,最後同步到目的資料庫或者寫到順序檔案。

這個架構的工作原理為:

  • TiDB 需要與 Pump 繫結,即 TiDB 例項只能將它生成的 binlog 傳送到一個指定的 Pump 中;

  • Pump 將 binlog 先寫到本地檔案,再非同步地寫入到 Kafka;

  • Drainer 從 Kafka 中讀出 binlog,對 binlog 進行排序,對 binlog 解析後生成 SQL 或指定格式的資料再同步到下游。

根據使用者的反饋,以及我們自己做的一些測試,發現該版本主要存在一些問題。

首先,TiDB 的負載可能不均衡,部分 TiDB 業務較多,產生的 binlog 也比較多,對應的 Pump 的負載高,導致資料同步延遲高。

其次,依賴 Kafka 叢集,增加了運維成本;而且 TiDB 產生的單條 binlog 的大小可達 2G(例如批量刪除資料、批量寫入資料),需要配置 Kafka 的訊息大小相關設定,而 Kafka 並不太適合單條資料較大的場景。

最後,Drainer 需要讀取 Kafka 中的 binlog、對 binlog 進行排序、解析 binlog,同步資料到下游等工作,可以看出 Drainer 的工作較多,而且 Drainer 是一個單點,所以往往同步資料的瓶頸都在 Drainer。

以上這些問題我們很難在已有的框架下進行優化,因此我們對 TiDB-Binlog 進行了重構,最新版本的 TiDB-Binlog 的總體架構如下圖所示:

TiDB-Binlog 總體架構

新版本 TiDB-Binlog 不再使用 Kafka 儲存 binlog,仍然保留了 Pump 和 Drainer 兩個元件,但是對功能進行了調整:

  • Pump 用於實時記錄 TiDB 產生的 binlog,並將 binlog 按照事務的提交時間進行排序,再提供給 Drainer 進行消費。

  • Drainer 從各個 Pump 中收集 binlog 進行歸併,再將 binlog 轉化成 SQL 或者指定格式的資料,最終同步到下游。

該版本的主要優點為:

  1. 多個 Pump 形成一個叢集,可以水平擴容,各個 Pump 可以均勻地承擔業務的壓力。

  2. TiDB 通過內建的 Pump Client 將 binlog 分發到各個 Pump,即使有部分 Pump 出現故障也不影響 TiDB 的業務。

  3. Pump 內部實現了簡單的 kv 來儲存 binlog,方便對 binlog 資料的管理。

  4. 原來 Drainer 的 binlog 排序邏輯移到了 Pump 來做,而 Pump 是可擴充套件的,這樣就能提高整體的同步效能。

  5. Drainer 不再需要像原來一樣讀取一批 binlog 到記憶體裡進行堆排序,只需要依次讀取各個 Pump 的 binlog 進行歸併排序,這樣可以大大節省記憶體的使用,同時也更容易做記憶體控制。

由於該版本最大的特點是多個 Pump 組成了一個叢集(cluster),因此該版本命名為 cluster 版本。下面我們以最新的 cluster 版本的架構來介紹 TiDB-Binlog 的實現原理。

工作原理

binlog

首先我們先介紹一下 TiDB 中的 binlog,TiDB 的事務採用 2pc 演算法,一個成功的事務會寫兩條 binlog,包括一條 Prewrite binlog 和 一條 Commit binlog;如果事務失敗,會發一條 Rollback binlog。

binlog 的結構定義為:

// Binlog 記錄事務中所有的變更,可以用 Binlog 構建 SQL
message Binlog {
    // Binlog 的型別,包括 Prewrite、Commit、Rollback 等
    optional BinlogType  tp = 1 [(gogoproto.nullable) = false];
    
    // Prewrite, Commit 和 Rollback 型別的 binlog 的 start_ts,記錄事務開始的 ts
    optional int64  start_ts = 2 [(gogoproto.nullable) = false];
    
    // commit_ts 記錄事務結束的 ts,只記錄在 commit 型別的 binlog 中
    optional int64  commit_ts = 3 [(gogoproto.nullable) = false];
    
    // prewrite key 只記錄在 Prewrite 型別的 binlog 中,
    // 是一個事務的主鍵,用於查詢該事務是否提交
    optional bytes  prewrite_key = 4;
    
    // prewrite_value 記錄在 Prewrite 型別的 binlog 中,用於記錄每一行資料的改變
    optional bytes  prewrite_value = 5;
    
    // ddl_query 記錄 ddl 語句
    optional bytes  ddl_query = 6;
    
    // ddl_job_id 記錄 ddl 的 job id
    optional int64  ddl_job_id  = 7 [(gogoproto.nullable) = false];
}

binlog 及相關的資料結構定義見: binlog.proto

其中 start_ts 為事務開始時的 ts,commit_ts 為事務提交的 ts。ts 是由物理時間和邏輯時間轉化而成的,在 TiDB 中是唯一的,由 PD 來統一提供。在開始一個事務時,TiDB 會請求 PD,獲取一個 ts 作為事務的 start_ts,在事務提交時則再次請求 PD 獲取一個 ts 作為 commit_ts。 我們在 Pump 和 Drainer 中就是根據 binlog 的 commit_ts 來對 binlog 進行排序的。

TiDB 的 binlog 記錄為 row 模式,即儲存每一行資料的改變。資料的變化記錄在  prewrite_value 欄位中,該欄位的資料主要由序列化後的 TableMutation 結構的資料組成。TableMutation 的結構如下所示:

// TableMutation 儲存表中資料的變化
message TableMutation {
    // 表的 id,唯一標識一個表
    optional int64 table_id = 1 [(gogoproto.nullable) = false];
    
    // 儲存插入的每行資料
    repeated bytes inserted_rows = 2;
    
    // 儲存修改前和修改後的每行的資料
    repeated bytes updated_rows = 3;
    
    // 已廢棄
    repeated int64 deleted_ids = 4;
    
    // 已廢棄
    repeated bytes deleted_pks = 5;
    
    // 刪除行的資料
    repeated bytes deleted_rows  = 6;
    
    // 記錄資料變更的順序
    repeated MutationType sequence = 7;
}

下面以一個例子來說明 binlog 中是怎麼儲存資料的變化的。

例如 table 的結構為:

create table test (id int, name varchar(24), primary key id)

按照順序執行如下 SQL:

begin;
insert into test(id, name) values(1, "a");
insert into test(id, name) values(2, "b");
update test set name = "c" where id = 1;
update test set name = "d" where id = 2;
delete from test where id = 2;
insert into test(id, name) values(2, "c");
commit;

則生成的 TableMutation 的資料如下所示:

inserted_rows:
1, "a"
2, "b"
2, "c"
 
updated_rows:
1, "a", 1, "c"
2, "b", 2, "d"
 
deleted_rows:
2, "d"
 
sequence:
Insert, Insert, Update, Update, DeleteRow, Insert

可以從例子中看出,sequence 中儲存的資料變更型別的順序為執行 SQL 的順序,具體變更的資料內容則儲存到了相應的變數中。

Drainer 在把 binlog 資料同步到下游前,就需要把上面的這些資料還原成 SQL,再同步到下游。

另外需要說明的是,TiDB 在寫 binlog 時,會同時向 TiKV 發起寫資料請求和向 Pump 傳送 Prewrite binlog,如果 TiKV 和 Pump 其中一個請求失敗,則該事務失敗。當 Prewrite 成功後,TiDB 向 TiKV 發起 Commit 訊息,並非同步地向 Pump 傳送一條 Commit binlog。由於 TiDB 是同時向 TiKV 和 Pump 傳送請求的,所以只要保證 Pump 處理 Prewrite binlog 請求的時間小於等於 TiKV 執行 Prewrite 的時間,開啟 binlog 就不會對事務的延遲造成影響。

Pump Client

從上面的介紹中我們知道由多個 Pump 組成一個叢集,共同承擔寫 binlog 的請求,那麼就需要保證 TiDB 能夠將寫 binlog 的請求儘可能均勻地分發到各個 Pump,並且需要識別不可用的 Pump,及時獲取到新加入叢集中 Pump 資訊。這部分的工作是在 Pump Client 中實現的。

Pump Client 以包的形式整合在 TiDB 中,程式碼連結:pump_client

Pump Client 維護 Pump 叢集的資訊,Pump 的資訊主要來自於 PD 中儲存的 Pump 的狀態資訊,狀態資訊的定義如下(程式碼連結:Status):

type Status struct {
    // Pump/Drainer 例項的唯一標識
    NodeID string `json:"nodeId"`
    
    // Pump/Drainer 的服務地址
    Addr string `json:"host"`
    
    // Pump/Drainer 的狀態,值可以為 online、pausing、paused、closing、offline
    State string `json:"state"`
    
    // Pump/Drainer 是否 alive(目前沒有使用該欄位)
    IsAlive bool `json:"isAlive"`
    
    // Pump的分數,該分數是由節點的負載、磁碟使用率、儲存的資料量大小等因素計算得來的,
    // 這樣 Pump Client 可以根據分數來選取合適的 Pump 傳送 binlog(待實現)
    Score int64 `json:"score"`
    
    // Pump 的標籤,可以通過 label 對 TiDB 和 Pump 進行分組,
    // TiDB 只能將 binlog 傳送到相同 label 的 Pump(待實現)
    Label *Label `json:"label"`
    
    // Pump: 儲存的 binlog 的最大的 commit_ts
    // Drainer:已消費的 binlog 的最大的 commit_ts
    MaxCommitTS int64 `json:"maxCommitTS"`
    
    // 該狀態資訊的更新時間對應的 ts.
    UpdateTS int64 `json:"updateTS"`
}

Pump Client 根據 Pump 上報到 PD 的資訊以及寫 binlog 請求的實際情況將 Pump 劃分為可用 Pump 與不可用 Pump 兩個部分。

劃分的方法包括:

  • 初始化時從 PD 中獲取所有 Pump 的資訊,將狀態為 online 的 Pump 加入到可用 Pump 列表中,其他 Pump 加入到非可用列表中。

  • Pump 每隔固定的時間會發送心跳到 PD,並更新自己的狀態。Pump Client 監控 PD 中 Pump 上傳的狀態資訊,及時更新記憶體中維護的 Pump 資訊,如果狀態由非 online 轉換為 online 則將該 Pump 加入到可用 Pump 列表;反之加入到非可用列表中。

  • 在寫 binlog 到 Pump 時,如果該 Pump 在重試多次後仍然寫 binlog 失敗,則把該 Pump 加入到非可用 Pump 列表中。

  • 定時傳送探活請求(資料為空的 binlog 寫請求)到非可用 Pump 列表中的狀態為 online 的 Pump,如果返回成功,則把該 Pump 重新加入到可用 Pump 列表中。

通過上面的這些措施,Pump Client 就可以及時地更新所維護的 Pump 叢集資訊,保證將 binlog 傳送到可用的 Pump 中。

另外一個問題是,怎麼保證 Pump Client 可以將 binlog 寫請求均勻地分發到各個 Pump?我們目前提供了幾種路由策略:

  • range: 按照順序依次選取 Pump 傳送 binlog,即第一次選取第一個 Pump,第二次選取第二個 Pump...

  • hash:對 binlog 的 start_ts 進行 hash,然後選取 hash 值對應的 Pump。

  • score:根據 Pump 上報的分數按照加權平均演算法選取 Pump 傳送 binlog(待實現)。

需要注意的地方是,以上的策略只是針對 Prewrite binlog,對於 Commit binlog,Pump Client 會將它傳送到對應的 Prewrite binlog 所選擇的 Pump,這樣做是因為在 Pump 中需要將包含 Prewrite binlog 和 Commit binlog 的完整 binlog(即執行成功的事務的 binlog)提供給 Drainer,將 Commit binlog 傳送到其他 Pump 沒有意義。

Pump Client 向 Pump 提交寫 binlog 的請求介面為 pump.proto 中的 WriteBinlog,使用 grpc 傳送 binlog 請求。

Pump

Pump 主要用來承擔 binlog 的寫請求,維護 binlog 資料,並將有序的 binlog 提供給 Drainer。我們將 Pump 抽象成了一個簡單的 kv 資料庫,key 為 binlog 的 start _ts(Priwrite binlog) 或者 commit_ts(Commit binlog),value 為 binlog 的元資料,binlog 的資料則存在資料檔案中。Drainer 像查資料庫一樣的來獲取所需要的 binlog。

Pump 內建了 leveldb 用於儲存 binlog 的元資訊。在 Pump 收到 binlog 的寫請求時,會首先將 binlog 資料以 append 的形式寫到檔案中,然後將 binlog 的 ts、型別、資料長度、所儲存的檔案以及在檔案中的位置資訊儲存在 leveldb 中,如果為 Prewrite binlog,則以 start_ts作為 key;如果是 Commit binlog,則以 commit_ts 作為 key。

當 Drainer 向 Pump 請求獲取指定 ts 之後的 binlog 時,Pump 則查詢 leveldb 中大於該 ts 的 binlog 的元資料,如果當前資料為 Prewrite binlog,則必須找到對應的 Commit binlog;如果為 Commit binlog 則繼續向前推進。這裡有個問題,在 binlog 一節中提到,如果 TiKV 成功寫入了資料,並且 Pump 成功接收到了 Prewrite binlog,則該事務就提交成功了,那麼如果在 TiDB 傳送 Commit binlog 到 Pump 前發生了一些異常(例如 TiDB 異常退出,或者強制終止了 TiDB 程序),導致 Pump 沒有接收到 Commit binlog,那麼 Pump 中就會一直找不到某些 Prewrite binlog 對應的 Commit binlog。這裡我們在 Pump 中做了處理,如果某個 Prewrite binlog 超過了十分鐘都沒有找到對應的 Commit binlog,則通過 binlog 資料中的 prewrite_key 去查詢 TiKV 該事務是否提交,如果已經提交成功,則 TiKV 會返回該事務的 commit_ts;否則 Pump 就丟棄該條 Prewrite binlog。

binlog 元資料中提供了資料儲存的檔案和位置,可以通過這些資訊讀取 binlog 檔案的指定位置獲取到資料。因為 binlog 資料基本上是按順序寫入到檔案中的,因此我們只需要順序地讀 binlog 檔案即可,這樣就保證了不會因為頻繁地讀取檔案而影響 Pump 的效能。最終,Pump 以 commit_ts 為排序標準將 binlog 資料傳輸給 Drainer。Drainer 向 Pump 請求 binlog 資料的介面為 pump.proto 中的 PullBinlogs,以 grpc streaming 的形式傳輸 binlog 資料。

值得一提的是,Pump 中有一個 fake binlog 機制。Pump 會定時(預設三秒)向本地儲存中寫入一條資料為空的 binlog,在生成該 binlog 前,會向 PD 中獲取一個 ts,作為該 binlog 的 start_tscommit_ts,這種 binlog 我們叫作 fake binlog。這樣做的原因在 Drainer 中介紹。

Drainer

Drainer 從各個 Pump 中獲取 binlog,歸併後按照順序解析 binlog、生成 SQL 或者指定格式的資料,然後再同步到下游。

既然要從各個 Pump 獲取資料,Drainer 就需要維護 Pump 叢集的資訊,及時獲取到新增加的 Pump,並識別出不可用的 Pump,這部分功能與 Pump Client 類似,Drainer 也是通過 PD 中儲存的 Pump 的狀態資訊來維護 Pump 資訊。另外需要注意的是,如果新增加了一個 Pump,必須讓該 Pump 通知 Drainer 自己上線了,這麼做是為了保證不會丟資料。例如:

叢集中已經存在 Pump1 和 Pump2,Drainer 讀取 Pump1 和 Pump2 的資料並進行歸併:

Pump1 儲存的 binlog 為{ 1,3,5,7,9 },Pump2 儲存的 binlog 為{2,4,6,10}。Drainer 從兩個 Pump 獲取 binlog,假設當前已經讀取到了{1,2,3,4,5,6,7}這些 binlog,已處理的 binlog 的位置為 7。此時 Pump3 加入叢集,從 Pump3 上報自己的上線資訊到 PD,到 Drainer 從 PD 中獲取到 Pump3 資訊需要一定的時間,如果 Pump3 沒有通知 Drainer 就直接提供寫 binlog 服務,寫入了 binlog{8,12},Drainer 在此期間繼續讀取 Pump1 和 Pump2 的 binlog,假設讀取到了 9,之後才識別到了 Pump3 並將 Pump3 加入到歸併排序中,此時 Pump3 的 binlog 8 就丟失了。為了避免這種情況,需要讓 Pump3 通知 Drainer 自己已經上線,Drainer 收到通知後將 Pump3 加入到歸併排序,並返回成功給 Pump3,然後 Pump3 才能提供寫 binlog 的服務。

Drainer 通過如上所示的方式對 binlog 進行歸併排序,並推進同步的位置。那麼可能會存在這種情況:某個 Pump 由於一些特殊的原因一直沒有收到 binlog 資料,那麼 Drainer 中的歸併排序就無法繼續下去,正如我們用兩條腿走路,其中一隻腿不動就不能繼續前進。我們使用 Pump 一節中提到的 fake binlog 的機制來避免這種問題,Pump 每隔指定的時間就生成一條 fake binlog,即使某些 Pump 一直沒有資料寫入,也可以保證歸併排序正常向前推進。

Drainer 將所有 Pump 的資料按照 commit_ts 進行歸併排序後,將 binlog 資料傳遞給 Drainer 中的資料解析及同步模組。通過上面的 binlog 格式的介紹,我們可以看出 binlog 檔案中並沒有儲存表結構的資訊,因此需要在 Drainer 中維護所有庫和表的結構資訊。在啟動 Drainer 時,Drainer 會請求 TiKV,獲取到所有歷史的 DDL job 的資訊,對這些 DDL job 進行過濾,使用 Drainer 啟動時指定的 initial-commit-ts(或者 checkpoint 中儲存的 commit_ts)之前的 DDL 在記憶體中構建庫和表結構資訊。這樣 Drainer 就有了一份 ts 對應時間點的庫和表的快照,在讀取到 DDL 型別的 binlog 時,則更新庫和表的資訊;讀取到 DML 型別的 binlog 時,則根據庫和表的資訊來生成 SQL。

在生成 SQL 之後,就可以同步到下游了。為了提高 Drainer 同步的速度,Drainer 中使用多個協程來執行 SQL。在生成 SQL 時,我們會使用主鍵/唯一鍵的值作為該條 SQL 的 key,通過對 key 進行 hash 來將 SQL 傳送到對應的協程中。當每個協程收集到了足夠多的 SQL,或者超過了一定的時間,則將這一批的 SQL 在一個事務中提交到下游。

但是有些 SQL 是相關的,如果被分到了不同的協程,那 SQL 的執行順序就不能得到保證,造成資料的不一致。例如:

SQL1: delete from test.test where id = 1;

SQL2: replace into test.test (id, name ) values(1, "a");

按照順序執行後表中存在 id = 1 該行資料,如果這兩條 SQL 分別分配到了協程 1 和協程 2 中,並且協程 2 先執行了 SQL,則表中不再存在 id = 1 的資料。為了避免這種情況的發生,Drainer 中加入了衝突檢測的機制,如果檢測出來兩條 SQL 存在衝突(修改了同一行資料),則暫時不將後面的 SQL 傳送到協程,而是生成一個 Flush 型別的 job 傳送到所有的協程, 每個協程在遇到 Flush job 時就會馬上執行所快取的 SQL。接著才會把該條有衝突的 SQL 傳送到對應的協程中。下面給出一個例子說明一下衝突檢測的機制:

有以下這些 SQL,其中 id 為表的主鍵:

SQL1: update itest set id = 4, name = "c", age = 15 where id = 3;    key: 3, 4

SQL2:  update itest set id = 5, name = "b", age = 14 where id = 2;   key:5, 2

SQL3:delete from itest where id = 3;                                key: 3
  1. 首先將 SQL1 傳送到指定的協程,這時所有的 keys 為[3,4];

  2. SQL2 的 key[5,2]與 keys 中的[3,4]都沒有衝突,將 SQL2 傳送到指定的協程,這時 keys 為[3,4,5,2];

  3. SQL3 的 key[3]與 keys 中的[3]存在衝突,傳送 Flush job 到所有協程,SQL1 和 SQL2 被執行,清空 keys;

  4. 將 SQL3 傳送到指定的協程,同時更新 keys 為[3]。

Drainer 通過以上這些機制來高效地同步資料,並且保證資料的一致。