1. 程式人生 > >Redis原始碼解析:15Resis主從複製之從節點流程

Redis原始碼解析:15Resis主從複製之從節點流程

Redis原始碼解析:15Resis主從複製之從節點流程

 

版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/gqtcgq/article/details/51172085

         Redis的主從複製功能,可以實現Redis例項的高可用,避免單個Redis 伺服器的單點故障,並且可以實現負載均衡。

 

一:主從複製過程

         Redis的複製功能分為同步(sync)和命令傳播(commandpropagate)兩個操作:

         同步操作用於將從節點的資料庫狀態更新至主節點當前所處的資料庫狀態;

         命令傳播操作則用於在主節點的資料庫狀態被修改,導致主從節點的資料庫狀態不一致時,讓主從節點的資料庫重新回到一致狀態;

   

1:同步

         當客戶端向從節點發送SLAYEOF命令,或者從節點的配置檔案中配置了slaveof選項時,從節點首先需要執行同步操作,也就是將從節點的資料庫狀態更新至主節點當前所處的資料庫狀態。

         在Redis2.8版本之前,從節點對主節點的同步操作,是通過從節點向主節點發送SYNC命令來完成。過程如下:

         a:從節點向主節點發送SYNC命令;

         b:主節點收到SYNC命令後,執行BGSAVE命令,在後臺生成一個RDB檔案,並使用一個緩衝區記錄從現在開始執行的所有寫命令。

         c:當主節點的BGSAVE命令執行完畢時,主節點會將生成的RDB檔案傳送給從節點,從節點接收並載人這個 RDB檔案,將自己的資料庫狀態更新至主伺服器執行BGSAYE命令時的狀態。

         d:主節點將記錄在緩衝區裡面的所有寫命令傳送給從節點,從節點執行這些寫命令,將自己的資料庫狀態更新至主節點資料庫當前所處的狀態。

 

2:命令傳播

         在同步操作執行完畢之後,主從伺服器兩者的資料庫將達到一致狀態。但當主節點執行客戶端傳送的寫命令時,主從伺服器狀態將不再一致。

         為了讓主從伺服器再次回到一致狀態,主伺服器將自己執行的寫命令,傳送給從伺服器執行,當從伺服器執行了相同的寫命令之後,主從伺服器將再次回到一致狀態。

 

3:完全重同步和部分重同步

         以上就是舊版Redis執行主從複製時的過程。它有個缺點,就是當主從節點間的連線斷開後,從節點會發送SYNC命令來重新進行一次完整複製操作。這樣即使斷開期間主節點的變化很小(甚至沒有),也需要將主節點中的所有資料重新快照並傳送一次。這種實現方式顯然不太理想。

         自2.8版開始,Redis支援部分重同步功能。該功能通過”PSYNC”命令實現。部分重同步是基於如下3點實現的:

         a:從節點會儲存主節點的執行ID。每個Redis 執行例項均會擁有一個唯一的執行ID,每當例項重啟後,就會自動生成一個新的執行ID。

         b:在命令傳播階段,主節點每將一個命令傳送給從節點時,都會同時把該命令存放到一個積壓佇列(backlog)中,並記錄下當前積壓佇列中,存放的命令的偏移量範圍。

         c:同時,從節點接收到主節點傳來的命令時,會記錄下該命令的偏移量。主節點和所有從節點都記錄了命令的偏移量。

 

         當主從連線準備就緒後,從節點會發送一條”PSYNC”命令,格式為”PSYNC  <runid> <offset>”。

         從節點第一次連線主節點是,置runid為”?”,offset為”-1”。如果是斷鏈重連,則從節點發送之前儲存的主節點執行ID和複製偏移。

         主節點收到”PSYNC”命令後,會執行以下判斷來決定此次重連是否可以執行部分重同步:

         a:首先判斷從節點傳送來的<runid>是否和自己的執行ID相同;

         b:然後判斷從節點傳送來的複製偏移量<offset>是否在積壓佇列中;

         如果以上兩個條件都滿足,則可以執行部分重同步,並將積壓佇列中相應的命令傳送給從節點。如果不滿足,主節點會進行一次完全重同步,也就是進行之前版本中收到”SYNC”命令後的操作。

 

         主從複製功能是從節點主動發起,主節點配合完成的,因此,本文先介紹從節點在主從複製時的流程。

         注意,下面的流程都基於Redis3.0.5版本。

 

