1. 程式人生 > >Redis(九):主從複製的設計與實現解析

Redis(九):主從複製的設計與實現解析

  前面幾篇我們已經完全理解了redis的基本功能的實現了。

  但單靠基本功能實現,往往還是稱不上優秀的專案的。畢竟,我們現在面對的都是複雜的環境,高併發的場景,大資料量的可能。

  簡而言之,現在的系統一般都需要支援分散式部署,不存在單點問題,才算是一個合格的系統。

  而redis作為一個儲存系統,單點問題肯定是不行的。

  最簡單的,就是起碼得支援讀寫分離功能,因為我們面臨的許多問題,一般是面對大量的查詢問題。而要做到讀寫分離功能,就是要把主節點的資料同步到從節點上。從而可以讓從節點接受讀請求,以減輕主節點的讀壓力。

  就讓我們來分析下 Redis 是如何進行主從同步資料的吧!主從同步,換個名稱也就是資料複製。

 

0. 主從複製的作用

  資料冗餘:主從複製實現了資料的熱備份,是持久化之外的一種資料冗餘方式。

  故障恢復:當主節點出現問題時,可以由從節點提供服務,實現快速的故障恢復;實際上是一種服務的冗餘。

  負載均衡:在主從複製的基礎上,配合讀寫分離,可以由主節點提供寫服務,由從節點提供讀服務(即寫Redis資料時應用連線主節點,讀Redis資料時應用連線從節點),分擔伺服器負載;尤其是在寫少讀多的場景下,通過多個從節點分擔讀負載,可以大大提高Redis伺服器的併發量。

  讀寫分離:可以用於實現讀寫分離,主庫寫、從庫讀,讀寫分離不僅可以提高伺服器的負載能力,同時可根據需求的變化,改變從庫的數量;

  高可用基石:除了上述作用以外,主從複製還是哨兵和叢集能夠實施的基礎,因此說主從複製是Redis高可用的基礎。

 

1. Redis 主從複製簡介

  在主從複製中,資料庫分為兩類,一類是主庫(master),另一類是同步主庫資料的從庫(slave)。主庫可以進行讀寫操作,當寫操作導致資料變化時會自動同步到從庫。而從庫一般是隻讀的(特定情況也可以寫,通過引數slave-read-only指定),並接受來自主庫的資料,一個主庫可擁有多個從庫,而一個從庫只能有一個主庫。這樣就使得redis的主從架構有了兩種模式:一類是一主多從如下圖1,二類是“鏈式主從複製”--主->從->主-從如下圖2。

  

 

 

 

2. Redis 主從複製的操作步驟簡略說明

  1. 首先,你得有至少2個redis server 例項,單機多例項或者多機多例項皆可。

  2. 配置主從關係,使用 slaveof master_host master_port; (config rewrite 可直接寫入配置檔案,避免每次都重新寫)

  3. 驗證主從配置,使用 info Replication; 

  上面的操作步驟是進行實時操作的,也可以直接將 master/slave 配置放到 redis.conf 中,啟動時直接載入。

  當master需要使用密碼進行訪問時,可以使用命令 masterauth 進行授權。

    masterauth 123456                # 寫到redis.conf配置檔案中
    config set masterauth 123456    # 通過命令列進行授權

 

3. 主要同步的實現原理

  主從複製大致流程為:

    1. slaveof 是我們的開啟方法,它會將master資訊寫入到從節點;
    2. 然後與master進行建立連線;
    3. 然後master決定複製方式是全量同步還是部分同步;
    4. master進行資料準備;
    5. 將需要同步的傳送給slave節點;
    6. 從節點執行傳送過來的資料;

  但是,我們需要進行深入理解。

3.1. slaveof 命令原始碼解析 

  slaveof 為我們操作開啟主從複製開啟了入口,其介面定義如下:

