1. 程式人生 > >Redis 主從複製(Replication)

Redis 主從複製(Replication)

為了保證服務的可用性,現代資料庫都提供了複製功能,同時在多個程序中維護一致的資料狀態。 Redis 支援一主多從的複製架構,該功能被簡化成了一條 `SLAVEOF` 命令,下面通過條命令來解析 Redis 的主從複製機制。 # 通過 tcpdump 觀察 在本機上通過 redis-server 啟動兩個服務,然後通過 tcpdump 觀察主從間的互動情況: ```bash redis-server --port 6379 --requirepass 123456 # 啟動 master redis-server --port 6380 --masterauth 123456 # 啟動 slave tcpdump -t -i lo0 host localhost and port 6379 | awk -F ']' '{print $1"]"$3}' # 在 localhost:6380 上執行 SLAVEOF localhost 6379 建立同步連線,進入 Full-ReSync 階段 localhost.59297 > localhost.6379: Flags [S] localhost.6379 > localhost.59297: Flags [S.] localhost.59297 > localhost.6379: Flags [P.] "PING" localhost.6379 > localhost.59297: Flags [P.] "NOAUTH Authentication required." localhost.59297 > localhost.6379: Flags [P.] "AUTH 123456" localhost.6379 > localhost.59297: Flags [P.] "OK" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380" localhost.6379 > localhost.59297: Flags [P.] "OK": localhost.59297 > localhost.6379: Flags [P.] "REPLCONF capa eof" localhost.6379 > localhost.59297: Flags [P.] "OK": localhost.59297 > localhost.6379: Flags [P.] "PSYNC ? -1" localhost.6379 > localhost.59297: Flags [P.] "FULLRESYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 1" localhost.6379 > localhost.59297: Flags [P.] [|RESP: localhost.6379 > localhost.59297: Flags [P.] "REDIS0007M-z^Iredis-ver^F3.2.11M-z" [|RESP # 完成 Full-ReSync 後進入 Propagation 階段 localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1" localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15" localhost.6379 > localhost.59297: Flags [P.] "SELECT" "0" "SET" "KEY" "VALUE" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85" localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY2" "VALUE2" localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY3" "VALUE3" "KEY4" "VALUE4" "KEY5" "VALUE5" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256" localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270" localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270" # 在 localhost:6380 上執行 DEBUG SLEEP 60 模擬網路中斷的情況 localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY6" "VALUE6" localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY7" "VALUE7" localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9" localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.6379 > localhost.59297: Flags [P.] "PING" localhost.59297 > localhost.6379: Flags [.] localhost.59297 > localhost.6379: Flags [R.] # 舊的同步連線斷開後重新建立同步連線,進入 Partical-ReSync 階段 localhost.59313 > localhost.6379: Flags [S] localhost.6379 > localhost.59313: Flags [S.] localhost.59313 > localhost.6379: Flags [P.] "PING" localhost.6379 > localhost.59313: Flags [P.] "NOAUTH Authentication required." localhost.59313 > localhost.6379: Flags [P.] "AUTH 123456" localhost.6379 > localhost.59313: Flags [P.] "OK" localhost.59313 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380" localhost.6379 > localhost.59313: Flags [P.] "OK" localhost.59313 > localhost.6379: Flags [P.] "REPLCONF capa eof" localhost.6379 > localhost.59313: Flags [P.] "OK" localhost.59313 > localhost.6379: Flags [P.] "PSYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 271" localhost.6379 > localhost.59313: Flags [P.] "CONTINUE" localhost.6379 > localhost.59313: Flags [P.] "PING" "PING" "SET" "KEY6" "VALUE6" "PING" "SET" "KEY7" "VALUE7" "PING" "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9" "PING" "PING" localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519" localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519" localhost.6379 > localhost.59313: Flags [P.] "PING" localhost.59313 > localhost.6379: Flags [P.]: "REPLCONF" "ACK" "533" localhost.59313 > localhost.6379: Flags [P.]: "REPLCONF" "ACK" "533" ``` 整個過程可以分為 `Full-ReSync`, `Command-Propagate`, `Partical-ReSync` 總共 3 階段: ```text +----------------------+ +---------------------+ | redisServer (master) | | redisServer (slave) | | localhost:6379 | | localhost:6380 | +----------------------+ +---------------------+ | slaves | | master | +----------------------+ +---------------------+ | | +----------------+ +-------------+ | redisClient[?] | | redisClient | +----------------+ +-------------+ | ^ <<<<<<<<<<<<<<<<<< PING <<<<<<<<<<<<<<<<< | | Step 1 : 檢查套接字與 master 狀態 | >>>>>>>>>>>>> PONG / NOAUTH >>>>>>>>>>>>> | | | <<<<<<<<<<<<<<<<<< AUTH <<<<<<<<<<<<<<<<< | | Step 2 : 身份驗證 | >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>> | | | <<<< REPLCONF listening-port [port] <<<<< | | Step 3 : 傳送 slave 埠 Full-ReSync >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>> | | | <<<<<< REPLCONF capa [eof|psync2] <<<<<<< | | Step 4 : 檢查命令相容性 | >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>> | | | <<<<<<<<<<<<<< PSYNC ? -1 <<<<<<<<<<<<<<< | | | >>>>>> FULLRESYNC [replid] [offset] >>>>> Step 6 : 執行全量同步 | V | BGSAVE | V v >>>>>>>>>>>>> RDB Snapshot >>>>>>>>>>>>>> ^ <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<< | >>>>>>>>>>>>>>> COMMAND 1 >>>>>>>>>>>>>>> | >>>>>>>>>>>>>>> COMMAND 2 >>>>>>>>>>>>>>> | <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<< 心跳檢測 & 命令傳播 Command-Propagate >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>> | >>>>>>>>>>>>>>> COMMAND 3 >>>>>>>>>>>>>>> | <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<< | <<<<<<<< REPLCONF ACK [offset+?] <<<<<<<< v >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>> ^ ========================================= | ====== The Same With Full-ReSync ======== | ========================================= | | Partical-ReSync <<<<<<<< PSYNC [replid] [offset] <<<<<<<< 重連後執行部分同步 | | | >>>>>>>>>>>>>>> CONTINUE >>>>>>>>>>>>>>>> | >>>>>>>>>>>>>>> COMMAND N >>>>>>>>>>>>>>> v >>>>>>>>>>>>>>> COMMAND ... >>>>>>>>>>>>> ``` # PSYNC 命令 最初 Redis 用於同步的命令是`SYNC`,每次重連執行該命令時都會生成、傳輸、載入整個完整的 RDB 快照,嚴重佔用機器資源與網路頻寬。為了解決這一問題,後續版本的 Redis 追加了`PSYNC`命令,該命令支援以下兩種同步模式: - **全量重新同步`Full-ReSync`** - slave 首次連線 master - master 與 slave 之間的狀態差異過大 - **部分重新同步`Partical-ReSync`** - 網路抖動導致同步連線斷開重連 - sentinel 機制導致 master 節點發生變更 ## 資料結構 下面看看 redisServer 中與`PSYNC`相關的資料結構: ```c struct redisServer { /* * 節點ID 與 複製偏移量 * * 若當前節點是 master * server.replid 就是 server.runid * * 若當前節點原本是 master,轉化為 slave 節點後 * server.replid 與 server.master_repl_offset 會被新 master 的同步資訊覆蓋 * * 若當前節點原本是 slave,被提升為 master 節點後 * rserver.eplid2 與 server.second_replid_offset 會記錄當前節點作為 slave 時的同步資訊 */ char runid[CONFIG_RUN_ID_SIZE+1]; /* 當前節點的執行時ID(每次重啟都會發生變化) */ char replid[CONFIG_RUN_ID_SIZE+1]; /* 當前 master 節點的 runid */ char replid2[CONFIG_RUN_ID_SIZE+1]; /* 當前 master 節點作為 slave 節點時連線的 master 的 runid */ long long master_repl_offset; /* 當前 master 節點的複製偏移量 */ long long second_replid_offset; /* 當前 master 節點作為 slave 節點時的同步偏移量 */ /* * 複製積壓緩衝 * * master 只維護一個全域性的 server.repl_backlog,由所有 slave 節點共享 * 為了減少記憶體佔用,server.repl_backlog 僅在 slave 節點存在時按需建立 */ char *repl_backlog; /* 複製積壓緩衝(環形緩衝)*/ long long repl_backlog_size; /* 積壓緩衝大小 */ long long repl_backlog_histlen; /* 積壓資料長度 */ long long repl_backlog_idx; /* 積壓緩衝尾部(可寫位置)*/ long long repl_backlog_off; /* 積壓緩衝首位元組對應的同步偏移量(master offset)*/ } ``` #### **執行ID**  無論主從,每個 Redis 伺服器會在啟動時生成一個長度為 40 的十六進位制字串作為執行ID`runid`: - 當 slave 首次請求同步時,會將 master 返回的`server.runid`儲存至`server.replid` - 當 slave 重新請求同步時,會將之前儲存的`server.replid`傳送給 master: - 如果該 ID 與當前 master 的`server.runid`不一致,則必須執行一次全量重新同步 - 如果該 ID 與當前 master 的`server.runid`一致,則可以嘗試執行部分同步操作 #### **複製偏移量**  主從雙方都會維護一個單位為位元組的複製偏移量`offset`,通過該偏移量可以判斷主從間的狀態是否一致: - master 向 slave 傳播 N 位元組資料後,會將自己的複製偏移量增加 N - slave 接收到 master 傳來的 N 個位元組資料時,會將自己的複製偏移量增加 N  當 master 接收到 `REPLCONF ACK` 中的偏移量時,可以據此判斷髮送給 slave 的資料是否發生了丟失,並重發丟失的資料。 #### **積壓緩衝**  master 端維護了一個定長的積壓緩衝佇列`backlog`。  master 向 slave 傳播命令時會同時將命令放入該佇列,因此緩衝區裡會保留一部分最新的命令。  slave 發出同步請求時,如果 slave 的偏移量之後 **(offset+1)** 的資料存在於積壓緩衝,master 才會執行部分同步。 # 同步流程 ## SLAVE 視角 slave 接收到`SLAVEOF`命令後,會呼叫`replicaofCommand`開始執行主從同步: ```c void replicaofCommand(client *c) { // ... if (!strcasecmp(c->argv[1]->ptr,"no") && !strcasecmp(c->argv[2]->ptr,"one")) { if (server.masterhost) { // 如果接收到的命令是 SLAVE NO ONE 則斷開主從同步 // ... } } else { if (c->flags & CLIENT_SLAVE) { return; // 如果已經是客戶端是一個 slave 節點,則拒絕該命令 } if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { return; // 如果已經連線上 SLAVEOF 中指定的 master 節點,則直接返回 } // 如果尚未連線任意 master 節點,則根據 masterhost 與 masterport 建立 TCP 連線 // 並註冊監聽函式 syncWithMaster } } void syncWithMaster(connection *conn) { // 向 master 節點發送 PING 命令 if (server.repl_state == REPL_STATE_CONNECTING) { server.repl_state = REPL_STATE_RECEIVE_PONG; err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); // 傳送 PING 命令 // ... } // 監聽到 master 對 PING 命令的響應 if (server.repl_state == REPL_STATE_RECEIVE_PONG) { if (err[0] != '+' && strncmp(err,"-NOAUTH",7) != 0 && strncmp(err,"-NOPERM",7) != 0 && strncmp(err,"-ERR operation not permitted",28) != 0) { goto error; } server.repl_state = REPL_STATE_SEND_AUTH; // 只處理 master 響應值為 PONG、NOAUTH、NOPERM 的情況 } // 根據 master 對 PING 的響應值,判斷是否需要授權 if (server.repl_state == REPL_STATE_SEND_AUTH) { if (server.masteruser && server.masterauth) { err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH", server.masteruser,server.masterauth,NULL); // 傳送 AUTH 命令 // ... server.repl_state = REPL_STATE_RECEIVE_AUTH; } else { // 如果沒有設定 server.masteruser 與 server.masterauth 授權資訊,則跳過 AUTH server.repl_state = REPL_STATE_SEND_PORT; } } // 此處略過以下步驟: // 使用 REPLCONF listening-port 命令將 slave 的埠告知 master // 使用 REPLCONF ip-address 命令將 slave 的 IP 告知 master // 使用 REPLCONF capa eof / capa psync2 命令將 slave 相容性(支援的特性)告知 master // 開始傳送 PSYNC 命令 if (server.repl_state == REPL_STATE_SEND_PSYNC) { if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { goto write_error; } server.repl_state = REPL_STATE_RECEIVE_PSYNC; return; } // 讀取 PSYNC 命令的響應 psync_result = slaveTryPartialResynchronization(conn,1); // 如果監聽到 CONTINUE 響應,跳過全量同步 if (psync_result == PSYNC_CONTINUE) return; // 如果返回值為 PSYNC_FULLRESYNC 或 PSYNC_NOT_SUPPORTED // 開始執行執行全量同步,註冊 readSyncBulkPayload 監聽 RDB 檔案下載 if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) { // ... goto error; } server.repl_state = REPL_STATE_TRANSFER; // ... } int slaveTryPartialResynchronization(connection *conn, int read_reply) { if (!read_reply) { if (server.cached_master) { // server.cached_master 中存在記錄,嘗試執行部分同步 psync_replid = server.cached_master->replid; } else { psync_replid = "?"; // server.cached_master 中不存在記錄,只能執行全量同步 } // 發起 PSYNC 命令 reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL); // ... return PSYNC_WAIT_REPLY; } reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); // 讀取 PSYNC 響應 // 如果 master 響應 FULLRESYNC 則直接進行全量同步 if (!strncmp(reply,"+FULLRESYNC",11)) { // ... return PSYNC_FULLRESYNC; } // 如果 master 響應 CONTINUE 則嘗試執行部分同步 if (!strncmp(reply,"+CONTINUE",9)) { // ... return PSYNC_CONTINUE; } // master 暫時無法處理 PSYNC 命令 —> PSYNC_TRY_LATER // master 不支援 PSYNC 命令 -> PSYNC_NOT_SUPPORTED } ``` ## MASTER 視角 master 接收到`PSYNC`命令後,會呼叫`syncCommand`開啟同步流程: ```c void syncCommand(client *c) { // 接收到 slave 傳送的 PSYNC 命令 if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { return; // 無需全量同步,直接返回 } } // 若程式碼執行至此處,意味著部分同步失敗,需要執行全量同步 // master 會執行 BGSAVE 命令生成快照並傳輸給 slave // 同步 RDB 快照的方式有兩種: // 基於磁碟(Disk-backed):在磁碟生成 RDB 快照檔案,然後再傳輸給 slave // 無盤(Diskless):直接將 RDB 快照資料寫入 slave socket } int masterTryPartialResynchronization(client *c) { long long psync_offset; // 該 slave 最新的同步偏移量 char *master_replid; // slave 同步偏移量對應的 master 的 runid /* * 以下情況可以避免全量同步: * 1. slave 最近一次同步的 master 是當前例項(網路抖動) * 2. slave 與當前節點原本是同個 master 的從節點,且當前節點的同步偏移量 second_replid_offset 較大(維護重啟、故障切換)*/ if (strcasecmp(master_replid, server.replid) && (strcasecmp(master_replid, server.replid2) ||psync_offset > server.second_replid_offset)) { goto need_full_resync; // 不滿足 PSYNC 條件,需要執行全量同步 } /* * 以下情況只能執行全量同步: * 1. master 沒有初始化積壓緩衝 * 2. slave 的同步偏移量落後於積壓緩衝 */ if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { goto need_full_resync; // 進行全量同步 } // 若程式碼執行至此處,意味著可以執行部分同步 listAddNodeTail(server.slaves,c); // 根據客戶端是否相容 PSYNC2,返回不同的 CONTINUE 響應 if (c->slave_capa & SLAVE_CAPA_PSYNC2) { buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); } else { buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); } // CONTINUE 命令後面,緊接著就是 server.repl_backlog 的內容 psync_len = addReplyReplicationBacklog(c,psync_offset); // ... } ``` # 心跳 & 命令傳播 Redis 每秒會執行一次定時任務`replicationCron`,其中就包含主從同步間的心跳,可以發現主從雙方的心跳頻率是不一致的: ```c void replicationCron(void) { // slave 定時向 master 傳送 REPLCONF ACK 命令 if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC)) { addReplyArrayLen(c,3); addReplyBulkCString(c,"REPLCONF"); addReplyBulkCString(c,"ACK"); addReplyBulkLongLong(c,c->reploff); } // master 定時向 slave 傳送 PING 命令 if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && listLength(server.slaves)) { robj *ping_argv[1]; ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); decrRefCount(ping_argv[0]); } } ``` master 在呼叫`call`函式執行客戶端傳過來的命令時,會將命令傳播給 slave 並同時寫入積壓緩衝: ```c void call(client *c, int flags) { // ... if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) { // 當前命令是否需要傳播 if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); } } void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { // ... if (flags & PROPAGATE_REPL) replicationFeedSlaves(server.slaves,dbid,argv,argc); } void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { // 如果當前節點沒有 slave 節點或複製積壓緩衝,立即返回 if (server.repl_backlog == NULL && listLength(slaves) == 0) return; // 向 repl_backlog 中批量寫入命令 if (server.repl_backlog) { char aux[LONG_STR_SIZE+3]; // 命令緩衝,用於序列化 redis 命令 /* 寫入當前批次的命令數量 */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); /* 逐個遍歷命令,將其序列化後寫入 repl_backlog */ for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); feedReplicationBacklogWithObject(argv[j]); feedReplicationBacklog(aux+len+1,2); } } // 將命令批量傳播給所有 slaves 對應的 client listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->
value; /* 寫入當前批次的命令數量 */ addReplyArrayLen(slave,argc); /* 逐個遍歷命令,將傳播給 slave 節點 */ for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } } ``` # 相關引數 ### slave-serve-stale-data  主從節點斷開時或同步未完成時,slave 如何響應客戶端請求 - **yes**:正常響應命令,但是不保證資料質量 - **no**:拒絕響應命令,返回 **SYNC with master in progress** ### repl-diskless-sync  執行全量同步時,master 如何將 RDB 快照傳輸給 slave - **no**:先在磁碟生成 RDB 檔案再進行傳輸(低頻寬網路) - **yes**:直接將 RDB 快照寫入 slave 的 socket(低速磁碟 + 高頻寬網路) ### repl-ping-slave-period  master 向 slave 傳送 `PING` 心跳的間隔,預設 10s 傳送一次 ### repl-backlog-size  同步積壓緩衝的空間,預設值大小為 1mb。  由於主從連線斷開後,所有的命令都會積壓在這裡,如果該值太小會導致 `PSYNC` 命令會無法執行部分同步。  如果 master 需要執行大量寫命令,或者 slave 需要較長時間才能重連成功,則需要根據實際情況進行估算。 ### min-slaves-to-write & min-slaves-max-lag  則當不滿足下列條件時,master 會拒絕寫命令直至恢復: - 連線當前 master 的 slave 數量大於等於 **min-slaves-to-write** 個節點連線正常 - 連線正常的 slave 節點中不少於 **min-slaves-to-write** 個節點的延遲時間小於 ***min-slaves-max-lag** 秒  啟用這兩個選項後,寫命令大概率能夠被複制到 **min-slaves-to-write** 個從節點中,減少了命令丟失的概率。
至此,對 redis 的主從同步分析完畢,後續將對 redis 的一些其他細節進行分享,感謝