二:從節點屬性

         在Redis原始碼中,表示Redis伺服器的全域性結構體struct redisServer  server中,與主從複製相關的,從節點屬性如下:

         server.masterhost:記錄主節點的ip地址;

         server.masterport:記錄主節點的埠號;

         server.repl_transfer_s:socket描述符,用於主從複製過程中,從節點與主節點之間的TCP通訊,包括主從節點間的握手通訊、接收RDB資料,以及後續的命令傳播過程;

         server.repl_transfer_fd:檔案描述符,用於從節點將收到的RDB資料寫到本地臨時檔案;

         server.repl_transfer_tmpfile:從節點上,用於記錄RDB資料的臨時檔名;

         server.repl_state:記錄主從複製過程中,從節點的狀態。

         server.master:當從節點接受完主節點發來的RDB資料之後,進入命令傳播過程。從節點就將主節點當成一個客戶端看待。server.master就是redisClient結構的主節點客戶端,從節點接收該server.master發來的命令,像處理普通客戶端的命令請求一樣進行處理,從而實現了從節點和主節點之間的同步。

         server.master->reploff:從節點記錄的複製偏移量,每次收到主節點發來的命令時,就會將命令長度增加到該複製偏移量上,以保持和主節點複製偏移量的一致。

         server.master->replrunid:從節點記錄的主節點執行ID。

 

         server.cached_master:主從節點複製過程中(具體應該是命令傳播過程中),如果從節點與主節點之間連線斷掉了,會呼叫freeClient(server.master),關閉與主節點客戶端的連線。為了後續重連時能夠進行部分重同步,在freeClient中,會呼叫replicationCacheMaster函式,將server.master儲存到server.cached_master。該redisClient結構中記錄了主節點的執行ID,以及複製偏移。當後續與主節點的連線又重新建立起來的時候,使用這些資訊進行部分重同步,也就是傳送"PSYNC  <runid>  <offset>"命令。

 

         server.repl_master_runid和server.repl_master_initial_offset:從節點發送"PSYNC  <runid> <offset>"命令後,如果主節點不支援部分重同步,則會回覆資訊為"+FULLRESYNC <runid>  <offset>",表示要進行完全重同步,其中<runid>表示主節點的執行ID,記錄到server.repl_master_runid中,<offset>表示主節點的初始複製偏移,記錄到server.repl_master_initial_offset中。

 

三:建鏈和握手過程

         從節點在收到客戶端發來的”slaveof”命令時,或者在配置檔案中配置了”slaveof”選項時,就會向主節點建鏈,開始主從複製過程。

         在主節點將實際的RDB資料傳送給從節點之前,還需要經歷握手過程,這非常類似於TCP建鏈的三次握手。該過程由從節點主動發起,主節點作出相應的迴應。握手過程如下:

         該握手過程中,從節點的狀態會發生轉換,從REDIS_REPL_CONNECT狀態起,一直到REDIS_REPL_RECEIVE_PSYNC狀態期間,都算是握手過程。

 

1:TCP建鏈

         在Redis原始碼中,使用server.repl_state記錄從節點的狀態。在Redis初始化時,該狀態為REDIS_REPL_NONE。

         當從節點收到客戶端使用者發來的”SLAVEOF” 命令時,或者在讀取配置檔案,發現了”slaveof”配置選項,就會將server.repl_state置為REDIS_REPL_CONNECT狀態。該狀態表示下一步需要向主節點發起TCP建鏈。

         在定時執行的函式serverCron中,會呼叫replicationCron函式檢查主從複製的狀態。該函式中,一旦發現當前的server.repl_state為REDIS_REPL_CONNECT,則會呼叫函式connectWithMaster,向主節點發起TCP建鏈請求,其程式碼如下:

 
  1. int connectWithMaster(void) {

  2. int fd;

  3.  
  4. fd = anetTcpNonBlockBestEffortBindConnect(NULL,

  5. server.masterhost,server.masterport,REDIS_BIND_ADDR);

  6. if (fd == -1) {

  7. redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",

  8. strerror(errno));

  9. return REDIS_ERR;

  10. }

  11.  
  12. if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==

  13. AE_ERR)

  14. {

  15. close(fd);

  16. redisLog(REDIS_WARNING,"Can't create readable event for SYNC");

  17. return REDIS_ERR;

  18. }

  19.  
  20. server.repl_transfer_lastio = server.unixtime;

  21. server.repl_transfer_s = fd;

  22. server.repl_state = REDIS_REPL_CONNECTING;

  23. return REDIS_OK;

  24. }

 

         server.masterhost和server.masterport分別記錄了主節點的IP地址和埠號。它們要麼是在slaveof選項中配置,要麼是”SLAVEOF”命令中的引數。

         首先呼叫anetTcpNonBlockBestEffortBindConnect,向主節點發起connect建鏈請求;該函式建立socket描述符,將該描述符設定為非阻塞,必要情況下會繫結本地地址,然後connect向主節點發起TCP建鏈請求。該函式返回建鏈的socket描述符fd;

         然後註冊socket描述符fd上的可讀和可寫事件,事件回撥函式都為syncWithMaster,該函式用於處理主從節點間的握手過程;      

         然後將socket描述符記錄到server.repl_transfer_s中。置主從複製狀態server.repl_state為REDIS_REPL_CONNECTING,表示從節點正在向主節點建鏈;

 