{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
// 用法 slaveof <master_host> <master_port>  建立主從關係
// slaveof no one 取消主從同步
// replication.c    
void slaveofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    // slaveof no one, 取消主從同步
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            // 取消當前的master關聯,返回客戶端目前狀態資訊,結束
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* Check if we are already attached to the specified slave */
        // 只能和一個 master 建立主從關係
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        // 設定master資訊
        replicationSetMaster(c->argv[1]->ptr, port);
        // 輸出client狀態資訊
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
// 繫結新的master關聯
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) freeClient(server.master);
    // slave 不進行阻塞客戶端
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
    // 斷開所有 slave 連線
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // cacheMaster 丟棄
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // 鏈式主從複製刪除
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // 斷開正在連線slave請求
    cancelReplicationHandshake();
    server.repl_state = REPL_STATE_CONNECT;
    server.master_repl_offset = 0;
    server.repl_down_since = 0;
}
// 取消master關聯
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
    if (server.masterhost == NULL) return; /* Nothing to do. */
    sdsfree(server.masterhost);
    server.masterhost = NULL;
    if (server.master) {
        if (listLength(server.slaves) == 0) {
            /* If this instance is turned into a master and there are no
             * slaves, it inherits the replication offset from the master.
             * Under certain conditions this makes replicas comparable by
             * replication offset to understand what is the most updated. */
            server.master_repl_offset = server.master->reploff;
            freeReplicationBacklog();
        }
        freeClient(server.master);
    }
    replicationDiscardCachedMaster();
    cancelReplicationHandshake();
    server.repl_state = REPL_STATE_NONE;
}

// blocked.c, 解除所有的阻塞客戶端
/* Mass-unblock clients because something changed in the instance that makes
 * blocking no longer safe. For example clients blocked in list operations
 * in an instance which turns from master to slave is unsafe, so this function
 * is called when a master turns into a slave.
 *
 * The semantics is to send an -UNBLOCKED error to the client, disconnecting
 * it at the same time. */
