Redis Source Code (11): Master-Slave Replication in Redis

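In Redis, sending the SLAVEOF command to a server A makes it a "slave" of another server B, so that A replicates B. A is called the slave and B is called the master.

Replication is implemented differently before and after version 2.8. Before 2.8, replication was inefficient after a disconnection and reconnection, because every reconnection triggered a full resynchronization. This article therefore only analyzes replication as implemented from 2.8 onward.

Replication consists of two steps: synchronization (sync) and command propagation (command propagate).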

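I. Implementing synchronization

Since version 2.8, Redis uses the PSYNC command for synchronization (older versions use SYNC).

PSYNC has two modes: full resynchronization (full resync) and partial resynchronization (partial resync).

1) Full resynchronization is generally used for the first replication. The master creates an RDB file and sends it to the slave, then sends the write commands that accumulated in its buffer in the meantime.

2) Partial resynchronization is generally used when a slave reconnects after a disconnection. If certain conditions are met, the master sends the slave only the write commands it executed while the slave was offline, which brings the master and slave databases back to the same state.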
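From the slave's side, the two modes differ only in the arguments passed to PSYNC, as the source listing later in this article shows. The sketch below is a minimal illustration of that choice; `build_psync_args` is a hypothetical helper, not a Redis function, and a NULL `cached_runid` stands in for "no cached master".

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): formats the PSYNC request a
 * slave would send. cached_runid == NULL models "no cached master". */
int build_psync_args(const char *cached_runid, long long cached_offset,
                     char *out, size_t outlen) {
    assert(out != NULL && outlen > 0);
    if (cached_runid == NULL) {
        /* First replication: "?" and -1 ask the master for a full resync. */
        return snprintf(out, outlen, "PSYNC ? -1");
    }
    /* Reconnection: ask for the stream starting right after the last
     * byte this slave already processed. */
    return snprintf(out, outlen, "PSYNC %s %lld",
                    cached_runid, cached_offset + 1);
}
```

The master still has the final say: it may answer a partial request with a full resynchronization if the conditions described below are not met.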

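1. Implementing partial resynchronization

Partial resynchronization relies on three pieces of state: 1) the server run ID (runid); 2) the replication offset used for partial resynchronization (psync_offset); 3) the master's replication backlog.

1) Server run ID

Every Redis server has its own run ID. It is generated automatically when the server starts and is a random hexadecimal string 40 characters long. When a slave replicates a master for the first time, the master sends its run ID to the slave, and the slave saves it. When a partial resynchronization is requested, the saved run ID is first compared with the run ID of the master the slave is now connected to. If they match, the slave was previously replicating this same master and the partial resynchronization can go ahead; if they differ, the slave cannot perform a partial resynchronization with this master.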
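As a rough illustration of what such an identifier looks like, the sketch below produces a 40-character hexadecimal string and compares two run IDs. It assumes `rand()` as the randomness source purely for portability; this is not how Redis itself generates the ID, and `make_run_id` and `same_master` are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40  /* length stated in the text above */

/* Fills out with RUN_ID_SIZE random lowercase hex characters plus a NUL.
 * out must have room for RUN_ID_SIZE + 1 bytes. rand() keeps the sketch
 * portable; it is not a strong source of randomness. */
void make_run_id(char *out) {
    static const char hex[] = "0123456789abcdef";
    assert(out != NULL);
    for (int i = 0; i < RUN_ID_SIZE; i++)
        out[i] = hex[rand() % 16];
    out[RUN_ID_SIZE] = '\0';
}

/* Returns 1 when the run ID a slave saved matches the master's current one. */
int same_master(const char *saved_runid, const char *master_runid) {
    assert(saved_runid != NULL && master_runid != NULL);
    return strncmp(saved_runid, master_runid, RUN_ID_SIZE) == 0;
}
```

Because a new ID is generated on every start, a restarted master looks like a different master to its slaves, which is exactly what the comparison is meant to detect.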

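2) Replication offset (psync_offset)

The master and the slave each maintain a replication offset. Whenever the master propagates N bytes of data to the slave, it adds N to its own offset; likewise, whenever the slave receives N bytes, it adds N to its own offset. Comparing the two offsets therefore tells us whether the master and slave databases are in the same state.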
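The bookkeeping is simple enough to model directly. The following sketch tracks nothing but the offset on each side; the `node` type and the function names are assumptions made for illustration and do not appear in the Redis source.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified model: only the replication offset is tracked. */
typedef struct {
    long long repl_offset;
} node;

/* Master side: n bytes of write commands were sent to the slaves. */
void master_propagate(node *master, size_t n) {
    assert(master != NULL);
    master->repl_offset += (long long)n;
}

/* Slave side: n bytes of the replication stream were received. */
void slave_receive(node *slave, size_t n) {
    assert(slave != NULL);
    slave->repl_offset += (long long)n;
}

/* Equal offsets mean the slave has received everything the master sent. */
int in_sync(const node *master, const node *slave) {
    assert(master != NULL && slave != NULL);
    return master->repl_offset == slave->repl_offset;
}
```

If the master propagates 33 bytes while the slave is disconnected, the two offsets differ by 33 until the slave receives those bytes, and that gap is what a partial resynchronization has to fill.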

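3) The master's replication backlog

When a slave reconnects and sends its replication offset to the master, how does the master decide between a full and a partial resynchronization? The answer lies in the master's replication backlog. The master maintains a fixed-length queue whose default size is 1024*1024 bytes, that is, 1 MB. During command propagation, the master pushes each write command into the backlog queue at the same time as it sends the command to the slaves, so the queue always holds the most recently propagated commands.

If the slave's psync_offset still falls within the range covered by the backlog, a partial resynchronization is performed; otherwise a full resynchronization is performed.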
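The decision can be sketched with a small circular buffer. This is a simplified model under stated assumptions, not the Redis implementation: the struct, its field names and the 16-byte size are chosen for illustration, and bytes of the stream are numbered starting from 1.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16  /* tiny on purpose; the default in the text is 1024*1024 */

/* Hypothetical simplified backlog; field names are illustrative. */
typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;            /* next write position in buf */
    size_t histlen;        /* number of valid bytes currently stored */
    long long master_off;  /* total bytes propagated by the master so far */
} backlog;

/* Appends n bytes, overwriting the oldest data once the buffer is full. */
void backlog_feed(backlog *b, const char *data, size_t n) {
    assert(b != NULL && data != NULL);
    for (size_t i = 0; i < n; i++) {
        b->buf[b->idx] = data[i];
        b->idx = (b->idx + 1) % BACKLOG_SIZE;
    }
    b->master_off += (long long)n;
    b->histlen = (b->histlen + n > BACKLOG_SIZE) ? BACKLOG_SIZE
                                                 : b->histlen + n;
}

/* Offset of the oldest byte still held in the backlog. */
long long backlog_first_offset(const backlog *b) {
    assert(b != NULL);
    return b->master_off - (long long)b->histlen + 1;
}

/* Returns 1 if a slave asking for the stream starting at psync_offset can
 * be served from the backlog, 0 if a full resync is needed. */
int can_partial_resync(const backlog *b, const char *master_runid,
                       const char *slave_runid, long long psync_offset) {
    assert(b != NULL && master_runid != NULL && slave_runid != NULL);
    if (strcmp(master_runid, slave_runid) != 0) return 0; /* other master */
    if (psync_offset < backlog_first_offset(b)) return 0; /* overwritten */
    if (psync_offset > b->master_off + 1) return 0;       /* not sent yet */
    return 1;
}
```

With a 16-byte backlog and 20 bytes propagated, bytes 1 to 4 have been overwritten, so a slave asking for offset 4 needs a full resynchronization while one asking for offset 5 can be served from the backlog. A larger backlog tolerates longer disconnections at the cost of memory.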

The underlying code that implements partial resynchronization is as follows:

/* Try a partial resynchronization with the master if we are about to reconnect.
 *
 * If there is no cached master structure, at least try to issue a
 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
 * command in order to obtain the master run id and the master replication
 * global offset.
 *
 * This function is designed to be called from syncWithMaster(), so the
 * following assumptions are made:
 *
 * 1) We pass the function an already connected socket "fd".
 * 2) This function does not close the file descriptor "fd". However in case
 *    of successful partial resynchronization, the function will reuse
 *    'fd' as file descriptor of the server.master client structure.
 *
 * The function returns:
 *
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
 *                   In this case the master run_id and global replication
 *                   offset is saved.
 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
 *                      the caller should fall back to SYNC.
 */

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
 
    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;
 
    if (server.cached_master) {
        // A cached master exists: try a partial resynchronization.
        // The command is "PSYNC <master_run_id> <repl_offset>".
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        // No cached master:
        // send "PSYNC ? -1" to request a full resynchronization.
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }
 
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    // +FULLRESYNC received: a full resync will follow
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        // Validate the run id: it must be exactly REDIS_RUN_ID_SIZE bytes long
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            // Save the run id
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            // and the initial offset
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            // Log that this is a FULL resync
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);

        return PSYNC_FULLRESYNC;
    }
 
    // +CONTINUE received: a partial resync will follow
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        // Promote the cached master to be the current master again
        replicationResurrectCachedMaster(fd);

        return PSYNC_CONTINUE;
    }
 
    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */

    // strncmp() is non-zero when the reply is NOT an -ERR reply
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();

    // The master does not support PSYNC
    return PSYNC_NOT_SUPPORTED;
}
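
The reply-parsing step of the function above can be tried in isolation. The sketch below is a standalone rewrite under assumptions: `parse_fullresync` and `RUN_ID_SIZE` are hypothetical names introduced here, and the caller supplies the output buffers instead of the global `server` structure.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40

/* Hypothetical standalone version of the parsing step. Returns 1 and fills
 * runid_out/offset_out on success, 0 if the reply is malformed.
 * runid_out must have room for RUN_ID_SIZE + 1 bytes. */
int parse_fullresync(const char *reply, char *runid_out,
                     long long *offset_out) {
    assert(reply != NULL && runid_out != NULL && offset_out != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return 0;

    const char *runid = strchr(reply, ' ');
    if (runid == NULL) return 0;
    runid++;                                   /* skip the first space */
    const char *offset = strchr(runid, ' ');
    if (offset == NULL) return 0;
    offset++;                                  /* skip the second space */

    /* The run ID must be exactly RUN_ID_SIZE characters long. */
    if ((offset - runid - 1) != RUN_ID_SIZE) return 0;

    memcpy(runid_out, runid, RUN_ID_SIZE);
    runid_out[RUN_ID_SIZE] = '\0';
    *offset_out = strtoll(offset, NULL, 10);
    return 1;
}
```

A reply such as `+FULLRESYNC <40 hex characters> 1024` yields the run ID and an offset of 1024, while a reply whose run ID has the wrong length is rejected, which mirrors the length check in the original function.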
 
// Callback the slave uses to synchronize with the master
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    // In that case, close fd and return
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }
 
    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }
 
    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    // The RDB transfer that follows is expensive, so first confirm that
    // the master is really reachable.
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        // Update the state
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);

        // Return and wait for the PONG to arrive
        return;
    }
 
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            // Neither +PONG nor an authentication error: give up
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            // +PONG (or an authentication error that AUTH will resolve)
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }
 
    /* AUTH with the master if required. */
    if (server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }
 
    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);

    // A partial resync was accepted
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        // Send the SYNC command to the master
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    // If we get here, psync_result is either PSYNC_FULLRESYNC
    // or PSYNC_NOT_SUPPORTED
 
    /* Prepare a suitable temp file for bulk transfer */
    // The RDB data received from the master will be written to this file
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    // Install a read handler that reads the master's RDB file
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
 
    // Set the state
    server.repl_state = REDIS_REPL_TRANSFER;

    // Initialize the transfer bookkeeping fields
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
 
    return;
 
error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
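
The PING reply check in syncWithMaster() is easy to misread because it is written as a chain of negated comparisons. The sketch below restates it as a positive predicate; `ping_reply_acceptable` is a hypothetical helper written for this article, not a Redis function.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper mirroring the reply check in syncWithMaster().
 * Returns 1 if the handshake may continue, 0 if it must be aborted. */
int ping_reply_acceptable(const char *buf) {
    assert(buf != NULL);
    /* Any positive reply counts as +PONG. */
    if (buf[0] == '+') return 1;
    /* The master requires authentication; the AUTH step comes next. */
    if (strncmp(buf, "-NOAUTH", 7) == 0) return 1;
    /* Older masters report the same condition with a generic error. */
    if (strncmp(buf, "-ERR operation not permitted", 28) == 0) return 1;
    return 0;
}
```

An authentication error is accepted at this stage because it still proves that the master is alive and answering, which is all the PING is meant to establish.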


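II. Command propagation

In general, once the synchronization step has completed, the master and slave databases are in the same state. But suppose that right after synchronization finishes, a client sends a write command to the master: the two databases are now inconsistent. At this point the master has to propagate the command to the slave. The master sends each write command it executes to the slave, and once the slave has executed it, the two databases are consistent again.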
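
Propagated commands travel over the replication link in the regular Redis protocol format, so the number of bytes added to the replication offset is the length of the encoded command. The sketch below encodes a command as an array of bulk strings; `resp_encode` is a hypothetical helper, and it assumes the arguments are NUL-terminated strings without embedded zero bytes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: encodes argv[0..argc-1] as a protocol array of bulk
 * strings into out. Returns the number of bytes written (excluding the
 * terminating NUL), or -1 if out is too small. */
int resp_encode(int argc, const char **argv, char *out, size_t outlen) {
    assert(argv != NULL && out != NULL);
    size_t used = 0;

    /* Array header: "*<number of arguments>\r\n" */
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= outlen) return -1;
    used += (size_t)n;

    /* Each argument: "$<length>\r\n<bytes>\r\n" */
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, outlen - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= outlen - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

Encoding `SET key value` this way produces 33 bytes, so propagating that one command advances the offsets of both master and slave by 33.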