2:複製握手

         當主從節點間的TCP建鏈成功之後,就會觸發socket描述符server.repl_transfer_s上的可寫事件,從而呼叫函式syncWithMaster。該函式處理從節點與主節點間的握手過程。也就是從節點在向主節點發送TCP建鏈請求,到接收RDB資料之前的過程。其程式碼如下:

 
  1. void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {

  2. char tmpfile[256], *err = NULL;

  3. int dfd, maxtries = 5;

  4. int sockerr = 0, psync_result;

  5. socklen_t errlen = sizeof(sockerr);

  6. REDIS_NOTUSED(el);

  7. REDIS_NOTUSED(privdata);

  8. REDIS_NOTUSED(mask);

  9.  
  10. /* If this event fired after the user turned the instance into a master

  11. * with SLAVEOF NO ONE we must just return ASAP. */

  12. if (server.repl_state == REDIS_REPL_NONE) {

  13. close(fd);

  14. return;

  15. }

  16.  
  17. /* Check for errors in the socket. */

  18. if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)

  19. sockerr = errno;

  20. if (sockerr) {

  21. redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",

  22. strerror(sockerr));

  23. goto error;

  24. }

  25.  
  26. /* Send a PING to check the master is able to reply without errors. */

  27. if (server.repl_state == REDIS_REPL_CONNECTING) {

  28. redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");

  29. /* Delete the writable event so that the readable event remains

  30. * registered and we can wait for the PONG reply. */

  31. aeDeleteFileEvent(server.el,fd,AE_WRITABLE);

  32. server.repl_state = REDIS_REPL_RECEIVE_PONG;

  33. /* Send the PING, don't check for errors at all, we have the timeout

  34. * that will take care about this. */

  35. err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);

  36. if (err) goto write_error;

  37. return;

  38. }

  39.  
  40. /* Receive the PONG command. */

  41. if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {

  42. err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

  43.  
  44. /* We accept only two replies as valid, a positive +PONG reply

  45. * (we just check for "+") or an authentication error.

  46. * Note that older versions of Redis replied with "operation not

  47. * permitted" instead of using a proper error code, so we test

  48. * both. */

  49. if (err[0] != '+' &&

  50. strncmp(err,"-NOAUTH",7) != 0 &&

  51. strncmp(err,"-ERR operation not permitted",28) != 0)

  52. {

  53. redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",err);

  54. sdsfree(err);

  55. goto error;

  56. } else {

  57. redisLog(REDIS_NOTICE,

  58. "Master replied to PING, replication can continue...");

  59. }

  60. sdsfree(err);

  61. server.repl_state = REDIS_REPL_SEND_AUTH;

  62. }

  63.  
  64. /* AUTH with the master if required. */

  65. if (server.repl_state == REDIS_REPL_SEND_AUTH) {

  66. if (server.masterauth) {

  67. err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);

  68. if (err) goto write_error;

  69. server.repl_state = REDIS_REPL_RECEIVE_AUTH;

  70. return;

  71. } else {

  72. server.repl_state = REDIS_REPL_SEND_PORT;

  73. }

  74. }

  75.  
  76. /* Receive AUTH reply. */

  77. if (server.repl_state == REDIS_REPL_RECEIVE_AUTH) {

  78. err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

  79. if (err[0] == '-') {

  80. redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);

  81. sdsfree(err);

  82. goto error;

  83. }

  84. sdsfree(err);

  85. server.repl_state = REDIS_REPL_SEND_PORT;

  86. }

  87.  
  88. /* Set the slave port, so that Master's INFO command can list the

  89. * slave listening port correctly. */

  90. if (server.repl_state == REDIS_REPL_SEND_PORT) {

  91. sds port = sdsfromlonglong(server.port);

  92. err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",

  93. "listening-port",port, NULL);

  94. sdsfree(port);

  95. if (err) goto write_error;

  96. sdsfree(err);

  97. server.repl_state = REDIS_REPL_RECEIVE_PORT;

  98. return;

  99. }

  100.  
  101. /* Receive REPLCONF listening-port reply. */

  102. if (server.repl_state == REDIS_REPL_RECEIVE_PORT) {

  103. err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

  104. /* Ignore the error if any, not all the Redis versions support

  105. * REPLCONF listening-port. */

  106. if (err[0] == '-') {

  107. redisLog(REDIS_NOTICE,"(Non critical) Master does not understand "

  108. "REPLCONF listening-port: %s", err);

  109. }

  110. sdsfree(err);

  111. server.repl_state = REDIS_REPL_SEND_CAPA;

  112. }

  113.  
  114. /* Inform the master of our capabilities. While we currently send

  115. * just one capability, it is possible to chain new capabilities here

  116. * in the form of REPLCONF capa X capa Y capa Z ...

  117. * The master will ignore capabilities it does not understand. */

  118. if (server.repl_state == REDIS_REPL_SEND_CAPA) {

  119. err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",

  120. "capa","eof",NULL);

  121. if (err) goto write_error;

  122. sdsfree(err);

  123. server.repl_state = REDIS_REPL_RECEIVE_CAPA;

  124. return;

  125. }

  126.  
  127. /* Receive CAPA reply. */

  128. if (server.repl_state == REDIS_REPL_RECEIVE_CAPA) {

  129. err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

  130. /* Ignore the error if any, not all the Redis versions support

  131. * REPLCONF capa. */

  132. if (err[0] == '-') {

  133. redisLog(REDIS_NOTICE,"(Non critical) Master does not understand "

  134. "REPLCONF capa: %s", err);

  135. }

  136. sdsfree(err);

  137. server.repl_state = REDIS_REPL_SEND_PSYNC;

  138. }

  139.  
  140. /* Try a partial resynchonization. If we don't have a cached master

  141. * slaveTryPartialResynchronization() will at least try to use PSYNC

  142. * to start a full resynchronization so that we get the master run id

  143. * and the global offset, to try a partial resync at the next

  144. * reconnection attempt. */

  145. if (server.repl_state == REDIS_REPL_SEND_PSYNC) {

  146. if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {

  147. err = sdsnew("Write error sending the PSYNC command.");

  148. goto write_error;

  149. }

  150. server.repl_state = REDIS_REPL_RECEIVE_PSYNC;

  151. return;

  152. }

  153.  
  154. /* If reached this point, we should be in REDIS_REPL_RECEIVE_PSYNC. */

  155. if (server.repl_state != REDIS_REPL_RECEIVE_PSYNC) {

  156. redisLog(REDIS_WARNING,"syncWithMaster(): state machine error, "

  157. "state should be RECEIVE_PSYNC but is %d",

  158. server.repl_state);

  159. goto error;

  160. }

  161.  
  162. psync_result = slaveTryPartialResynchronization(fd,1);

  163. if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

  164.  
  165. /* Note: if PSYNC does not return WAIT_REPLY, it will take care of

  166. * uninstalling the read handler from the file descriptor. */

  167.  
  168. if (psync_result == PSYNC_CONTINUE) {

  169. redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");

  170. return;

  171. }

  172.  
  173. /* PSYNC failed or is not supported: we want our slaves to resync with us

  174. * as well, if we have any (chained replication case). The mater may

  175. * transfer us an entirely different data set and we have no way to

  176. * incrementally feed our slaves after that. */

  177. disconnectSlaves(); /* Force our slaves to resync with us as well. */

  178. freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

  179.  
  180. /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC

  181. * and the server.repl_master_runid and repl_master_initial_offset are

  182. * already populated. */

  183. if (psync_result == PSYNC_NOT_SUPPORTED) {

  184. redisLog(REDIS_NOTICE,"Retrying with SYNC...");

  185. if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {

  186. redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",

  187. strerror(errno));

  188. goto error;

  189. }

  190. }

  191.  
  192. /* Prepare a suitable temp file for bulk transfer */

  193. while(maxtries--) {

  194. snprintf(tmpfile,256,

  195. "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());

  196. dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);

  197. if (dfd != -1) break;

  198. sleep(1);

  199. }

  200. if (dfd == -1) {

  201. redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));

  202. goto error;

  203. }

  204.  
  205. /* Setup the non blocking download of the bulk file. */

  206. if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)

  207. == AE_ERR)

  208. {

  209. redisLog(REDIS_WARNING,

  210. "Can't create readable event for SYNC: %s (fd=%d)",

  211. strerror(errno),fd);

  212. goto error;

  213. }

  214.  
  215. server.repl_state = REDIS_REPL_TRANSFER;

  216. server.repl_transfer_size = -1;

  217. server.repl_transfer_read = 0;

  218. server.repl_transfer_last_fsync_off = 0;

  219. server.repl_transfer_fd = dfd;

  220. server.repl_transfer_lastio = server.unixtime;

  221. server.repl_transfer_tmpfile = zstrdup(tmpfile);

  222. return;

  223.  
  224. error:

  225. aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);

  226. close(fd);

  227. server.repl_transfer_s = -1;

  228. server.repl_state = REDIS_REPL_CONNECT;

  229. return;

  230.  
  231. write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */

  232. redisLog(REDIS_WARNING,"Sending command to master in replication handshake: %s", err);

  233. sdsfree(err);

  234. goto error;

  235. }

 

         函式中如果發生了錯誤,則錯誤處理的方式是:刪除socket描述符上註冊的可讀和可寫事件,然後關閉描述符,置狀態server.repl_state為REDIS_REPL_CONNECT,等待下次呼叫replicationCron時重連主節點;

 

         首先檢查當前主從複製狀態server.repl_state是否為REDIS_REPL_NONE,若是,則說明握手過程期間,從節點收到了客戶端執行的"slave  no  one"命令,因此直接關閉socket描述符,然後返回;

         然後呼叫getsockopt,檢查當前socket描述符的錯誤,若出錯,則執行錯誤處理流程;

        

         如果當前的複製狀態為REDIS_REPL_CONNECTING,則說明是從節點connect主節點成功後,觸發了描述符的可寫事件,從而呼叫的該回調函式。這種情況下,先刪除描述符上的可寫事件,然後將狀態設定為REDIS_REPL_RECEIVE_PONG,向主節點發送"PING"命令,然後返回;

 

         如果當前的複製狀態為REDIS_REPL_RECEIVE_PONG,則說明從節點收到了主節點對於"PING"命令的回覆,觸發了描述符的可讀事件,從而呼叫的該回調函式。這種情況下,首先讀取主節點的回覆資訊,正常情況下,主節點的回覆只能有三種情況:"+PONG","-NOAUTH"和"-ERR operation not permitted"(老版本的redis主節點),如果收到的回覆不是以上的三種,則直接進入錯誤處理程式碼流程。否則,將複製狀態置為REDIS_REPL_SEND_AUTH(不返回);

         當前的複製狀態為REDIS_REPL_SEND_AUTH,如果配置了"masterauth"選項,則向主節點發送"AUTH"命令,後跟"masterauth"選項的值,然後將狀態置為REDIS_REPL_RECEIVE_AUTH,然後返回;

       如果從節點沒有配置"masterauth"選項,則將複製狀態置為REDIS_REPL_SEND_PORT(不返回);

 

         如果當前的複製狀態為REDIS_REPL_RECEIVE_AUTH,說明從節點收到了主節點對於"AUTH"命令的回覆,觸發了描述符的可讀事件,從而呼叫的該回調函式。這種情況下,首先讀取主節點的回覆,如果回覆資訊的首位元組為"-",說明認證失敗,直接進入錯誤處理流程;否則,將狀態置為REDIS_REPL_SEND_PORT(不返回);

         如果當前複製狀態為REDIS_REPL_SEND_PORT,則向主節點發送"REPLCONF listening-port  <port>"命令,告知主節點本身的埠號,然後將複製狀態置為REDIS_REPL_RECEIVE_PORT後返回;

 

         如果當前的複製狀態為REDIS_REPL_RECEIVE_PORT,說明從節點收到了主節點對於"REPLCONF listening-port"命令的回覆,觸發了描述符的可讀事件,從而呼叫的該回調函式。這種情況下,首先讀取主節點的回覆,如果回覆資訊的首位元組為"-",說明主節點不認識該命令,這不是致命錯誤,只是記錄日誌而已;然後將複製狀態設定為REDIS_REPL_SEND_CAPA(不返回);

         如果當前的複製狀態為REDIS_REPL_SEND_CAPA,則向主節點發送"REPLCONF capa  eof"命令,告知主節點本身的"能力",然後將複製狀態置為REDIS_REPL_RECEIVE_CAPA後返回;

 

         如果當前的複製狀態為REDIS_REPL_RECEIVE_CAPA,說明從節點收到了主節點對於"REPLCONF capa eof"命令的回覆,觸發了描述符的可讀事件,從而呼叫的該回調函式。這種情況下,首先讀取主節點的回覆,如果回覆資訊的首位元組為"-",說明主節點不認識該命令,這不是致命錯誤,只是記錄日誌,然後將複製狀態設定為REDIS_REPL_SEND_PSYNC(不返回);

         如果複製狀態為REDIS_REPL_SEND_PSYNC,則呼叫slaveTryPartialResynchronization函式,向主節點發送"PSYNC  <psync_runid>  <psync_offset>"命令。

         在該函式中,如果從節點快取了主節點,說明該從節點之前與主節點的連線斷掉了,現在是重新連線,因此嘗試進行部分重同步。置psync_runid為儲存的主節點ID,置psync_offset為儲存的主節點複製偏移加1;如果從節點沒有快取主節點,說明需要進行完全重同步,則置psync_runid為"?",置psync_offset為"-1";

         傳送命令成功後函式返回,將複製狀態置為REDIS_REPL_RECEIVE_PSYNC後返回;

 

         接下來的程式碼處理握手過程的最後一個狀態REDIS_REPL_RECEIVE_PSYNC,走到這裡,複製狀態只能是REDIS_REPL_RECEIVE_PSYNC,如果不是則進入錯誤處理流程;

         呼叫slaveTryPartialResynchronization讀取主節點對於"PSYNC"命令的回覆:

         如果回覆資訊以"+CONTINUE"開頭,說明主節點可以進行部分重同步,這種情況下,設定複製狀態為REDIS_REPL_CONNECTED,後續將主節點當成一個客戶端,接收該主節點客戶端發來的命令請求,像處理普通客戶端一樣處理即可。因此函式slaveTryPartialResynchronization返回PSYNC_CONTINUE後,該函式直接返回即可;

         如果回覆資訊以"+FULLRESYNC"開頭,說明主節點雖然認識"PSYNC"命令,但是從節點發送的複製偏移psync_offset已經不在主節點的積壓佇列中了,因此需要進行完全重同步。解析出回覆資訊中的主節點ID,儲存在server.repl_master_runid中;解析出主節點複製偏移初始值,儲存在server.repl_master_initial_offset中;然後函式slaveTryPartialResynchronization返回PSYNC_FULLRESYNC;

         如果回覆資訊不屬於以上的情況,說明主節點不認識"PSYNC"命令,這種情況下,函式slaveTryPartialResynchronization返回PSYNC_NOT_SUPPORTED;

 

         不管函式slaveTryPartialResynchronization返回PSYNC_FULLRESYNC,還是返回PSYNC_NOT_SUPPORTED,都表示接下來要進行完全重同步過程:

         首先斷開當前例項與所有從節點的連線,因為接下來要進行完全重同步,本例項會接收主節點發來的完全不同的資料,因此此舉可以讓該例項的從節點重新進行復制同步過程(從而也接收這些資料);

         然後呼叫freeReplicationBacklog,釋放本例項的積壓佇列server.repl_backlog;

         如果slaveTryPartialResynchronization函式返回的是PSYNC_NOT_SUPPORTED,說明這是老版本的主節點,不支援"PSYNC"命令,因此向主節點發送"SYNC"命令(主節點收到該命令後,直接傳送RDB資料);

         接下來,就是為接收主節點發送來的RDB資料做準備:

         首先建立儲存RDB資料的臨時檔案"temp-<unixtime>.<pid>.rdb",該檔案的描述符記錄到server.repl_transfer_fd中;

         然後,註冊socket描述符server.repl_transfer_s上的可讀事件,事件回撥函式為readSyncBulkPayload;

         最後,置複製狀態為REDIS_REPL_TRANSFER,表示開始接收主節點的RDB資料。然後執行下列操作後返回:

 
  1. server.repl_state = REDIS_REPL_TRANSFER;

  2. server.repl_transfer_size = -1;

  3. server.repl_transfer_read = 0;

  4. server.repl_transfer_last_fsync_off = 0;

  5. server.repl_transfer_fd = dfd;

  6. server.repl_transfer_lastio = server.unixtime;

  7. server.repl_transfer_tmpfile = zstrdup(tmpfile);

 

 

 