void disconnectAllBlockedClients(void) {
    listNode *ln;
    listIter li;

    listRewind(server.clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        if (c->flags & CLIENT_BLOCKED) {
            addReplySds(c,sdsnew(
                "-UNBLOCKED force unblock from blocking operation, "
                "instance state changed (master -> slave?)\r\n"));
            unblockClient(c);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
}
// networking.c, 斷開所有的 slave 連線
/* Close all the slaves connections. This is useful in chained replication
 * when we resync with our own master and want to force all our slaves to
 * resync with us as well. */
void disconnectSlaves(void) {
    while (listLength(server.slaves)) {
        listNode *ln = listFirst(server.slaves);
        freeClient((client*)ln->value);
    }
}
// replication.c
/* Free a cached master, called when there are no longer the conditions for
 * a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    serverLog(LL_NOTICE,"Discarding previously cached master state.");
    server.cached_master->flags &= ~CLIENT_MASTER;
    freeClient(server.cached_master);
    server.cached_master = NULL;
}
// replication.c
void freeReplicationBacklog(void) {
    serverAssert(listLength(server.slaves) == 0);
    zfree(server.repl_backlog);
    server.repl_backlog = NULL;
}
// replication.c
/* This function aborts a non blocking replication attempt if there is one
 * in progress, by canceling the non-blocking connect attempt or
 * the initial bulk transfer.
 *
 * If there was a replication handshake in progress 1 is returned and
 * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
 *
 * Otherwise zero is returned and no operation is perforemd at all. */
int cancelReplicationHandshake(void) {
    if (server.repl_state == REPL_STATE_TRANSFER) {
        replicationAbortSyncTransfer();
        server.repl_state = REPL_STATE_CONNECT;
    } else if (server.repl_state == REPL_STATE_CONNECTING ||
               slaveIsInHandshakeState())
    {
        undoConnectWithMaster();
        server.repl_state = REPL_STATE_CONNECT;
    } else {
        return 0;
    }
    return 1;
}

// networking.c
/* Concatenate a string representing the state of a client in an human
 * readable format, into the sds string 's'. */
sds catClientInfoString(sds s, client *client) {
    char flags[16], events[3], *p;
    int emask;

    p = flags;
    if (client->flags & CLIENT_SLAVE) {
        if (client->flags & CLIENT_MONITOR)
            *p++ = 'O';
        else
            *p++ = 'S';
    }
    if (client->flags & CLIENT_MASTER) *p++ = 'M';
    if (client->flags & CLIENT_MULTI) *p++ = 'x';
    if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
    if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
    if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
    if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
    if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
    if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
    if (client->flags & CLIENT_READONLY) *p++ = 'r';
    if (p == flags) *p++ = 'N';
    *p++ = '\0';

    emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
    p = events;
    if (emask & AE_READABLE) *p++ = 'r';
    if (emask & AE_WRITABLE) *p++ = 'w';
    *p = '\0';
    // 可變引數定義: sds sdscatfmt(sds s, char const *fmt, ...) 
    return sdscatfmt(s,
        "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s",
        (unsigned long long) client->id,
        getClientPeerId(client),
        client->fd,
        client->name ? (char*)client->name->ptr : "",
        (long long)(server.unixtime - client->ctime),
        (long long)(server.unixtime - client->lastinteraction),
        flags,
        client->db->id,
        (int) dictSize(client->pubsub_channels),
        (int) listLength(client->pubsub_patterns),
        (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
        (unsigned long long) sdslen(client->querybuf),
        (unsigned long long) sdsavail(client->querybuf),
        (unsigned long long) client->bufpos,
        (unsigned long long) listLength(client->reply),
        (unsigned long long) getClientOutputBufferMemoryUsage(client),
        events,
        client->lastcmd ? client->lastcmd->name : "NULL");
}

  所以,slaveof 只是做簡單的驗證,然後設定了下 master 資訊,然後就返回了。那麼是誰在做同步的工作呢?

  其實同步任務是由 cron 任務執行的。

 

3.2. 如何執行同步任務?

  因為複製是比較耗效能的東西,如果和使用者執行緒共享處理過程的話,將可能引起併發效能的。所以,redis使用非同步 cron 任務的形式實現主從複製功能。

// server.c, 初始化server,註冊 cron 
void initServer(void) {
    ...
    /* Create out timers, that's our main way to process background
     * operations. */
    // 新增 serverCron 到 eventLoop 中,以便後續可以執行定時指令碼
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    ...
}

// ae.c, 新增時間事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}
    
// server.c, 主指令碼執行入口, 每1秒執行1次
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    // 主從複製,連線 master,我們的入口
    run_with_period(1000) replicationCron();
    ...
    server.cronloops++;
    return 1000/server.hz;
}

// 重點入口: replicationCron()
// replication.c, 主從複製定時指令碼
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* Non blocking connection timeout? */
    // 連線超時處理,取消重連
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk transfer I/O timeout? */
    // 傳輸資料超時,取消重連
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Timed out master when we are an already connected slave? */
    // slave 會話超時
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* Check if we should connect to a MASTER */
    // 3.2.1. 初次設定master時,一定會進行連線處理
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    // 3.2.2. 每次定時任務執行,都會發生 ACK 給master
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    /* If we have attached slaves, PING them from time to time.
     * So slaves can implement an explicit timeout to masters, and will
     * be able to detect a link disconnection even if the TCP connection
     * will not actually go down. */
    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* First, send PING according to ping_slave_period. */
    // 3.2.3. 傳送 PING 請求
    // 預設 repl_ping_slave_period: 10
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb,
            ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }

    /* Second, send a newline to all the slaves in pre-synchronization
     * stage, that is, slaves waiting for the master to create the RDB file.
     * The newline will be ignored by the slave but will refresh the
     * last-io timer preventing a timeout. In this case we ignore the
     * ping period and refresh the connection once per second since certain
     * timeouts are set at a few seconds (example: PSYNC response). */
    // 3.2.4. 向以當前節點為master的slaves 傳送空行資料
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
             server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
        {
            if (write(slave->fd, "\n", 1) == -1) {
                /* Don't worry, it's just a ping. */
            }
        }
    }

    /* Disconnect timedout slaves. */
    // 斷開連線超時的 slaves
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate != SLAVE_STATE_ONLINE) continue;
            if (slave->flags & CLIENT_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    /* If we have no attached slaves and there is a replication backlog
     * using memory, free it after some (configured) time. */
    // 如果沒有slave 跟隨當前節點,一段時間後將backlog 釋放掉
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected slaves.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* If AOF is disabled and we no longer have attached slaves, we can
     * free our Replication Script Cache as there is no need to propagate
     * EVALSHA at all. */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* If we are using diskless replication and there are slaves waiting
     * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
     * start a BGSAVE.
     *
     * This code is also useful to trigger a BGSAVE if the diskless
     * replication was turned off with CONFIG SET, while there were already
     * slaves in WAIT_BGSAVE_START state. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }
        // 3.2.5. 如果有等待同步的slave, 且等待時間超過 server.repl_diskless_sync_delay, 預設是: 5s
        if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
            /* Start a BGSAVE. Usually with socket target, or with disk target
             * if there was a recent socket -> disk config change. */
            startBgsaveForReplication(mincapa);
        }
    }

    /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
    // 重新整理本節點的 從健康節點 數量,以便在需要確保多少節點時才進行寫入的場景判定
    refreshGoodSlavesCount();
    replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}

  以上,就是整個主從複製的主體框架了。且以上程式碼包含了兩種角色的執行機制。1: master 的執行; 2. slave 的執行;

  slave 的執行過程如下:

    1. 從節點每秒執行一次定時任務;
    2. 當定時任務發現存在新的主節點後,會呼叫 connectWithMaster() 嘗試與master節點建立網路連線;
    3. 建立連線後,由 syncWithMaster() 進行處理後續同步事務;
    4. 各種連線超時釋放處理;

  master 的執行過程如下:

    1. 各種連線超時釋放處理;
    2. 定期進行 PING slave 操作;
    3. 向slave寫入一個空行,相當於ping操作與slave續租期;
    4. 清理連線超時的slaves, 如果一個slave也沒有, 則直接把backlog釋放掉;
    5. 如果未開啟磁碟持久化操作,且有等待同步的slaves, 則主動開啟一個 bgsave;

  從上面的框架中,可以說大部分時候都是在處理各種異常問題和續期問題,但是實際最重要的一個連線master操作卻只有一行程式碼。那麼slave連線master之後,是如何進行後續的同步的呢?好像這個定時任務的執行並沒有太大的作用呢!

 

