1、前言

本文介紹了Redis複製的主要流程和設計思想。通過本文的閱讀,您大致能理解複製在軟體架構方面的通用思想。在閱讀本文之前,希望讀者首先對Redis有一定的認識,對Redis的事件型別、和事件處理器有個基本的瞭解。因為本文主要講複製的流程,所以很多額外的知識點只是一筆帶過、想要更多的瞭解,自行參考網上資料。話不多說、進入主題。

2、複製的主要流程

在redis複製的過程中,參與者主要就是redis的主從架構。複製是從一方複製資料到另一方,所以兩臺Redis機器是必不可少的參與物件。一臺主機、一臺從機!參考Redis複製的主要流程,我將它分為以下幾個小模組來分析。

  • 配置階段
  • 握手、探測階段
  • 同步階段
  • 命令傳播階段

Redis使用狀態機的策略來把以上流程給串接起來。即在每個階段都配置一個狀態碼、及每個狀態碼下執行的程式碼流程!

2.1 配置階段

主從機是通過TCP協議來進行資料傳輸。所以它們首先就要建立一個安全的連結通道,以便可以通訊!那麼我們就要在從機啟動的時候配置個,它要向誰要資料,認哪個主機為自己的Master! 配置有以下幾種方法

1、 通過配置檔案配置

在Redis.conf時新增要建立連結的主機資訊、

echo slaveof masterIp masterPort >> redis.conf

2、通過客戶端

我們可以通過終端連結到從機

//連結到從機
redis-cli -p <從機port>
//執行
slaveof masterIp masterPort

3、通過啟動時指定引數

也可以在啟動從機的時候帶上指定引數

redis-server redis.conf  --slaveof masterIp masterPort

那麼以上三種方法都可以讓當前啟動的從機儲存既然要連結到主機的地址、和埠號!這三種方法有一定的區別,通過配置檔案儲存的啟動方式比較靠普一些。當配置好主機資訊後,那麼接下來就要連結到主機!

經過以上三種方式的配置,狀態機裡的狀態碼配置成REPL_STATE_CONNECT

/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
//其他程式碼..... //配置狀態機為 REPL_STATE_CONNECT
server.repl_state = REPL_STATE_CONNECT;
}

2.2 握手、探測階段

2.2.1 連結Master

上面說到從機配置好了主機的地址和埠,那麼如何觸發連結呢?這就是Redis的時間事件函式serverCron, 它做了很多事情。其中它做了一件事就是:維護主從機資料同步。

/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
/* 1000ms執行一次 replicationCron這個函式 */
run_with_period(1000) replicationCron();

這個replicationCron函式會去檢測狀態機的狀態碼、上回我們的狀態碼是REPL_STATE_CONNECT