四:從節點的複製狀態轉換

         根據以上的握手過程,總結出從節點的複製狀態轉換圖,如下圖所示:

         在這些狀態中,REDIS_REPL_CONNECT狀態是從節點的初始狀態,在狀態轉移過程中,出現了任何錯誤,都會關閉socket描述符,然後將狀態置為REDIS_REPL_CONNECT,等待下次呼叫定時函式replicationCron時,重新連線主節點。

         從REDIS_REPL_RECEIVE_PONG狀態到REDIS_REPL_RECEIVE_PSYNC狀態之間,是主從節點間的握手過程。

         REDIS_REPL_RECEIVE_PSYNC狀態之後,如果主節點支援部分重同步,則從節點進入狀態REDIS_REPL_CONNECTED,後續從節點將主節點當成客戶端server.master,從節點接收客戶端server.master發來的命令,像處理普通客戶端的命令請求一樣進行處理,從而實現了從節點和主節點之間的同步;

         如果主節點不支援部分重同步,則需要進行完全重同步,從節點進入REDIS_REPL_TRANSFER狀態,開始接收主節點發來的RDB資料。一旦從節點接收到完整的RDB資料,則載入該RDB資料,載入完成之後,從節點進入REDIS_REPL_CONNECTED狀態,將主節點當成客戶端server.master,接收客戶端server.master發來的命令,實現了從節點和主節點之間的同步;

 