3.3. 從節點如何處理同步操作?

  從節點是整個同步操作的操控者,整個同步可以說都是其主導的。從上一節的過程,我們可以看到,只有一個連線master的只剩,所以必定許多工作要這裡完成。

  實際上,slave連線到master的請求實現,基於 epoll 模型的非同步操作,所以,在主框架中,我們只看到一個連線操作。因為連線完成後的操作,是非同步執行的。

// replication.c, 連線請求到 master 節點
int connectWithMaster(void) {
    int fd;
    // 建立socket fd
    fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
    if (fd == -1) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return C_ERR;
    }
    // 使用epoll模型進行非同步連線
    // 連線成功後,由 syncWithMaster 進行事件處理
    // 關注 讀寫事件
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        serverLog(LL_WARNING,"Can't create readable event for SYNC");
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    // 狀態變更,以便下次不會再進行連線
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
// anet.c, 建立一個非阻塞的socket連線
int anetTcpNonBlockBestEffortBindConnect(char *err, char *addr, int port,
                                         char *source_addr)
{
    // ANET_CONNECT_BE_BINDING 代表將進行重試儘可能建立連線
    return anetTcpGenericConnect(err,addr,port,source_addr,
            ANET_CONNECT_NONBLOCK|ANET_CONNECT_BE_BINDING);
}
// 與master連線成功後,由 syncWithMaster 進行處理後續事務
// replication.c
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err = NULL;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        // 傳送一個 PING 出去,檢查 master 是否可以響應
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        // 同步讀取PING結果
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        // 沒有許可權且提示不是請授權類的提示,則發生錯誤
        // 沒有呼叫 auth 前
        // -NOAUTH, 代表未授權, 可以進入下一步授權操作
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    // 需要輸入master密碼狀態
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masterauth) 
            // 傳送授權命令
            // AUTH master_password
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        // 授權響應,讀取結果
        // 授權成功響應 +OK, 其他授權失敗
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    // 傳送埠號給master, 以便master可以列舉出所有slave的埠號
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        sds port = sdsfromlonglong(server.port);
        // 傳送本節點的埠給 master
        // 命令: REPLCONF listening-port port 
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);
        sdsfree(port);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        // 忽略失敗情況,影響不大,只是個展示問題,且並非所有版本都支援該命令
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our capabilities. While we currently send
     * just one capability, it is possible to chain new capabilities here
     * in the form of REPLCONF capa X capa Y capa Z ...
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        // 傳送命令: REPLCONF capa eof
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        // 可以進行資料同步了 PSYNC
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchonization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        // 嘗試進行部分同步, 可能為 全量同步、部分同步、或者命令不支援
        // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }
    // 讀取 PSYNC 結果
    // PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_FULLRESYNC, PSYNC_NOT_SUPPORTED    
    psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any (chained replication case). The mater may
     * transfer us an entirely different data set and we have no way to
     * incrementally feed our slaves after that. */
    // 不能使用 PSYNC 進行同步,斷開當前節點的 slaves
    // 不允許鏈式主從
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        // 不支援 PSYNC, 降級為 SYNC
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    // 準備從rdb檔案中讀取資料,最多重試5次(共5s)
    // 臨時檔名: temp-<1560888xxx>.<pid>.rdb 
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    // 使用 epoll 模型進行非同步接收master傳送過來的rdb檔案
    // 由 readSyncBulkPayload 函式進行結果處理
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
    // 儲存同步狀態
    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}

  整個連線成功之後的處理過程還是比較繁雜的,主要邏輯就在 syncWithMaster,主要是在各個狀態之間的轉換,尤其頭疼,不過幸好都是流水式的一步步下來。

    1. REPL_STATE_CONNECTING: 待連線狀態. slave 傳送 PING命令進行主動連線, 然後將狀態置為 REPL_STATE_RECEIVE_PONG;
    2. REPL_STATE_RECEIVE_PONG: 待master響應狀態. slave同步等待結果(其實一般會立即獲取到,因為epoll已經準備好,才會呼叫此狀態),判斷是否PING正常後, 將狀態置為 REPL_STATE_SEND_AUTH;
    3. REPL_STATE_SEND_AUTH: 等待授權狀態. slave 傳送 auth passwd 給master後, 將狀態置為 REPL_STATE_RECEIVE_AUTH;
    4. REPL_STATE_RECEIVE_AUTH: 等待授權響應狀態. slave同步等待結果, 判斷授權通過後, 將狀態置為 REPL_STATE_SEND_PORT;
    5. REPL_STATE_SEND_PORT: 待發送埠狀態. slave傳送自身的服務埠給master以便master展示使用, 然後將狀態置為 REPL_STATE_RECEIVE_PORT;
    6. REPL_STATE_RECEIVE_PORT: 等待埠傳送結果. 不論結果如何, 直接將狀態置為 REPL_STATE_SEND_CAPA;
    7. REPL_STATE_SEND_CAPA: 等待發送capa命令狀態. 傳送 REPLCONF capa eof 後, 將狀態置為 REPL_STATE_RECEIVE_CAPA;
    8. REPL_STATE_RECEIVE_CAPA: 等待capa命令傳送結果. 不論結果如何, 將狀態置為 REPL_STATE_SEND_PSYNC;
    9. REPL_STATE_SEND_PSYNC: 等待PSYNC同步命令狀態. 嘗試使用PSYNC進行部分複製,結果可能是全量複製或部分複製,也可能使用其他版本命令執行, 將狀態置為 REPL_STATE_RECEIVE_PSYNC;
    10. REPL_STATE_RECEIVE_PSYNC: 等待PSYNC結果. 這是真正接收資料的時候, 是終態, 根據上一次命令的請求方式,接收相應結果進一步處理;
    11. 重新註冊一個 epoll 事件,用於接收master傳輸過來的資料,處理方法為 readSyncBulkPayload();

  接下來,我們先看看嘗試部分時都做了哪些事,因為這決定了是使用全量複製還是部分複製:

// 嘗試進行部分同步
// replication.c
int slaveTryPartialResynchronization(int fd, int read_reply) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    // 第一次呼叫時, read_reply=0, 即是寫動作
    // 向 master 寫入 PSYNC psync_runid psync_offset
    // 即是每次都拉取一部分資料吧
    if (!read_reply) {
        /* Initially set repl_master_initial_offset to -1 to mark the current
         * master run_id and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.repl_master_initial_offset = -1;
        // 如果已經建立了連線,則 psync_runid, psync_offset 都是可預知的
        // 否則 psync_runid = "?", psync_offset="-1";
        if (server.cached_master) {
            psync_runid = server.cached_master->replrunid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_runid = "?";
            memcpy(psync_offset,"-1",3);
        }

        /* Issue the PSYNC command */
        // 首次傳送命令 PSYNC ? -1
        // 後續使用實際的資訊 PSYNC psync_runid psync_offset
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
        if (reply != NULL) {
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            aeDeleteFileEvent(server.el,fd,AE_READABLE);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* Reading half */
    // 讀取 PSYNC 的結果
    reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
    if (sdslen(reply) == 0) {
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    aeDeleteFileEvent(server.el,fd,AE_READABLE);
    // +FULLRESYNC 代表需要進行全量複製,否則進行部分複製
    // +FULLRESYNC runid offset
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        // runid 長度為 40
        if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        // 全量同步,重置master快取
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }
    // 部分複製的情況下,只會返回 +CONTINUE
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");
        // 立即將結果釋放,那什麼時候處理結果呢?
        sdsfree(reply);
        // 實際上通過該方法同步資料的
        replicationResurrectCachedMaster(fd);
        // 繼續使用 部分同步
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
    // PSYNC 不支援,因處理為降級版本
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        serverLog(LL_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        serverLog(LL_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}

  通過上面的過程,我們可以看清了整個與master是如何協調進行同步的,主要依賴於 PSYNC 的返回值決定。也可以看到,全量同步功能時,註冊了一個可讀事件的監聽,具體處理使用 readSyncBulkPayload 進行承載。

 

3.4. 全量同步資料的實現方式

  通過前面的分析,我們看到全量同時時,註冊了一個FileEvent事件,依賴於epoll實現非同步操作。具體處理由 readSyncBulkPayload() 進行處理。它負責非同步讀取master 同步過來的資料,寫入aof檔案,載入到slave的資料庫中。具體如下:

// replication.c
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen;
    off_t left;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* Static vars used to hold the EOF mark, and the last bytes received
     * form the server: when they match, we reached the end of the transfer. */
    static char eofmark[CONFIG_RUN_ID_SIZE];
    static char lastbytes[CONFIG_RUN_ID_SIZE];
    static int usemark = 0;

    /* If repl_transfer_size == -1 we still have to read the bulk length
     * from the master reply. */
    // 先讀取資料長度
    if (server.repl_transfer_size == -1) {
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }

        if (buf[0] == '-') {
            serverLog(LL_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            goto error;
        } else if (buf[0] == '\0') {
            /* At this stage just a newline works as a PING in order to take
             * the connection live. So we refresh our last interaction
             * timestamp. */
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
            goto error;
        }

        /* There are two possible forms for the bulk payload. One is the
         * usual $<count> bulk format. The other is used for diskless transfers
         * when the master does not know beforehand the size of the file to
         * transfer. In the latter case, the following format is used:
         *
         * $EOF:<40 bytes delimiter>
         *
         * At the end of the file the announced delimiter is transmitted. The
         * delimiter is long and random enough that the probability of a
         * collision with the actual file content can be ignored. */
        if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
            usemark = 1;
            memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
            memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
            /* Set any repl_transfer_size to avoid entering this code path
             * at the next call. */
            server.repl_transfer_size = 0;
            serverLog(LL_NOTICE,
                "MASTER <-> SLAVE sync: receiving streamed RDB from master");
        } else {
            usemark = 0;
            // 讀取資料長度, 寫入 server.repl_transfer_size, 後續判斷是否取完整資料
            server.repl_transfer_size = strtol(buf+1,NULL,10);
            serverLog(LL_NOTICE,
                "MASTER <-> SLAVE sync: receiving %lld bytes from master",
                (long long) server.repl_transfer_size);
        }
        return;
    }

    /* Read bulk data */
    if (usemark) {
        readlen = sizeof(buf);
    } else {
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    }

    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        cancelReplicationHandshake();
        return;
    }
    server.stat_net_input_bytes += nread;

    /* When a mark is used, we want to detect EOF asap in order to avoid
     * writing the EOF mark into the file... */
    int eof_reached = 0;

    if (usemark) {
        /* Update the last bytes array, and check if it matches our delimiter.*/
        // 更新 最後幾個字元
        if (nread >= CONFIG_RUN_ID_SIZE) {
            memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
        } else {
            int rem = CONFIG_RUN_ID_SIZE-nread;
            memmove(lastbytes,lastbytes+nread,rem);
            memcpy(lastbytes+rem,buf,nread);
        }
        if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
    }

    server.repl_transfer_lastio = server.unixtime;
    // 將資料寫入到 temp rdb 檔案中
    if (write(server.repl_transfer_fd,buf,nread) != nread) {
        serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
        goto error;
    }
    server.repl_transfer_read += nread;

    /* Delete the last 40 bytes from the file if we reached EOF. */
    if (usemark && eof_reached) {
        if (ftruncate(server.repl_transfer_fd,
            server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
        {
            serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
            goto error;
        }
    }

    /* Sync data on disk from time to time, otherwise at the end of the transfer
     * we may suffer a big delay as the memory buffers are copied into the
     * actual disk. */
    // 緩衝達到一定值後,直接刷盤
    // REPL_MAX_WRITTEN_BEFORE_FSYNC: 8M
    if (server.repl_transfer_read >=
        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
    {
        off_t sync_size = server.repl_transfer_read -
                          server.repl_transfer_last_fsync_off;
        rdb_fsync_range(server.repl_transfer_fd,
            server.repl_transfer_last_fsync_off, sync_size);
        server.repl_transfer_last_fsync_off += sync_size;
    }

    /* Check if the transfer is now complete */
    // 傳輸完成
    if (!usemark) {
        if (server.repl_transfer_read == server.repl_transfer_size)
            eof_reached = 1;
    }

    if (eof_reached) {
        // 直接將臨時 rdb 檔案改名為正式的 rdb 檔案,從而實現資料替換
        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
            cancelReplicationHandshake();
            return;
        }
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
        // 清空原來的資料,刷入新資料
        signalFlushedDb(-1);
        emptyDb(
            -1,
            server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
            replicationEmptyDbCallback);
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
        // 重新載入 rdb 檔案,從而完成同步操作
        if (rdbLoad(server.rdb_filename) != C_OK) {
            serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            cancelReplicationHandshake();
            return;
        }
        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        // 設定 master 資訊,以便下次直接使用
        replicationCreateMasterClient(server.repl_transfer_s);
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
        /* Restart the AOF subsystem now that we finished the sync. This
         * will trigger an AOF rewrite, and when done will start appending
         * to the new file. */
        if (server.aof_state != AOF_OFF) {
            int retry = 10;
            // 重新關聯 aof 檔案,以便後續寫入aof正常
            stopAppendOnly();
            while (retry-- && startAppendOnly() == C_ERR) {
                serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
                sleep(1);
            }
            if (!retry) {
                serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
                exit(1);
            }
        }
    }

    return;

error:
    cancelReplicationHandshake();
    return;
}

  以上就是全量複製功能實現了,大體步驟為:

    1. 先讀取整體資料長度;(肯定是master發來的資料了)
    2. 依次讀取就緒資料,將其定入臨時aof檔案 temp-<unixtime>.<pid>.aof;
    3. 達到一定緩衝數量後,強制刷盤;
    4. master 傳輸完成後,slave將臨時aof檔案重新命名為正式的aof檔案;
    5. slave 清空原來db資料;
    6. 禁用aof檔案的監聽,載入新的aof資料,重新開啟監聽;
    7. aof 先停止再啟動,重新關聯新檔案;

 

3.5. 部分複製的實現

  前面我們看到有個 slaveTryPartialResynchronization(), 是做部分同步檢測的,但是它只會返回幾個狀態,好像返回後都沒有做什麼後續處理。只有全量同步時,我們看到了如上邏輯。那麼部分同步是如何實現的呢?其中有個 +CONTINUE 的狀態值得我們注意:

    ...
    // 部分複製的情況下,只會返回 +CONTINUE
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");
        // 立即將結果釋放,那什麼時候處理結果呢?
        sdsfree(reply);
        // 實際上通過該方法同步資料的
        replicationResurrectCachedMaster(fd);
        // 繼續使用 部分同步
        return PSYNC_CONTINUE;
    }
    ...

  就這上面這個,返回 CONTINUE 後,外部邏輯只是返回,所以肯定是 replicationResurrectCachedMaster() 做了處理。而這個處理,應該是讀取後續的資料沒錯了!

// replication.c, 使用 cacheMaster 做 PSYNC 處理複製資料    
/* Turn the cached master into the current master, using the file descriptor
 * passed as argument as the socket for the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from were this
 * master left. */
void replicationResurrectCachedMaster(int newfd) {
    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;

    /* Re-add to the list of clients. */
    listAddNodeTail(server.clients,server.master);
    // 新增file事件,epoll事件, 由 readQueryFromClient 進行事件處理
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                          readQueryFromClient, server.master)) {
        serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
        freeClientAsync(server.master); /* Close ASAP. */
    }

    /* We may also need to install the write handler as well if there is
     * pending data in the write buffers. */
    // 如果有待發送資料,建立一個 寫的 fileEvent 事件
    if (clientHasPendingReplies(server.master)) {
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                          sendReplyToClient, server.master)) {
            serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    }
}
// 接下來,我們檢視下 當master傳送資料過來時,部分複製是如何實現的
// networking.c, 從 master 中讀取資料, privdata = server.master
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);
    // PROTO_IOBUF_LEN: 1024*16
    // PROTO_MBULK_BIG_ARG: 1024*32
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 讀取請求命令
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->reploff += nread;
    server.stat_net_input_bytes += nread;
    // 超出最大限制,不處理
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    // 處理 querybuf 資料, 其實就和普通的客戶端寫請求一樣的處理方式
    processInputBuffer(c);
}

  處理master 部分同步過來的資料,重新在 slave 執行一次即可,基於epoll的事件監聽,可以持續處理同步資料。

  所以,部分複製,其實就是重新在slave端執行與master相同的請求就好了。這個processInputBuffer()過程在前面的文章已經介紹過。

 