void replicationCron(void) {

  	//.....

  /* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
} //....
}

小學英文水準也能看的懂是吧、檢測是否去連結Master!!!判斷條件很簡單、就是那個狀態碼!!接下來看下connectWithMaster連結主機的函式

int connectWithMaster(void) {
int fd;
//採用了NonBlock的方式 可以參考《UNIX網路程式設計》-卷1(16節)的非阻塞I/O部分
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
//檔案事件、大致思想參考《UNIX網路程式設計》-卷1(6節)I/O複用
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
} server.repl_transfer_lastio = server.unixtime;
//儲存連結套節字
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}

這就和Master建立TCP連結、使得狀態變成REPL_STATE_CONNECTING模式。

2.2.2 相互認證資訊、檢測同步環境

當我們從機連結到主機後、也不是立馬進行資料傳送,進行同步。它和那一樣、也要做足了前戲!過程相當的多,但都很簡單!沒有什麼重點可講的,我們大致過下

  • 從機給主機發送Ping,來探測網路狀況、網路狀態不好的情況下,重新建立連結!這步有點那個味道,先互相瞭解認識下、牽個手啥的!如果特殊時期,對不起您!
  • 然後身份驗證、就是身份識別,總不能什麼人來連結我 我都要給你同步吧!
  • 傳送當前從機的IP資訊、及監聽的埠號啥的、這步不知道啥用?
  • 探測支援的同步協議型別、和支援同步能力(EOF/PSYNC2/CAPA)。Redis在初期的時候,只支援全量的同步,就是你只要來,我都給你!經過後期作者的優化又支援、部分同步(即,同步過的資料不會再同步給你)。這步呢,就是來看看到底支援哪種同步協議型別的、以方便後續操作。

 if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
//讀取回復
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}

經過上面那一系列的"互相認識"階段,最終讓狀態變成REPL_STATE_SEND_PSYNC

2.3 同步階段

那麼到這裡就可以真正的同步資料了,萬事具備了!上回說狀態到了REPL_STATE_SEND_PSYNC,且看原始碼:

if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}

主要函式slaveTryPartialResynchronization 小學水平翻譯下:“從機嘗試部分同步”。這裡為為什麼要嘗試部分同步呢?之前咱們說到:Redis早期的版本不支援部分同步,後來才支援的。函式名我估計是:如果當前這臺機器同步過資料,那麼走部分同步,如果沒有就走全部同步,所以起了個slaveTryPartialResynchronization 這也是我的猜想啊、看原始碼很累,有時候猜也能幫助你順著往下看,如果你每個函式都看一下,會累死的!猜函式的大致用法也是看原始碼的方法之一!

既然來了還是帶看下原始碼吧!

/**
* fd :連結套節字,你就認為中 socekt裡 socket_accpet 那返回的玩意兒,用於相互同信的!
* read_reply: 這個函式分為兩個部分、用這個值來區分,通俗了說也就是、傳遞1幹什麼、傳遞0幹什麼、
* 我們看下原始碼裡
*/
int slaveTryPartialResynchronization(int fd, int read_reply) { /* Writing half */
if (!read_reply) { /*
* 當不可讀的時候,取就是寫的時候,即往fd裡寫資料。即向對方傳送資料!
*/ return 狀態常量
} /* Reading half */
/**
* 上面是傳送資料,下面就是讀取資料的原始碼咯!
*/ }

很顯示、上面這函式分了兩大塊,由read_reply引數來決定,是傳送資料,還是讀取資料!就兩件事

0:傳送資料

1:讀取資料

接下來看一張圖吧、單獨用文字來解釋有點繞

上面這張圖大致的來表示了一個乾淨的從機,第一次向主機同步資料的過程,下面解釋下這張圖

  • 向傳送指令 psync ? -1

我們還是來看下原始碼、回到 slaveTryPartialResynchronization函式:

int slaveTryPartialResynchronization(int fd, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply; /* Writing half */
if (!read_reply) { server.master_initial_offset = -1;
//如果有主機的資料、
if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
//沒有情況
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
//傳送 PSYNC 指令
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
} // 讀的部分...省略
}

PSYNC指令有兩個引數、

  • psync_replid
  • psync_offset

從上面的邏輯可以看出來、當有同步過的時候,psync_replidpsync_offset會取出相對就的值、如果沒有則用"?"和“-1”來給值。通常情況下,我們是一個新機器,所以沒有同步過主機資訊,即cached_masterfalse所以:

// psync_replid = ?
// psync_offset = -1 sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC", "?", -1, NULL);

接下來把狀態變更為PSYNC_WAIT_REPLY等待主機的回覆!

主機在接收到從機發來的PSYNC命令時大致的流程是會去fock一個子程序出來做bgSave的事情、有關於Redis持久化的過程不在本文描述、可以自尋資料觀看。當主機接收到PSYNC指令的時候,解析指令,我們轉到主機視角看如何解析!我們主要分析下關鍵程式碼~

void syncCommand(client *c) {

 		 // 一系列的判斷程式碼、略過

 		//因為我們是第一次同步、所以 嘗試部分同步會失敗、 走到下面的 else 裡  stat_sync_partial_err++

    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
} //以主機的視角來看的話,這裡很多程式碼是做一些 資料儲存、主要把從機的資訊儲存下來、 /* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */ //標識當前這個從機的同步狀態、標識從機為CLIENT_SLAVE身份、加入從機列表、
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;//這個打個flag ,下面將會用到。先標識狀態
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c); //這裡建立一個複製積壓緩衝區,用於部分同步,稍後講到、這裡打個flag
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
} //下面有三個case 因為我們是第一次請求主機同步。所以沒有任務bgsave progress(這裡假設,方便我們閱讀程式碼,和順應場景) /* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (server.aof_child_pid == -1) {
//開始為同步進行Bgsave操作
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
return; }