五:接收RDB資料

         正常情況下,完全重同步需要主節點將其中的資料轉儲到RDB檔案中,然後將該檔案傳送給從節點。如果硬碟IO效率較差,則這種操作對於主節點的效能會造成會影響。

         從2.8.18版本開始,Redis引入了“無硬碟複製”選項,開啟該選項時,Redis在與從節點進行復制初始化時將不會將快照內容儲存到硬碟上,而是直接通過網路傳送給從節點,避免了硬碟的效能瓶頸。不過該功能還在試驗階段,可以在配置檔案中使用"repl-diskless-sync"選項來配置開啟該功能。

 

         有硬碟複製的RDB資料和無硬碟複製的RDB資料,它們的格式是不一樣的。有硬碟複製的RDB資料,主節點將資料儲存到RDB檔案後,將檔案內容加上"$<len>/r/n"的頭部後,傳送給從節點。無硬碟複製的RDB資料,主節點直接將資料傳送給從節點,而不再先儲存到本地檔案中,這種格式的RDB資料以"$EOF:<XXX>\r\n"開頭,以"<XXX>"結尾。開頭和結尾中的<XXX>內容相同,都是40位元組長的,由"0123456789abcdef"中的字元組成的隨機字串。

 

         在syncWithMaster函式中,握手過程結束後,需要進行完全重同步時,從節點註冊了socket描述符server.repl_transfer_s上的可讀事件,事件回撥函式為readSyncBulkPayload。從節點呼叫該函式接收主節點發來的RDB資料,該函式的程式碼如下:

 
  1. #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */

  2. void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {

  3. char buf[4096];

  4. ssize_t nread, readlen;

  5. off_t left;

  6. REDIS_NOTUSED(el);

  7. REDIS_NOTUSED(privdata);

  8. REDIS_NOTUSED(mask);

  9.  
  10. /* Static vars used to hold the EOF mark, and the last bytes received

  11. * form the server: when they match, we reached the end of the transfer. */

  12. static char eofmark[REDIS_RUN_ID_SIZE];

  13. static char lastbytes[REDIS_RUN_ID_SIZE];

  14. static int usemark = 0;

  15.  
  16. /* If repl_transfer_size == -1 we still have to read the bulk length

  17. * from the master reply. */

  18. if (server.repl_transfer_size == -1) {

  19. if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {

  20. redisLog(REDIS_WARNING,

  21. "I/O error reading bulk count from MASTER: %s",

  22. strerror(errno));

  23. goto error;

  24. }

  25.  
  26. if (buf[0] == '-') {

  27. redisLog(REDIS_WARNING,

  28. "MASTER aborted replication with an error: %s",

  29. buf+1);

  30. goto error;

  31. } else if (buf[0] == '\0') {

  32. /* At this stage just a newline works as a PING in order to take

  33. * the connection live. So we refresh our last interaction

  34. * timestamp. */

  35. server.repl_transfer_lastio = server.unixtime;

  36. return;

  37. } else if (buf[0] != '$') {

  38. redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);

  39. goto error;

  40. }

  41.  
  42. /* There are two possible forms for the bulk payload. One is the

  43. * usual $<count> bulk format. The other is used for diskless transfers

  44. * when the master does not know beforehand the size of the file to

  45. * transfer. In the latter case, the following format is used:

  46. *

  47. * $EOF:<40 bytes delimiter>

  48. *

  49. * At the end of the file the announced delimiter is transmitted. The

  50. * delimiter is long and random enough that the probability of a

  51. * collision with the actual file content can be ignored. */

  52. if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {

  53. usemark = 1;

  54. memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);

  55. memset(lastbytes,0,REDIS_RUN_ID_SIZE);

  56. /* Set any repl_transfer_size to avoid entering this code path

  57. * at the next call. */

  58. server.repl_transfer_size = 0;

  59. redisLog(REDIS_NOTICE,

  60. "MASTER <-> SLAVE sync: receiving streamed RDB from master");

  61. } else {

  62. usemark = 0;

  63. server.repl_transfer_size = strtol(buf+1,NULL,10);

  64. redisLog(REDIS_NOTICE,

  65. "MASTER <-> SLAVE sync: receiving %lld bytes from master",

  66. (long long) server.repl_transfer_size);

  67. }

  68. return;

  69. }

  70.  
  71. /* Read bulk data */

  72. if (usemark) {

  73. readlen = sizeof(buf);

  74. } else {

  75. left = server.repl_transfer_size - server.repl_transfer_read;

  76. readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);

  77. }

  78.  
  79. nread = read(fd,buf,readlen);

  80. if (nread <= 0) {

  81. redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",

  82. (nread == -1) ? strerror(errno) : "connection lost");

  83. replicationAbortSyncTransfer();

  84. return;

  85. }

  86. server.stat_net_input_bytes += nread;

  87.  
  88. /* When a mark is used, we want to detect EOF asap in order to avoid

  89. * writing the EOF mark into the file... */

  90. int eof_reached = 0;

  91.  
  92. if (usemark) {

  93. /* Update the last bytes array, and check if it matches our delimiter.*/

  94. if (nread >= REDIS_RUN_ID_SIZE) {

  95. memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);

  96. } else {

  97. int rem = REDIS_RUN_ID_SIZE-nread;

  98. memmove(lastbytes,lastbytes+nread,rem);

  99. memcpy(lastbytes+rem,buf,nread);

  100. }

  101. if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;

  102. }

  103.  
  104. server.repl_transfer_lastio = server.unixtime;

  105. if (write(server.repl_transfer_fd,buf,nread) != nread) {

  106. redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));

  107. goto error;

  108. }

  109. server.repl_transfer_read += nread;

  110.  
  111. /* Delete the last 40 bytes from the file if we reached EOF. */

  112. if (usemark && eof_reached) {

  113. if (ftruncate(server.repl_transfer_fd,

  114. server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)

  115. {

  116. redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));

  117. goto error;

  118. }

  119. }

  120.  
  121. /* Sync data on disk from time to time, otherwise at the end of the transfer

  122. * we may suffer a big delay as the memory buffers are copied into the

  123. * actual disk. */

  124. if (server.repl_transfer_read >=

  125. server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)

  126. {

  127. off_t sync_size = server.repl_transfer_read -

  128. server.repl_transfer_last_fsync_off;

  129. rdb_fsync_range(server.repl_transfer_fd,

  130. server.repl_transfer_last_fsync_off, sync_size);

  131. server.repl_transfer_last_fsync_off += sync_size;

  132. }

  133.  
  134. /* Check if the transfer is now complete */

  135. if (!usemark) {

  136. if (server.repl_transfer_read == server.repl_transfer_size)

  137. eof_reached = 1;

  138. }

  139.  
  140. if (eof_reached) {

  141. if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {

  142. redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));

  143. replicationAbortSyncTransfer();

  144. return;

  145. }

  146. redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");

  147. signalFlushedDb(-1);

  148. emptyDb(replicationEmptyDbCallback);

  149. /* Before loading the DB into memory we need to delete the readable

  150. * handler, otherwise it will get called recursively since

  151. * rdbLoad() will call the event loop to process events from time to

  152. * time for non blocking loading. */

  153. aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);

  154. redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");

  155. if (rdbLoad(server.rdb_filename) != REDIS_OK) {

  156. redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");

  157. replicationAbortSyncTransfer();

  158. return;

  159. }

  160. /* Final setup of the connected slave <- master link */

  161. zfree(server.repl_transfer_tmpfile);

  162. close(server.repl_transfer_fd);

  163. server.master = createClient(server.repl_transfer_s);

  164. server.master->flags |= REDIS_MASTER;

  165. server.master->authenticated = 1;

  166. server.repl_state = REDIS_REPL_CONNECTED;

  167. server.master->reploff = server.repl_master_initial_offset;

  168. memcpy(server.master->replrunid, server.repl_master_runid,

  169. sizeof(server.repl_master_runid));

  170. /* If master offset is set to -1, this master is old and is not

  171. * PSYNC capable, so we flag it accordingly. */

  172. if (server.master->reploff == -1)

  173. server.master->flags |= REDIS_PRE_PSYNC;

  174. redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");

  175. /* Restart the AOF subsystem now that we finished the sync. This

  176. * will trigger an AOF rewrite, and when done will start appending

  177. * to the new file. */

  178. if (server.aof_state != REDIS_AOF_OFF) {

  179. int retry = 10;

  180.  
  181. stopAppendOnly();

  182. while (retry-- && startAppendOnly() == REDIS_ERR) {

  183. redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");

  184. sleep(1);

  185. }

  186. if (!retry) {

  187. redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");

  188. exit(1);

  189. }

  190. }

  191. }

  192.  
  193. return;

  194.  
  195. error:

  196. replicationAbortSyncTransfer();

  197. return;

  198. }

 

         server.repl_transfer_size的值表示要讀取的RDB資料的總長度(僅對有硬碟複製的RDB資料而言)。如果當前其值為-1,說明本次是第一次接收RDB資料。因此,首先呼叫syncReadLine,讀取主節點發來的第一行資料("\r\n"之前的內容)到buf中,讀取的超時時間為5s,如果在5s之內還讀不到"\n",則syncReadLine返回-1,因此呼叫函式replicationAbortSyncTransfer,終止本次複製過程,然後返回;

         然後解析讀取到的內容,如果符合無硬碟複製的RDB資料格式,則將40位元組的隨機串記錄到靜態變數eofmark中,並且置usemark為1,置server.repl_transfer_size為0,然後返回;

         如果不符合無硬碟複製的RDB資料格式,則認為是有硬碟複製的RDB資料,從buf中解析得到RDB資料的長度,記錄到server.repl_transfer_size中,並且置usemark為0後返回;

 

         後續可讀事件觸發,再次呼叫該函式時,server.repl_transfer_size已不再是-1,開始接收真正的RDB資料了。usemark為0,表示是有硬碟複製的RDB資料,為1,表示是無硬碟複製的的RDB資料;

 

         接下來呼叫read,讀取RDB資料內容到buf中。read返回值為nread,如果nread小於等於0,要麼說明發生了錯誤,要麼說明主節點終止了連結,無論哪種情況,都是呼叫函式replicationAbortSyncTransfer,終止本次複製過程,然後返回; 

         如果nread大於0,則先將其增加到server.stat_net_input_bytes中;

         如果是無硬碟複製的RDB資料,則每次read之後,都判斷是否接收到了末尾40位元組的隨機串:如果nread大於等於40,則將buf中後40個位元組複製到lastbytes中;否則,將buf複製到lastbytes中的尾部。然後比對lastbytes和eofmark,如果相同,說明已經接收到了末尾,置eof_reached為1;

 

         然後,將buf寫入到描述符server.repl_transfer_fd中,也就是從節點儲存RDB資料的臨時檔案中;

         然後將nread增加到server.repl_transfer_read中,該屬性記錄了當前已讀到的RDB資料的長度;

         如果是無硬碟複製的RDB資料,並且已經讀到了末尾,則將臨時檔案中末尾的40位元組的隨機串刪除;

         每當讀取了8M的資料後,都執行一次sync操作,保證臨時檔案內容確實寫到了硬碟;         如果是有硬碟複製的RDB資料,且server.repl_transfer_read等於server.repl_transfer_size,則說明已經接收到所有資料,置eof_reached為1;

 

         如果所有的RDB資料已經接收完了,則首先將儲存RDB資料的臨時檔案改名為配置的RDB檔名server.rdb_filename;然後呼叫signalFlushedDb,使得本例項的所有客戶端感知到接下來要清空資料庫了。然後就是呼叫emptyDb,清空所有資料,回撥函式是replicationEmptyDbCallback,每當處理了字典雜湊表中65535個bucket之後,就呼叫一次該函式,向主節點發送一個"\n",以向主節點證明本例項還活著;

         然後刪除server.repl_transfer_s上的可讀事件,這是因為在呼叫rdbLoad載入RDB資料時,每次呼叫rioRead都會呼叫processEventsWhileBlocked處理當前已觸發的事件,如果不刪除該可讀事件的話,就會遞迴進入的本函式中(因此,從節點在載入RDB資料時,是不能處理主節點發來的其他資料的);         

         接下來就是呼叫rdbLoad載入RDB資料;

        

         載入完RDB資料之後,就已經完成了完全重同步過程。接下來,從節點會將主節點當成客戶端,像處理普通客戶端那樣,接收主節點發來的命令,執行命令以保證主從一致性。

         因此,首先關閉RDB臨時檔案描述符server.repl_transfer_fd,然後就使用socket描述符server.repl_transfer_s建立redisClient結構server.master,因此後續還是使用該描述符接收主節點客戶端發來的命令;

         將標記REDIS_MASTER記錄到客戶端標誌中,以表明該客戶端是主節點;

         將複製狀態置為REDIS_REPL_CONNECTED,表示主從節點已完成握手和接收RDB資料的過程;

         主節點之前的傳送"PSYNC"命令回覆為"+FULLRESYNC"時,附帶的初始複製偏移記錄到了server.repl_master_initial_offset中,將其儲存到server.master->reploff;附帶的主節點ID記錄到了server.repl_master_runid中,將其儲存到server.master->replrunid中;如果server.repl_master_initial_offset為-1,說明主節點不認識"PSYNC"命令,因此將REDIS_PRE_PSYNC記錄到客戶端標誌位中;

         完成以上的操作之後,如果本例項開啟了AOF功能,則首先呼叫stopAppendOnly,然後迴圈10次,呼叫startAppendOnly開始進行AOF轉儲,直到startAppendOnly返回REDIS_OK。如果startAppendOnly失敗次數超過10次,則直接exit退出!!!

 