3.6. PSYNC 命令實現原理

  從上面可以看出,PSYNC是整個主從複製過程的重要操作,那麼 PSYNC 都是怎麼實現的呢?大體上應該是一個範圍查詢響應的過程,但是細節必然很多。我們可以先自己想想,要處理的點大概有哪些呢?

    1. 第一次呼叫時,即 PSYNC ? -1 如何處理?
    2. 後續呼叫時 即 PSYNC psync_runid psync_offset 如何處理?
    3. 響應結構是如何的?比如如何響應+CONTINUE?

  我們就通過原始碼來解答這些問題吧!

  首先是 PSYNC 的定義: 可以看到,sync 和 psync 居然是一樣的實現?

    // 差別是 sync 的引數只有一個,而 psync 的引數是3個
    {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},

  具體實現:

// 用法: PSYNC run_id offset
// replication.c    
/* SYNC and PSYNC command implemenation. */
void syncCommand(client *c) {
    /* ignore SYNC if already slave or in monitor mode */
    // SYNC 命令只能呼叫成功一次,後續就直接忽略了
    if (c->flags & CLIENT_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    // 還有輸出未完成時不能再進行處理
    if (clientHasPendingReplies(c)) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    serverLog(LL_NOTICE,"Slave %s asks for synchronization",
        replicationGetSlaveName(c));

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    // 事實上,psync 和 sync 的實現還是區別對待的
    // psync 將會優先嚐試部分複製
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 部分複製將不會重置 flags, 即每次 psync 都會成功執行
        if (masterTryPartialResynchronization(c) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not albe to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= CLIENT_PRE_PSYNC;
    }
    // 以下為全量複製
    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Setup the slave as one waiting for BGSAVE to start. The following code
     * paths will change the state if we handle the slave differently. */
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    // 新增slave 到master的從節點集合中, 設定 SLAVE 標識,表示已執行過 SYNC 操作
    c->flags |= CLIENT_SLAVE;
    listAddNodeTail(server.slaves,c);

    /* CASE 1: BGSAVE is in progress, with disk target. */
    // 如果 rdb 儲存已在進行中,即 BGSAVE 已經在執行
    // 此種是對於後來進行主從同步的客戶端,只需告知正在執行 BGSAVE 即可
    if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a goo