如果都沒出錯的知識(主機預設支援無磁化同步),那麼開始startBgsaveForReplication 再接著往下看

int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
//開始bgsave為同步準備, socket_target 為複製到socket還是磁碟的判斷
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
//準備RDB檔案 開始
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
//儲存到socket
retval = rdbSaveToSlavesSockets(rsiptr);
else
///儲存到disk
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}
//準備RDB檔案 結束 //如果錯誤 那麼採取的應對方法、找到等待同步的從機,
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//找到等待同步的從機,
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE;
//刪除節點、看樣子從列表中刪除當前這個從機
listDelNode(server.slaves,ln);
//向從機發送日誌、
addReplyError(slave,
"BGSAVE failed, replication can't continue");
//標識為close狀態
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
} /* If the target is socket, rdbSaveToSlavesSockets() already setup
* the salves for a full resync. Otherwise for disk target do it now.*/
//走到這裡、socket_tartget = false
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//找在等待同步開始的從機、這個狀態在上面設定過的,
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
} /* Flush the script cache, since we need that slave differences are
* accumulated without requiring slaves to match our cached scripts. */
if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}

經過上面很流程,終於走到了函式replicationSetupSlaveForFullResync、這裡再次提醒下大家在看原始碼的時候,不要多看,順著主流程往下看,每個函式的分支比較多,看多了容易看不回來。切記!

int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
//這個函式很簡單了、配置從機和複製偏移量、配置從機的複製狀態、 slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
server.slaveseldb = -1; /* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
//寫入資料、 FULLRESYNC replid offset
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}

好了、到這裡從機發送 psync ? -1的流程就完整了,最後主機把自己的replidoffset傳送給了從機!至於replidoffset的作用和含意我們下文說到! 原始碼鏈路不算太長,沿著主線看就行,拋開那些不是很重要的程式碼!

接著我們回到從機的視角

因為有資料回來,Redis的檔案事件會自動觸發syncWithMaster回到slaveTryPartialResynchronization函式、引數是1

psync_result = slaveTryPartialResynchronization(fd,1);

if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
return;
}
int slaveTryPartialResynchronization(int fd, int read_reply) {

  /* Reading half */
//讀取資料
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
} aeDeleteFileEvent(server.el,fd,AE_READABLE);
//如果是FULLRESYNC
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
//記錄返回來的replid 和 offset
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
//清空已經存在的主機資訊
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
}

從機讀取到了從主機發來的FULLRESYNC資訊後、儲存了一些返回來的資訊、接下來回到syncWithMaster下面的程式碼

    /* Prepare a suitable temp file for bulk transfer */
//準備一個臨時檔案、來放主機傳遞過來的RDB檔案、
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
} /* Setup the non blocking download of the bulk file. */
//監聽事件、回撥函式 readSyncBulkPayload、 用來接收RDB檔案!!
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
//配置狀態為等著接收RDB、初始一些初始化資料
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;

到這裡、syncWithMaster這個函式就結束了最終把狀態機變更為REPL_STATE_TRANSFER,配置回撥函式為readSyncBulkPayload來處理RDB檔案!!!

