1. 程式人生 > >redis叢集實現(一)叢集架構與初始化

redis叢集實現(一)叢集架構與初始化

redis是一個高可用、高效能、高可擴充套件性的基於記憶體也支援持久化儲存的kv儲存資料庫,redis相比較於之前的kv儲存memcached而言,不但支援的value型別大大增加,並且還支援資料的持久化,彌補了memcached的不能持久化的缺點,但是在3.0之前的redis並不支援叢集功能,這也是redis3.0之前不能被大量部署的一個原因,但是由於3.0以後的redis支援了叢集功能,redis就開始大量的替代之前的memcached,今天我從原始碼層次學習下redis是怎麼實現叢集功能的。

我看的原始碼是redis-3.0原始碼,可以在下邊這個連結下載到。

redis的叢集並不是類似於HDFS

之類的namenodedatanode之類的架構,而是採用改進的一致性雜湊演算法來對資料進行分片,平均的分配到每一個master節點上,每一個master節點都有相對應的slave節點來複制master節點的資料,以便master宕機的時候來選舉成為master節點。大體的架構如下圖所示:

採取一致性雜湊演算法,保證每一塊資料對映在0-16384的區間之上,然後這個區間的一部分分給一個master來服務(當然不會分的這麼簡單)。每一個client訪問的時候就會訪問對應的master,可能有人想問client是怎麼知道資料在哪一個master上的,其實client也不知道,client會訪問一個master

,然後master發現數據不在這個master節點上,那麼master就會告訴client存放client想要的資料所在的master地址,然後client就會訪問到正確的master了。

那麼,redis叢集是怎麼搭建起來的呢,難道是幾十上百臺機器同時開機自動連線的嗎?當然不是,當只有一臺機器的時候,可以認為這是一個只有一臺機器的叢集,然後client登入master執行cluster meet <ip> <port>來把指定ip地址的機器加入到叢集裡邊。這樣,這個叢集就擁有兩臺機器了。就這樣,一臺一臺的新增,就實現了大規模的redis叢集。

首先看看和叢集有關的資料結構,這些都是叢集實現的基礎。

// 節點狀態結構體
struct clusterNode {

    // 建立節點的時間
    mstime_t ctime; 

    // 節點ID,通過隨機數生成,長度為40,每一個字元都是一個16進位制字元
    char name[REDIS_CLUSTER_NAMELEN];

    // 節點狀態標識位,比如標識節點是主節點還是從節點。
    int flags;  

    // 節點當前的配置紀元
    uint64_t configEpoch; 

    // 這個node儲存的資料槽點陣圖,REDIS_CLUSTER_SLOTS就是redis叢集分成的塊數目,相當於上邊的16384,如果值是1代表這個資料槽的資料儲存在當前節點,如果是0表示不在這個節點。
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; 

    // 這個node儲存的資料槽的數目
    int numslots;  

    // 如果本節點是主節點,這個欄位表示從節點的數目
    int numslaves; 

    // 指標陣列,指向各個從節點
    struct clusterNode **slaves;

    // 如果這是一個從節點,那麼指向主節點
    struct clusterNode *slaveof;                                                                                            

    // 最後一次傳送 PING資料包的時間
    mstime_t ping_sent;      

    // 最後一次接收 PONG資料包的時間戳
    mstime_t pong_received; 

    // 最後一次被設定為 FAIL狀態的時間
    mstime_t fail_time;     

    // 最後一次給某個從節點投票的時間
    mstime_t voted_time;    

    // 最後一次從這個節點接收到複製偏移量的時間
    mstime_t repl_offset_time; 

    // 這個節點的複製偏移量
    long long repl_offset;     

    // 節點的 IP地址
    char ip[REDIS_IP_STR_LEN];  

    // 節點的埠號
    int port;               

    // 儲存連線相關的資訊
    clusterLink *link; 

    // 一個連結串列,記錄了所有其他節點對該節點的下線報告
    list *fail_reports; 
};

然後是記錄叢集之間的連線的結構體

// clusterLink 包含了與其他節點進行通訊所需的全部資訊
typedef struct clusterLink {                                                                                                                                  

    // 連線的建立時間
    mstime_t ctime;        

    // TCP 套接字描述符
    int fd;          

    // 輸出緩衝區,儲存著等待發送給其他節點的訊息(message)。
    sds sndbuf;     

    // 輸入緩衝區,儲存著從其他節點接收到的訊息。
    sds rcvbuf;     

    // 與這個連線相關聯的節點,如果沒有的話就為 NULL
    struct clusterNode *node;

} clusterLink;

然後是記錄叢集狀態的結構體,每一個節點都有一個這個結構體,用來表示當前叢集的狀態。

