Redis Source Code Analysis 15: Redis Master-Slave Replication, the Slave-Side Flow




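Redis master-slave replication helps make Redis instances highly available, avoids the single point of failure of a lone Redis server, and can also be used for load balancing.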












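c: When the master's BGSAVE command finishes, the master sends the generated RDB file to the slave. The slave receives and loads this RDB file, bringing its own database up to the state the master was in when it executed BGSAVE.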










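a: The slave saves the master's run ID. Every running Redis instance has a unique run ID, and a new run ID is generated automatically each time the instance restarts.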




Once the master-slave connection is ready, the slave sends a "PSYNC" command in the form "PSYNC <runid> <offset>".
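To make the command format concrete, here is a minimal sketch of how the two arguments could be chosen. The helper build_psync and its parameters are hypothetical and do not exist in Redis. It assumes, as I understand the Redis 3.x source, that a slave without a cached master sends "?" and -1 to request a full resync, and that a slave with a cached master asks for its cached offset plus one. The real code sends the command in the multi-bulk protocol format; the inline form below is only for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, not part of Redis: formats the PSYNC request line.
 * have_cache == 0 models a slave with no cached master, which asks for a
 * full resync by sending "?" as the run ID and -1 as the offset. */
int build_psync(char *out, size_t outlen, int have_cache,
                const char *cached_runid, long long cached_reploff)
{
    assert(out != NULL && outlen > 0);
    if (!have_cache)
        return snprintf(out, outlen, "PSYNC ? -1\r\n");
    /* Ask for the first byte that has not been processed yet. */
    return snprintf(out, outlen, "PSYNC %s %lld\r\n",
                    cached_runid, cached_reploff + 1);
}
```

With a cached run ID "abc" and a cached offset of 99, the helper produces "PSYNC abc 100"; without a cache it produces "PSYNC ? -1".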











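In the Redis source code, the global structure struct redisServer server, which represents the Redis server, has the following slave-side attributes related to replication: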











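server.cached_master: During replication (more precisely, during the command propagation phase), if the connection between the slave and the master is lost, freeClient(server.master) is called to close the client that represents the master. To allow a partial resynchronization on a later reconnect, freeClient calls replicationCacheMaster, which saves server.master into server.cached_master. This redisClient structure records the master's run ID and the replication offset. When the connection to the master is re-established, this information is used to attempt a partial resynchronization, that is, to send the "PSYNC <runid> <offset>" command.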


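server.repl_master_runid and server.repl_master_initial_offset: After the slave sends the "PSYNC <runid> <offset>" command, if the master cannot perform a partial resynchronization it replies with "+FULLRESYNC <runid> <offset>", meaning that a full resynchronization is required. Here <runid> is the master's run ID, which is stored in server.repl_master_runid, and <offset> is the master's initial replication offset, which is stored in server.repl_master_initial_offset.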
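As an illustration of the reply format only, the following sketch parses a "+FULLRESYNC <runid> <offset>" line. The function parse_fullresync and the constant RUN_ID_SIZE are hypothetical names introduced here; the sketch assumes the run ID is exactly 40 characters, matching the "40 bytes delimiter" size used for REDIS_RUN_ID_SIZE elsewhere in this article. It is not the parsing code from slaveTryPartialResynchronization.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40  /* assumed to match REDIS_RUN_ID_SIZE */

/* Hypothetical helper: parses "+FULLRESYNC <runid> <offset>".
 * Returns 0 on success, -1 if the reply is malformed. */
int parse_fullresync(const char *reply, char runid[RUN_ID_SIZE + 1],
                     long long *offset)
{
    static const char prefix[] = "+FULLRESYNC ";
    size_t plen = sizeof(prefix) - 1;
    assert(reply != NULL && runid != NULL && offset != NULL);

    if (strncmp(reply, prefix, plen) != 0) return -1;
    const char *id = reply + plen;
    const char *sp = strchr(id, ' ');
    /* The run ID must be followed by a space and have the expected length. */
    if (sp == NULL || (size_t)(sp - id) != RUN_ID_SIZE) return -1;

    memcpy(runid, id, RUN_ID_SIZE);
    runid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(sp + 1, NULL, 10);
    return 0;
}
```

On success the two outputs correspond to what the slave would record in server.repl_master_runid and server.repl_master_initial_offset.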









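When the slave receives a "SLAVEOF" command from a client, or finds the "slaveof" option while reading its configuration file, it sets server.repl_state to REDIS_REPL_CONNECT. This state means that the next step is to establish a TCP connection to the master.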


    int connectWithMaster(void) {
        int fd;

        fd = anetTcpNonBlockBestEffortBindConnect(NULL,
            server.masterhost,server.masterport,REDIS_BIND_ADDR);
        if (fd == -1) {
            redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
        }

        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
            return REDIS_ERR;
        }

        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_s = fd;
        server.repl_state = REDIS_REPL_CONNECTING;
        return REDIS_OK;
    }









    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err = NULL;
        int dfd, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);

        /* If this event fired after the user turned the instance into a master
         * with SLAVEOF NO ONE we must just return ASAP. */
        if (server.repl_state == REDIS_REPL_NONE) {
            close(fd);
            return;
        }

        /* Check for errors in the socket. */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
            sockerr = errno;
        if (sockerr) {
            redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
                strerror(sockerr));
            goto error;
        }

        /* Send a PING to check the master is able to reply without errors. */
        if (server.repl_state == REDIS_REPL_CONNECTING) {
            redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
            /* Delete the writable event so that the readable event remains
             * registered and we can wait for the PONG reply. */
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
            server.repl_state = REDIS_REPL_RECEIVE_PONG;
            /* Send the PING, don't check for errors at all, we have the timeout
             * that will take care about this. */
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
            if (err) goto write_error;
            return;
        }

        /* Receive the PONG command. */
        if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

            /* We accept only two replies as valid, a positive +PONG reply
             * (we just check for "+") or an authentication error.
             * Note that older versions of Redis replied with "operation not
             * permitted" instead of using a proper error code, so we test
             * both. */
            if (err[0] != '+' &&
                strncmp(err,"-NOAUTH",7) != 0 &&
                strncmp(err,"-ERR operation not permitted",28) != 0)
            {
                redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",err);
                sdsfree(err);
                goto error;
            } else {
                redisLog(REDIS_NOTICE,
                    "Master replied to PING, replication can continue...");
            }
            sdsfree(err);
            server.repl_state = REDIS_REPL_SEND_AUTH;
        }

        /* AUTH with the master if required. */
        if (server.repl_state == REDIS_REPL_SEND_AUTH) {
            if (server.masterauth) {
                err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
                if (err) goto write_error;
                server.repl_state = REDIS_REPL_RECEIVE_AUTH;
                return;
            } else {
                server.repl_state = REDIS_REPL_SEND_PORT;
            }
        }

        /* Receive AUTH reply. */
        if (server.repl_state == REDIS_REPL_RECEIVE_AUTH) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            if (err[0] == '-') {
                redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
                sdsfree(err);
                goto error;
            }
            sdsfree(err);
            server.repl_state = REDIS_REPL_SEND_PORT;
        }

        /* Set the slave port, so that Master's INFO command can list the
         * slave listening port correctly. */
        if (server.repl_state == REDIS_REPL_SEND_PORT) {
            sds port = sdsfromlonglong(server.port);
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                    "listening-port",port, NULL);
            sdsfree(port);
            if (err) goto write_error;
            sdsfree(err);
            server.repl_state = REDIS_REPL_RECEIVE_PORT;
            return;
        }

        /* Receive REPLCONF listening-port reply. */
        if (server.repl_state == REDIS_REPL_RECEIVE_PORT) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            /* Ignore the error if any, not all the Redis versions support
             * REPLCONF listening-port. */
            if (err[0] == '-') {
                redisLog(REDIS_NOTICE,"(Non critical) Master does not understand "
                                      "REPLCONF listening-port: %s", err);
            }
            sdsfree(err);
            server.repl_state = REDIS_REPL_SEND_CAPA;
        }

        /* Inform the master of our capabilities. While we currently send
         * just one capability, it is possible to chain new capabilities here
         * in the form of REPLCONF capa X capa Y capa Z ...
         * The master will ignore capabilities it does not understand. */
        if (server.repl_state == REDIS_REPL_SEND_CAPA) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                    "capa","eof",NULL);
            if (err) goto write_error;
            sdsfree(err);
            server.repl_state = REDIS_REPL_RECEIVE_CAPA;
            return;
        }

        /* Receive CAPA reply. */
        if (server.repl_state == REDIS_REPL_RECEIVE_CAPA) {
            err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
            /* Ignore the error if any, not all the Redis versions support
             * REPLCONF capa. */
            if (err[0] == '-') {
                redisLog(REDIS_NOTICE,"(Non critical) Master does not understand "
                                      "REPLCONF capa: %s", err);
            }
            sdsfree(err);
            server.repl_state = REDIS_REPL_SEND_PSYNC;
        }

        /* Try a partial resynchonization. If we don't have a cached master
         * slaveTryPartialResynchronization() will at least try to use PSYNC
         * to start a full resynchronization so that we get the master run id
         * and the global offset, to try a partial resync at the next
         * reconnection attempt. */
        if (server.repl_state == REDIS_REPL_SEND_PSYNC) {
            if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
                err = sdsnew("Write error sending the PSYNC command.");
                goto write_error;
            }
            server.repl_state = REDIS_REPL_RECEIVE_PSYNC;
            return;
        }

        /* If reached this point, we should be in REDIS_REPL_RECEIVE_PSYNC. */
        if (server.repl_state != REDIS_REPL_RECEIVE_PSYNC) {
            redisLog(REDIS_WARNING,"syncWithMaster(): state machine error, "
                                   "state should be RECEIVE_PSYNC but is %d",
                                   server.repl_state);
            goto error;
        }

        psync_result = slaveTryPartialResynchronization(fd,1);
        if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

        /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
         * uninstalling the read handler from the file descriptor. */

        if (psync_result == PSYNC_CONTINUE) {
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
            return;
        }

        /* PSYNC failed or is not supported: we want our slaves to resync with us
         * as well, if we have any (chained replication case). The mater may
         * transfer us an entirely different data set and we have no way to
         * incrementally feed our slaves after that. */
        disconnectSlaves(); /* Force our slaves to resync with us as well. */
        freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

        /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
         * and the server.repl_master_runid and repl_master_initial_offset are
         * already populated. */
        if (psync_result == PSYNC_NOT_SUPPORTED) {
            redisLog(REDIS_NOTICE,"Retrying with SYNC...");
            if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                    strerror(errno));
                goto error;
            }
        }

        /* Prepare a suitable temp file for bulk transfer */
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
            goto error;
        }

        /* Setup the non blocking download of the bulk file. */
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
        {
            redisLog(REDIS_WARNING,
                "Can't create readable event for SYNC: %s (fd=%d)",
                strerror(errno),fd);
            goto error;
        }

        server.repl_state = REDIS_REPL_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;
        server.repl_transfer_fd = dfd;
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        return;

    error:
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REDIS_REPL_CONNECT;
        return;

    write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
        redisLog(REDIS_WARNING,"Sending command to master in replication handshake: %s", err);
        sdsfree(err);
        goto error;
    }




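The function first checks whether the current replication state server.repl_state is REDIS_REPL_NONE. If so, the slave received a "SLAVEOF NO ONE" command from a client during the handshake, so the socket descriptor is simply closed and the function returns.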





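If the current replication state is REDIS_REPL_RECEIVE_PONG, the slave has received the master's reply to the "PING" command: the reply made the descriptor readable, which triggered this callback. In this case the function first reads the master's reply. Under normal conditions only three kinds of reply are acceptable: "+PONG", "-NOAUTH", and "-ERR operation not permitted" (from older Redis masters). Any other reply sends execution straight to the error-handling path. Otherwise the replication state is set to REDIS_REPL_SEND_AUTH, and the function does not return but falls through to the next step.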





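If the current replication state is REDIS_REPL_SEND_PORT, the slave sends the "REPLCONF listening-port <port>" command to tell the master its own listening port, then sets the replication state to REDIS_REPL_RECEIVE_PORT and returns.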


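If the current replication state is REDIS_REPL_RECEIVE_PORT, the slave has received the master's reply to the "REPLCONF listening-port" command: the reply made the descriptor readable, which triggered this callback. In this case the function first reads the master's reply. If the first byte of the reply is "-", the master does not understand the command. This is not a fatal error and is only logged. The replication state is then set to REDIS_REPL_SEND_CAPA, and the function falls through without returning.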

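If the current replication state is REDIS_REPL_SEND_CAPA, the slave sends the "REPLCONF capa eof" command to tell the master about its own "capabilities", then sets the replication state to REDIS_REPL_RECEIVE_CAPA and returns.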


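If the current replication state is REDIS_REPL_RECEIVE_CAPA, the slave has received the master's reply to the "REPLCONF capa eof" command: the reply made the descriptor readable, which triggered this callback. In this case the function first reads the master's reply. If the first byte of the reply is "-", the master does not understand the command. This is not a fatal error and is only logged. The replication state is then set to REDIS_REPL_SEND_PSYNC, and the function falls through without returning.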

If the replication state is REDIS_REPL_SEND_PSYNC, slaveTryPartialResynchronization is called to send the "PSYNC <psync_runid> <psync_offset>" command to the master.
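The handshake steps above alternate between "send a command and return" and "read the reply and fall through". A minimal sketch of that control flow, reduced to a pure state-transition function, might look as follows. The enum hs_state, its ST_* names, and handshake_step are hypothetical and only mirror the REDIS_REPL_* constants. Error replies and write errors are deliberately left out.

```c
#include <assert.h>

/* Illustrative state names mirroring the REDIS_REPL_* constants. */
typedef enum {
    ST_CONNECTING, ST_RECEIVE_PONG, ST_SEND_AUTH, ST_RECEIVE_AUTH,
    ST_SEND_PORT, ST_RECEIVE_PORT, ST_SEND_CAPA, ST_RECEIVE_CAPA,
    ST_SEND_PSYNC, ST_RECEIVE_PSYNC
} hs_state;

/* Hypothetical model of one syncWithMaster() invocation: starting from
 * state s, fall through the steps until a command has been written.
 * The returned state is the one in which the slave waits for the next
 * readable event. */
hs_state handshake_step(hs_state s, int has_masterauth)
{
    assert(s <= ST_RECEIVE_PSYNC);
    for (;;) {
        switch (s) {
        case ST_CONNECTING:    return ST_RECEIVE_PONG;    /* PING sent */
        case ST_RECEIVE_PONG:  s = ST_SEND_AUTH; break;   /* reply read, fall through */
        case ST_SEND_AUTH:
            if (has_masterauth) return ST_RECEIVE_AUTH;   /* AUTH sent */
            s = ST_SEND_PORT; break;                      /* no password configured */
        case ST_RECEIVE_AUTH:  s = ST_SEND_PORT; break;
        case ST_SEND_PORT:     return ST_RECEIVE_PORT;    /* REPLCONF listening-port sent */
        case ST_RECEIVE_PORT:  s = ST_SEND_CAPA; break;
        case ST_SEND_CAPA:     return ST_RECEIVE_CAPA;    /* REPLCONF capa eof sent */
        case ST_RECEIVE_CAPA:  s = ST_SEND_PSYNC; break;
        case ST_SEND_PSYNC:    return ST_RECEIVE_PSYNC;   /* PSYNC sent */
        case ST_RECEIVE_PSYNC: return ST_RECEIVE_PSYNC;   /* handled by the PSYNC reply logic */
        }
    }
}
```

For example, without a configured masterauth, a call that starts in ST_RECEIVE_PONG skips both AUTH states and ends in ST_RECEIVE_PORT, which matches the "does not return" remarks in the walkthrough.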


















    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);


















    #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
    void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
        char buf[4096];
        ssize_t nread, readlen;
        off_t left;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);

        /* Static vars used to hold the EOF mark, and the last bytes received
         * form the server: when they match, we reached the end of the transfer. */
        static char eofmark[REDIS_RUN_ID_SIZE];
        static char lastbytes[REDIS_RUN_ID_SIZE];
        static int usemark = 0;

        /* If repl_transfer_size == -1 we still have to read the bulk length
         * from the master reply. */
        if (server.repl_transfer_size == -1) {
            if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,
                    "I/O error reading bulk count from MASTER: %s",
                    strerror(errno));
                goto error;
            }

            if (buf[0] == '-') {
                redisLog(REDIS_WARNING,
                    "MASTER aborted replication with an error: %s",
                    buf+1);
                goto error;
            } else if (buf[0] == '\0') {
                /* At this stage just a newline works as a PING in order to take
                 * the connection live. So we refresh our last interaction
                 * timestamp. */
                server.repl_transfer_lastio = server.unixtime;
                return;
            } else if (buf[0] != '$') {
                redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
                goto error;
            }

            /* There are two possible forms for the bulk payload. One is the
             * usual $<count> bulk format. The other is used for diskless transfers
             * when the master does not know beforehand the size of the file to
             * transfer. In the latter case, the following format is used:
             *
             * $EOF:<40 bytes delimiter>
             *
             * At the end of the file the announced delimiter is transmitted. The
             * delimiter is long and random enough that the probability of a
             * collision with the actual file content can be ignored. */
            if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
                usemark = 1;
                memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
                memset(lastbytes,0,REDIS_RUN_ID_SIZE);
                /* Set any repl_transfer_size to avoid entering this code path
                 * at the next call. */
                server.repl_transfer_size = 0;
                redisLog(REDIS_NOTICE,
                    "MASTER <-> SLAVE sync: receiving streamed RDB from master");
            } else {
                usemark = 0;
                server.repl_transfer_size = strtol(buf+1,NULL,10);
                redisLog(REDIS_NOTICE,
                    "MASTER <-> SLAVE sync: receiving %lld bytes from master",
                    (long long) server.repl_transfer_size);
            }
            return;
        }

        /* Read bulk data */
        if (usemark) {
            readlen = sizeof(buf);
        } else {
            left = server.repl_transfer_size - server.repl_transfer_read;
            readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
        }

        nread = read(fd,buf,readlen);
        if (nread <= 0) {
            redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
                (nread == -1) ? strerror(errno) : "connection lost");
            replicationAbortSyncTransfer();
            return;
        }
        server.stat_net_input_bytes += nread;

        /* When a mark is used, we want to detect EOF asap in order to avoid
         * writing the EOF mark into the file... */
        int eof_reached = 0;

        if (usemark) {
            /* Update the last bytes array, and check if it matches our delimiter.*/
            if (nread >= REDIS_RUN_ID_SIZE) {
                memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
            } else {
                int rem = REDIS_RUN_ID_SIZE-nread;
                memmove(lastbytes,lastbytes+nread,rem);
                memcpy(lastbytes+rem,buf,nread);
            }
            if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
        }

        server.repl_transfer_lastio = server.unixtime;
        if (write(server.repl_transfer_fd,buf,nread) != nread) {
            redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
            goto error;
        }
        server.repl_transfer_read += nread;

        /* Delete the last 40 bytes from the file if we reached EOF. */
        if (usemark && eof_reached) {
            if (ftruncate(server.repl_transfer_fd,
                server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
            {
                redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
                goto error;
            }
        }

        /* Sync data on disk from time to time, otherwise at the end of the transfer
         * we may suffer a big delay as the memory buffers are copied into the
         * actual disk. */
        if (server.repl_transfer_read >=
            server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
        {
            off_t sync_size = server.repl_transfer_read -
                              server.repl_transfer_last_fsync_off;
            rdb_fsync_range(server.repl_transfer_fd,
                server.repl_transfer_last_fsync_off, sync_size);
            server.repl_transfer_last_fsync_off += sync_size;
        }

        /* Check if the transfer is now complete */
        if (!usemark) {
            if (server.repl_transfer_read == server.repl_transfer_size)
                eof_reached = 1;
        }

        if (eof_reached) {
            if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
                redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
                replicationAbortSyncTransfer();
                return;
            }
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
            signalFlushedDb(-1);
            emptyDb(replicationEmptyDbCallback);
            /* Before loading the DB into memory we need to delete the readable
             * handler, otherwise it will get called recursively since
             * rdbLoad() will call the event loop to process events from time to
             * time for non blocking loading. */
            aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
            if (rdbLoad(server.rdb_filename) != REDIS_OK) {
                redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
                replicationAbortSyncTransfer();
                return;
            }
            /* Final setup of the connected slave <- master link */
            zfree(server.repl_transfer_tmpfile);
            close(server.repl_transfer_fd);
            server.master = createClient(server.repl_transfer_s);
            server.master->flags |= REDIS_MASTER;
            server.master->authenticated = 1;
            server.repl_state = REDIS_REPL_CONNECTED;
            server.master->reploff = server.repl_master_initial_offset;
            memcpy(server.master->replrunid, server.repl_master_runid,
                sizeof(server.repl_master_runid));
            /* If master offset is set to -1, this master is old and is not
             * PSYNC capable, so we flag it accordingly. */
            if (server.master->reploff == -1)
                server.master->flags |= REDIS_PRE_PSYNC;
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
            /* Restart the AOF subsystem now that we finished the sync. This
             * will trigger an AOF rewrite, and when done will start appending
             * to the new file. */
            if (server.aof_state != REDIS_AOF_OFF) {
                int retry = 10;

                stopAppendOnly();
                while (retry-- && startAppendOnly() == REDIS_ERR) {
                    redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
                    sleep(1);
                }
                if (!retry) {
                    redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
                    exit(1);
                }
            }
        }

        return;

    error:
        replicationAbortSyncTransfer();
        return;
    }
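The EOF-mark detection used for diskless transfers in readSyncBulkPayload can be isolated into a small function, which makes the sliding-window behaviour easier to see. The sketch below is a standalone restatement of that logic, not Redis code: eof_window_update and MARK_SIZE are hypothetical names, and MARK_SIZE is assumed to equal REDIS_RUN_ID_SIZE (40).

```c
#include <assert.h>
#include <string.h>

#define MARK_SIZE 40  /* assumed equal to REDIS_RUN_ID_SIZE */

/* Hypothetical helper: slide the newest n bytes of buf into the
 * MARK_SIZE-byte window `last`, then report whether the window now
 * equals the delimiter `mark` announced in the "$EOF:" header. */
int eof_window_update(char last[MARK_SIZE], const char mark[MARK_SIZE],
                      const char *buf, size_t n)
{
    assert(last != NULL && mark != NULL && buf != NULL);
    if (n >= (size_t)MARK_SIZE) {
        /* The chunk alone fills the window: keep only its tail. */
        memcpy(last, buf + n - MARK_SIZE, MARK_SIZE);
    } else {
        /* Shift the old bytes left and append the new chunk. */
        size_t rem = MARK_SIZE - n;
        memmove(last, last + n, rem);
        memcpy(last + rem, buf, n);
    }
    return memcmp(last, mark, MARK_SIZE) == 0;
}
```

Because the window always holds the last 40 bytes received, the delimiter is recognised even when it arrives split across several read() calls.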















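Each time another 8 MB of data has been read, a sync operation is performed to make sure the contents of the temporary file have really been written to disk.

If the RDB data comes from a disk-based transfer (not a diskless one) and server.repl_transfer_read equals server.repl_transfer_size, all the data has been received, so eof_reached is set to 1.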
















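In readQueryFromClient, the function that reads client commands, whenever the slave reads propagated commands sent by the master, it adds the number of bytes read to its replication offset server.master->reploff: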

    if (nread) {
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
    }

In this way, the slave's replication offset server.master->reploff stays consistent with the master's.




    int prepareClientToWrite(redisClient *c) {
        ...
        /* Masters don't receive replies, unless REDIS_MASTER_FORCE_REPLY flag
         * is set. */
        if ((c->flags & REDIS_MASTER) &&
            !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR;
        ...
    }