readSyncBulkPayload這個函式就不分析了,太長了、主要就是把接收到RDB檔案寫到臨時檔案、清空資料、然後載入到資料庫、釋放各種資源!

當主機和從機建立連結後,其實就可以正常的複製資料了,當主機準備RDB的時候,也會有正常的命令打進來,這時候因為從機狀態是等待同步,所以這些命令會被打入到快取區,等RDB檔案同步完,主機會把緩衝區的資料打到從機,更新資料、這部分就不說了。到這裡,同步就算完成了!!!

2.5 命令傳播

結過上面那些流程,總算能讓主從機達到資料的一致性、但是我們伺服器是一直執行的,所以我們需要把主機的命令及時的同步到從機上面、但總不是每次都是同步RDB檔案、那代價也太大了!

一個需求產生就有一個相對應的應對方法!命令傳播程式!主機在接收到資料的,經過命令傳播程式會把資料傳送給自己的小從機們、有達到資料的一致性!因為考慮到Redis的高效性,命令傳播是非同步進行的,所以在資料一致性上還是有點差異的,魚和熊掌不可兼得。作者也做了很多的彌補工作、後面再說!關於命令傳播的原始碼就不放出來了replicationFeedSlaves可以自行觀看、就是把資料寫到從機的 replcation buffer裡。同時也寫到backLog buffer(下文說)

自此主從整個複製流程已經結束!主機機器已經能夠正常的同步資料了!

等一等!!

從機也有斷網斷電的時候啊、不能我再次連結上來的時候又準備RDB檔案吧。所以作者又再一次進行了優化,我也支援你批量同步!

2.6 部分同步

部分同步的三個點~

2.6.1 runId 執行ID

這個很容易理解,圖上已經說明、不再描述。

2.6.2 offset 複製偏移量

2.6.3 backlogbuffer 複製積壓緩衝區

那麼通過下面這個圖來理解下offsetbacklogbuffer。在命令傳播階段、主機不僅把命令傳遞到從機、還把接收到命令按位元組數寫到backlogbuffer區,就是為了怕從機沒有接收到傳遞的資料,備份一下! 每次接收到新資料,主機和從機都會更新自己的 offset值,以達到兩邊保持一致!

首先backlogbuffer什麼時候是建立的呢?

void syncCommand(client *c) {
//主機在解析從機發來 PSYNC 命令時、
//當有一個從機的時候、並且 back_log為null的時候
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
//建立 backlogBuffer
createReplicationBacklog();
}
} //建立複製積壓緩衝區
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
//配置檔案配置大小、defult 1M
server.repl_backlog = zmalloc(server.repl_backlog_size);
//實際資料長度
server.repl_backlog_histlen = 0;
//下一次寫入命令的的位置
server.repl_backlog_idx = 0; /* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
}

即當有一個從機連結到主機的時候,並且傳送PSYNC的時候。並且是所有從機共享的一個數據構建。

那麼有了這三個要素,再來看下,如果斷網了從機是怎麼批量同步的、注意這裡是斷網即斷開了和主機的連結、但是資料已然還在!

當從機再次連結上來的時候、發現自己是有主機的資訊的、所以傳送命令帶上runIdoffset 上面的程式碼也是有的!可以回頭看下從機發送PSYNC命令的那個程式碼!

那麼主機是如何判斷支援部分同步的呢?

回到syncCommand

void syncCommand(client *c) {

  //其他程式碼...

  if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 在這裡、這個函式看函式名!
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} //其他程式碼...
} int masterTryPartialResynchronization(client *c) {
//其他程式碼... /* We still have the data our slave is asking for? */
//這個翻譯就是 。我們有從機要的資料!。。。。意思就是我有備份、不用生成RDB!
//backlogBuffer不存在
//或者、傳遞過來的偏移量 < 當前主機的偏移量 (意思是主機跑的太快了、覆蓋了一些資料)
//或者是當前從機比主機跑的還資料還要多
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
//否則就走全量同步
goto need_full_resync;
} //其他程式碼...
//傳送在緩衝區的資料
psync_len = addReplyReplicationBacklog(c,psync_offset);
}

