1. 程式人生 > >TiDB Binlog 原始碼閱讀系列文章(四)Pump server 介紹

TiDB Binlog 原始碼閱讀系列文章(四)Pump server 介紹

作者: satoru

上篇文章 中,我們介紹了 TiDB 如何通過 Pump client 將 binlog 發往 Pump,本文將繼續介紹 Pump server 的實現,對應的原始碼主要集中在 TiDB Binlog 倉庫的 pump/server.go 檔案中。

啟動 Pump Server

Server 的啟動主要由兩個函式實現:NewServer(*Server).Start

NewServer 依照傳入的配置項建立 Server 例項,初始化 Server 執行所必需的欄位,以下簡單說明部分重要欄位:

  1. metrics:一個 MetricClient,用於定時向 Prometheus Pushgateway 推送 metrics。

  2. clusterID:每個 TiDB 叢集都有一個 ID,連線到同一個 TiDB 叢集的服務可以通過這個 ID 識別其他服務是否屬於同個叢集。

  3. pdCliPD Client,用於註冊、發現服務,獲取 Timestamp Oracle。

  4. tiStore:用於連線 TiDB storage engine,在這裡主要用於查詢事務相關的資訊(可以通過 TiDB 中的對應 interface 描述 瞭解它的功能)。

  5. storage:Pump 的儲存實現,從 TiDB 發過來的 binlog 就是通過它儲存的,下一篇文章將會重點介紹。

Server 初始化以後,就可以用 (*Server).Start

啟動服務。為了避免丟失 binlog,在開始對外提供 binlog 寫入服務之前,它會將當前 Server 註冊到 PD 上,確保所有執行中的 Drainer 都已經觀察到新增的 Pump 節點。這一步除了啟動對外的服務,還開啟了一些 Pump 正常運作所必須的輔助機制,下文會有更詳細的介紹。

Pump Server API

Pump Server 通過 gRPC 暴露出一些服務,這些介面定義在 tipb/pump.pb.go,包含兩個介面 WriteBinlogPullBinlogs

WriteBinlog

顧名思義,這是用於寫入 binlog 的介面,上篇文章中 Pump client 呼叫的就是這個。客戶端傳入的請求,是以下的格式:

type WriteBinlogReq struct {
  // The identifier of tidb-cluster, which is given at tidb startup.
  // Must specify the clusterID for each binlog to write.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // Payload bytes can be decoded back to binlog struct by the protobuf.
  Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}

其中 Payload 是一個用 Protobuf 序列化的 binlog,WriteBinlog 的 主要流程 就是將請求中的 Payload 解析成 binlog 例項,然後呼叫 storage.WriteBinlog 儲存下來。storage.WriteBinlog 將 binlog 持久化儲存,並對 binlog 按 start TS / commit TS 進行排序,詳細的實現將在下章展開討論。

PullBinlogs

PullBinlogs 是為 Drainer 提供的介面,用於按順序獲取 binlog。這是一個 streaming 介面,客戶端請求後得到一個 stream,可以從中不斷讀取 binlog。請求的格式如下:

type PullBinlogReq struct {
  // Specifies which clusterID of binlog to pull.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // The position from which the binlog will be sent.
  StartFrom Pos `protobuf:"bytes,2,opt,name=startFrom" json:"startFrom"`
}

// Binlogs are stored in a number of sequential files in a directory.
// The Pos describes the position of a binlog.
type Pos struct {
  // The suffix of binlog file, like .000001 .000002
  Suffix uint64 `protobuf:"varint,1,opt,name=suffix,proto3" json:"suffix,omitempty"`
  // The binlog offset in a file.
  Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
}

從名字可以看出,這個請求指定了 Drainer 要從什麼時間點的 binlog 開始同步。雖然 Pos 中有 SuffixOffset 兩個欄位,目前只有 Offset 欄位是有效的,我們把它用作一個 commit TS,表示只拉取這個時間以後的 binlog。

PullBinlogs 的 主要流程,是呼叫 storage.PullCommitBinlogs 得到一個可以獲取序列化 binlog 的 channel,將這些 binlog 通過 stream.Send 介面逐個傳送給客戶端。

輔助機制

上文提到 Pump 的正常運作需要一些輔助機制,本節將逐一介紹這些機制。

fake binlog

《TiDB-Binlog 架構演進與實現原理》 一文中,對 fake binlog 機制有以下說明:

“Pump 會定時(預設三秒)向本地儲存中寫入一條資料為空的 binlog,在生成該 binlog 前,會向 PD 中獲取一個 tso,作為該 binlog 的 start_tscommit_ts,這種 binlog 我們叫作 fake binlog。

……Drainer 通過如上所示的方式對 binlog 進行歸併排序,並推進同步的位置。那麼可能會存在這種情況:某個 Pump 由於一些特殊的原因一直沒有收到 binlog 資料,那麼 Drainer 中的歸併排序就無法繼續下去,正如我們用兩條腿走路,其中一隻腿不動就不能繼續前進。我們使用 Pump 一節中提到的 fake binlog 的機制來避免這種問題,Pump 每隔指定的時間就生成一條 fake binlog,即使某些 Pump 一直沒有資料寫入,也可以保證歸併排序正常向前推進。”