六:命令傳播

         當複製狀態變為REDIS_REPL_CONNECTED後,表示進入了命令傳播階段。後續從節點將主節點當成一個客戶端,接收該主節點客戶端發來的命令請求,像處理普通客戶端一樣處理即可。

 

         在讀取客戶端命令的函式readQueryFromClient中,一旦從節點讀到了追節點發來的同步命令,會將命令長度增加到從節點的複製偏移量server.master. reploff中:

 
  1. if (nread) {

  2. sdsIncrLen(c->querybuf,nread);

  3. c->lastinteraction = server.unixtime;

  4. if (c->flags & REDIS_MASTER) c->reploff += nread;

  5. server.stat_net_input_bytes += nread;

  6. }

         這樣,從節點的複製偏移量server.master. reploff就能與主節點保持一致了。

 

 

         與普通客戶端不同的是,主節點客戶端發來的命令請求無需回覆,因此,在函式prepareClientToWrite中,有下面的語句:

 
  1. int prepareClientToWrite(redisClient *c) {

  2. ...

  3. /* Masters don't receive replies, unless REDIS_MASTER_FORCE_REPLY flag

  4. * is set. */

  5. if ((c->flags & REDIS_MASTER) &&

  6. !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;

  7. ...

  8. }

 

         每次向客戶端輸出快取追加新資料之前,都要呼叫函式prepareClientToWrite函式。如果該函式返回REDIS_ERR,表示無需向輸出快取追加新資料。

         客戶端標誌中如果設定了REDIS_MASTER標記,就表示該客戶端是主節點客戶端server.master,並且在沒有設定REDIS_MASTER_FORCE_REPLY標記的情況下,該函式返回REDIS_ERR,表示無需向輸出快取追加新資料。

 

 

         其他有關主從複製的程式碼,可以參考:

https://github.com/gqtc/redis-3.0.5/blob/master/redis-3.0.5/src/replication.c