我們只看重點程式碼、一些其他干擾的就去掉了!上面程式碼可以看了,只在資料還在複製積壓緩衝區就不用走全量同步!從機等著接收資料更新Offset即可。

上面修復了斷網的情況下~現在又有新情況下!

2.7 特殊情況下的批量同步

2.7.1 從機重啟

也沒關係~後來作者也針對這種情況做了優化!看圖

當從機關機重新的時候、會把當前同步的資訊儲存到RDB檔案中、持久化到磁碟中。等下次重新啟動的時候,再給拿回去!這樣保證了cached_master資料不會丟失。

2.7.2 換主的情況下

當主機變為從機的時候、從機(6380)會做了一件下面有意義的事情即”把原來的repl_id 和 offset 儲存到 備份、到replid2和scond_offset“

當之前下線的主機(6379)上線後,變為(6380)的從機。當6379再向6380請求同步資料的時候,帶上自己的(原來當主機時候的資料)repliid和offset,再看是主機是如果支援指同步的.再回到函式masterTryPartialResynchronization

void masterTryPartialResynchronization(client *c) {

  	//其他程式碼

   /* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
//我們說了,當6380被選為主機的時候,備份了原來的 replid 和 offset! 這一點至關重要
if (strcasecmp(master_replid, server.replid) && // 因為這裡支援了對 replid2的判斷,即我還記得你,你之前是我爸爸,但是,你不能比我跑的快、
//不然不好意思、去全量同步我的!
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_replid[0] != '?') {
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid, server.replid, server.replid2);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld", psync_offset, server.second_replid_offset);
}
} else {
serverLog(LL_NOTICE,"Full resync requested by replica %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
} }

所以通過以上程式碼可以得知,從機變主機,為了不忘記原來的主機,保留主機的資訊!以便下次來的時候,還能認得你!

所以到這裡同步就真的完了!!沒有然後了!從上看下來,主機同步做的工作真的很多,每一步走錯了都是致命的。本次分析的只是部分程式碼,還有60%的程式碼沒有分析到!最後還有一個心跳機制要說下!

3 心跳探測

在命令傳播階段呢,從伺服器會以1秒的頻率向主機發送 Replication ACK <offset>



作者設計這個目的也是會了增強主從之間的資料一致性、Redis被稱為高可用、高效能的伺服器,那麼對它的加強措施是一點兒也不能鬆懈!換名話說:如果你能和我一塊友好的工作,那麼就OK,否則Kill掉你,如果你的網路狀態不好,那麼不好意思,主機也拒絕寫命令!那麼這樣看來:資料的一致性在這塊設計還是贏了一回!

4 總結

就總結兩個點吧、其他還有很多,網上都能找到。那麼通過本文的學習我們可以一起考慮以下兩點。

  • 資料的丟失
  • 資料的一致性

4.1 資料丟失

Reids盡最大可能保持資料不要丟失。比如:持久化。但在我們剛才講的換主的情況下 、如果主機執行一個數據,因為命令傳播民非同步的,那麼就有可能失敗!如果從機真的失敗了,剛好主機又下線了!當失敗的從機被選為主機,下線的主機又被配置為從機,那麼在同步的時候剛才那條命令就會丟失!因為Reids在保持資料的一致性!

所以Redis最好只能用來做快取,不要當作真的資料庫來用

4.2資料不一致性

從上面可以看出來,資料同步是非同步的,所以就有可能讀寫不一致!

那麼避免這種情況,網路要好、機器要好、同時Redis的的配置項也能配置的6

本文參考資料redis原始碼5.0.9