genForwardBinlog 實現了這個機制,它裡面是一個定時迴圈,每隔一段時間(預設 3 秒,可通過 gen-binlog-interval 選項配置)檢查一下是否有新的 binlog 寫入,如果沒有,就呼叫 writeFakeBinlog 寫一條假的 binlog。

判斷是否有新的 binlog 寫入,是通過 lastWriteBinlogUnixNano 這個變數,每次有新的寫入都會 將這個變數設定為當前時間

垃圾回收

由於儲存容量限制,顯然 Pump 不能無限制地儲存收到的 binlog,因此需要有一個 GC (Garbage Collection) 機制來清理沒用的 binlog 釋放空間,gcBinlogFile 就負責 GC 的排程。有兩個值會影響 GC 的排程:

  1. gcInterval:控制 GC 檢查的週期,目前寫死在程式碼裡的設定是 1 小時

  2. gcDuration:binlog 的儲存時長,每次 GC 檢查就是 通過當前時間和 gcDuration 計算出 GC 時間點,在這個時間點之前的 binlog 將被 GC 在 gcBinlogFile 的迴圈中,用 select 監控著 3 種情況:

select {
case <-s.ctx.Done():
  log.Info("gcBinlogFile exit")
  return
case <-s.triggerGC:
  log.Info("trigger gc now")
case <-time.After(gcInterval):
}

3 個 case 分別對應:server 退出,外部觸發 GC,定時檢查這三種情況。其中 server 退出的情況我們直接退出迴圈。另外兩種情況都會繼續,計算 GC 時間點,交由 storage.GC 執行。

Heartbeat

心跳機制用於定時(預設兩秒)向 PD 傳送 Server 最新狀態,由 (*pumpNode).HeartBeat 實現。狀態是由 JSON 編碼的 Status 例項,主要記錄 NodeIDMaxCommitTS 之類的資訊。

HTTP API 實現

Pump Server 通過 HTTP 方式暴露出一些 API,主要提供運維相關的介面。

路徑Handler說明
GET /statusStatus返回所有 Pump 節點的狀態。
PUT /state/{nodeID}/{action}ApplyAction支援 pause 和 close 兩種 action,可以暫停和關閉 server。接到請求的 server 會確保使用者指定的 nodeID 跟自己的 nodeID 相匹配,以防誤操作。
GET /drainersAllDrainers返回通過當前 PD 服務可以發現的所有 Drainer 的狀態,一般用於除錯時確定 Pump 是否能如預期地發現 Drainer。
GET /debug/binlog/{ts}BinlogByTS通過指定的 timestamp 查詢 binlog,如果查詢結果是一條 Prewrite binlog,還會額外輸出 MVCC 相關的資訊。
POST /debug/gc/triggerTriggerGC手動觸發一次 GC,如果 GC 已經在執行中,請求將被忽略。

下線 Pump Server

下線一個 Pump server 的流程通常由 binlogctl 命令發起,例如:

bin/binlogctl -pd-urls=localhost:2379 -cmd offline-pump -node-id=My-Host:8240

binlogctl 先通過 nodeID 在 PD 發現的 Pump 節點中找到指定的節點,然後呼叫上一小節中提到的介面 PUT /state/{nodeID}/close

在 Server 端,ApplyAction 收到 close 後會將節點狀態置為 Closing(Heartbeat 程序會定時將這類狀態更新到 PD),然後另起一個 goroutine 呼叫 CloseClose 首先呼叫 cancel,通過 context 將關停訊號發往協作的 goroutine,這些 goroutine 主要就是上文提到的輔助機制執行的 goroutine,例如在 genForwardBinlog 中設計了在 context 被 cancel 時退出:

for {
  select {
  case <-s.ctx.Done():
     log.Info("genFakeBinlog exit")
     return

ClosewaitGroup 等待這些 goroutine 全部退出。這時 Pump 仍然能正常提供 PullBinlogs 服務,但是寫入功能 已經停止Close 下一行呼叫了 commitStatus,這時節點的狀態是 Closing,對應的分支呼叫了 waitSafeToOffline 來確保到目前為止寫入的 binlog 都已經被所有的 Drainer 讀到了。waitSafeToOffline 先往 storage 中寫入一條 fake binlog,由於此時寫入功能已經停止,可以確定這將是這個 Pump 最後的一條 binlog。之後就是在迴圈中定時檢查所有 Drainer 已經讀到的 Binlog 時間資訊,直到這個時間已經大於 fake binlog 的 CommitTS

waitSafeToOffline 等待結束後,就可以關停 gRPC 服務,釋放其他資源。

小結

本文介紹了 Pump server 的啟動、gRPC API 實現、輔助機制的設計以及下線服務的流程,希望能幫助大家在閱讀原始碼時有一個更清晰的思路。在上面的介紹中,我們多次提到 storage 這個實體,用來儲存和查詢 binlog 的邏輯主要封裝在這個模組內,這部分內容將在下篇文章為大家作詳細介紹。

原文閱讀https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-4/

相關推薦

no