typedef struct clusterState {
                                                                                                                                                              
    // 指向當前節點的指標
    clusterNode *myself;

    // 叢集當前的配置紀元,用於實現故障轉移
    uint64_t currentEpoch;

    // 叢集當前的狀態:是線上還是下線
    int state;

    // 叢集中至少處理著一個槽的節點的數量。
    int size;   

    // 叢集節點名單(包括 myself節點)
    // 字典的鍵為節點的名字,字典的值為 clusterNode結構
    dict *nodes;   

    // 節點黑名單,用於 CLUSTER FORGET命令
    // 防止被 FORGET的命令重新被新增到叢集裡面
    // (不過現在似乎沒有在使用的樣子,已廢棄?還是尚未實現?)
    dict *nodes_black_list;

    // 記錄要從當前節點遷移到目標節點的槽,以及遷移的目標節點
    // migrating_slots_to[i] = NULL 表示槽 i未被遷移
    // migrating_slots_to[i] = clusterNode_A 表示槽 i要從本節點遷移至節點 A
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

    // 記錄要從源節點遷移到本節點的槽,以及進行遷移的源節點
    // importing_slots_from[i] = NULL 表示槽 i未進行匯入
    // importing_slots_from[i] = clusterNode_A 表示正從節點 A中匯入槽 i
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

    // 負責處理各個槽的節點
    // 例如 slots[i] = clusterNode_A表示槽 i 由節點 A處理
    clusterNode *slots[REDIS_CLUSTER_SLOTS];

    // 跳躍表,表中以槽作為分值,鍵作為成員,對槽進行有序排序
    // 當需要對某些槽進行區間(range)操作時,這個跳躍表可以提供方便
    // 具體操作定義在 db.c裡面
    zskiplist *slots_to_keys;

    // 以下這些域被用於進行故障轉移選舉

    // 上次執行選舉或者下次執行選舉的時間
    mstime_t failover_auth_time;

    // 節點獲得的投票數量
    int failover_auth_count; 

    // 如果值為 1,表示本節點已經向其他節點發送了投票請求
    int failover_auth_sent; 

    int failover_auth_rank;

    uint64_t failover_auth_epoch; 

    /* 共用的手動故障轉移狀態 */

    // 手動故障轉移執行的時間限制
    mstime_t mf_end;           

    /* 主伺服器的手動故障轉移狀態 */
    clusterNode *mf_slave;     

    /* 從伺服器的手動故障轉移狀態 */
    long long mf_master_offset;    // 指示手動故障轉移是否可以開始的標誌值
    // 值為非 0時表示各個主伺服器可以開始投票
    int mf_can_start;          

    /* The followign fields are uesd by masters to take state on elections. */
    /* 以下這些域由主伺服器使用,用於記錄選舉時的狀態 */

    // 叢集最後一次進行投票的紀元
    uint64_t lastVoteEpoch;   

    // 在進入下個事件迴圈之前要做的事情,以各個 flag來記錄
    int todo_before_sleep;

    // 通過 cluster連線傳送的訊息數量
    long long stats_bus_messages_sent; 

    // 通過 cluster接收到的訊息數量
    long long stats_bus_messages_received; 
} clusterState;

基本的結構體都介紹完了,我們來看看叢集的程式碼實現吧。

首先在看redis單節點的初始化程式碼,這是叢集的第一步,首先啟動單節點服務。

redis原始碼裡,redis.c檔案裡的main函式是redis-server的開始,由於我們只關心叢集的實現程式碼,一些和叢集關係不大的我就忽略了。

int main(int argc, char **argv) {

………………..

//說明使用者指定了引數,我們需要檢查使用者是不是指定了配置檔案

if (argc >= 2) {

……………………

//讀取配置檔案

loadServerConfig(configfile,options);

}

然後跳入loadServerConfig函式來進行字串配置的解析。在loadServerConfig函式裡有以下程式碼,如果配置項有cluster-enabled,我們就設定server.cluster_enabled1,表示叢集功能的開啟。

else if (!strcasecmp(argv[0],"cluster-enabled") && argc ==2) {                                                                                     

             if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {

                 err = "argument must be 'yes' or 'no'";goto loaderr;

             }

接著在main函式裡邊,讀取玩配置檔案,執行initServer()函式,在initServer函式裡邊,

// 如果伺服器以 cluster模式開啟,那麼初始化 cluster

if (server.cluster_enabled) clusterInit();

接著我們進入clusterInit函式,看看單機叢集設定的初始化程式碼

// 初始化叢集
void clusterInit(void) {
    int saveconf = 0; 

    // 初始化配置,server.cluster就是clusterState結構體,每一個節點儲存一個。
    server.cluster = zmalloc(sizeof(clusterState));
    //指向自己的節點指標
    server.cluster->myself = NULL;
    //初始配置紀元為0
    server.cluster->currentEpoch = 0; 
    //初始配置狀態fail
    server.cluster->state = REDIS_CLUSTER_FAIL;
    //叢集數目為1
    server.cluster->size = 1; 
    server.cluster->todo_before_sleep = 0; 
    //建立節點對映的雜湊結構體
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    //節點的黑名單。。
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    //執行選舉相關的變數初始化
    server.cluster->failover_auth_time = 0; 
    server.cluster->failover_auth_count = 0; 
    server.cluster->failover_auth_rank = 0; 
    server.cluster->failover_auth_epoch = 0; 
    server.cluster->lastVoteEpoch = 0; 
    server.cluster->stats_bus_messages_sent = 0; 
    server.cluster->stats_bus_messages_received = 0; 
    //初始化槽
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    //把槽陣列清零
    clusterCloseAllSlots();

    /* 鎖住叢集配置檔案,確保每個每個節點使用的是自己的配置檔案 */
    if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
        exit(1);

    /* 載入本節點的叢集配置檔案. */
    if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
        /* 如果沒有發現叢集配置檔案,就把自己加入到叢集裡. */
        myself = server.cluster->myself =
            createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
        redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }

    // 儲存 nodes.conf檔案
    if (saveconf) clusterSaveConfigOrDie(1);

    // 監聽 TCP埠
    server.cfd_count = 0;

    if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        redisLog(REDIS_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }
    //監聽本節點的埠號
    if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            // 關聯監聽事件處理器
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler,NULL) == AE_ERR)                                                                                                        
                    redisPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    // slots -> keys 對映是一個有序集合,基礎實現是跳躍連結串列,分值是槽號,返回的是key值
    server.cluster->slots_to_keys = zslCreate();
    resetManualFailover();
}
好了,至此,和叢集相關的初始化就結束了,以後我再寫一些新增刪除節點以及故障恢復相關的文章,歡迎大家提問哦~~