
Redis Source Code Analysis 25: Cluster (Part 1): Handshake, Heartbeat Messages, and Failure Detection

Redis Cluster is the distributed database solution provided by Redis. It shares data across nodes through sharding, and provides replication and failover capabilities.

I: Initialization

1: Data structures

In the source code, server.cluster records the current state of the whole cluster: all the nodes in the cluster, the cluster's current status (for example, up or down), the cluster's current epoch, and so on. This attribute is a structure of type clusterState, defined as follows:

typedef struct clusterState {
    clusterNode *myself;  /* This node */
    ...
    int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
    int size;             /* Num of master nodes with at least one slot */
    dict *nodes;          /* Hash table of name -> clusterNode structures */
    ...
    clusterNode *slots[REDIS_CLUSTER_SLOTS];
    zskiplist *slots_to_keys;
    ...
} clusterState;

myself points to the node represented by the current Redis instance; state indicates the cluster's status; the dict nodes records all cluster nodes, including the current node itself, keyed by node name with clusterNode structures as values. The remaining attributes relate to specific flows and will be covered when those flows are introduced.

A node in the cluster is represented by clusterNode, defined as follows:

typedef struct clusterNode {
    mstime_t ctime; /* Node object creation time. */
    char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    int flags;      /* REDIS_NODE_... */
    ...
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    ...
    char ip[REDIS_IP_STR_LEN];  /* Latest known IP address of this node */
    int port;                   /* Latest known port of this node */
    clusterLink *link;          /* TCP/IP link with this node */
    list *fail_reports;         /* List of nodes signaling this as failing */
} clusterNode;

This structure records a node's state and attributes. ctime is the node's creation time; name is the node name: every node has a 40-byte random hex string as its name, which also serves as the node's key in the dict server.cluster->nodes; flags records the node's type and status as bit flags, for example whether the node is down, and whether it is a master or a slave; ip and port are the node's address; link is the TCP connection between the current node and this node, a structure containing the socket descriptor, input buffer, output buffer, and so on. On the connection represented by link, the current node is the client and the node represented by this clusterNode is the server. The remaining attributes relate to specific flows and will be covered later.

2: Initialization

When a Redis instance starts, the "cluster-enabled" option in the configuration file determines whether the instance runs in cluster mode. If the option is "yes", server.cluster_enabled is set to 1, indicating that the instance is in cluster mode.

In cluster mode, the instance first calls clusterInit at startup to initialize the structures the cluster needs and to create the listening port. The code of this function is as follows:

void clusterInit(void) {
    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = REDIS_CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->cant_failover_reason = REDIS_CLUSTER_CANT_FAILOVER_NONE;
    server.cluster->lastVoteEpoch = 0;
    server.cluster->stats_bus_messages_sent = 0;
    server.cluster->stats_bus_messages_received = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    /* Lock the cluster config file to make sure every node uses
     * its own nodes.conf. */
    if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
        exit(1);

    /* Load or create a new nodes configuration. */
    if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
        redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        redisLog(REDIS_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }

    if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    redisPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a sorted set. Init it. */
    server.cluster->slots_to_keys = zslCreate();

    /* Set myself->port to my listening port, we'll just need to discover
     * the IP address via MEET messages. */
    myself->port = server.port;

    server.cluster->mf_end = 0;
    resetManualFailover();
}

In this function, the attributes of the clusterState structure server.cluster are initialized first. If the "cluster-config-file" option is set in the Redis configuration file, its value is recorded in server.cluster_configfile as the cluster configuration file; the attributes of server.cluster are then initialized from the contents of that file.

If loading the cluster configuration file fails (or the file does not exist), a clusterNode structure representing this node itself is created with the REDIS_NODE_MYSELF and REDIS_NODE_MASTER flags, marking it as a master and giving it a 40-byte random string as its name; the node is then added to server.cluster->nodes.

Next, listenToPort is called to create listening socket descriptors on the cluster port. The cluster port is the Redis listening port plus 10000; for example, if the client-facing port is 6379, the cluster port is 16379. This port accepts TCP connections from other cluster nodes; every node in the cluster connects to every other node, so the whole cluster forms a fully connected mesh.

Readable events are then registered on the listening port, with clusterAcceptHandler as the callback.

When the current node receives a TCP connection request from another cluster node, clusterAcceptHandler accepts the connection. For each accepted connection, it creates a clusterLink structure representing the link and registers a readable event on the socket descriptor, with clusterReadHandler as the callback.

II: Handshake between cluster nodes

1: The CLUSTER MEET command

After a Redis instance starts in cluster mode, from its own point of view the cluster contains only itself. How does it get to know the other nodes in the cluster? This requires a client to send the "CLUSTER MEET" command.

A client sends the command "CLUSTER MEET nodeB_ip nodeB_port" to cluster node A, where nodeB_ip and nodeB_port are node B's IP and port. When node A receives this command from the client, it handles it in clusterCommand. The relevant code is:

    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        long long port;

        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    }

With the IP and port from the command as arguments, clusterStartHandshake is called, and node A starts the handshake with node B.

In clusterStartHandshake, a clusterNode structure representing node B is created with the REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET flags, its ip and port attributes are set to node B's IP and port, and the node is inserted into the dict server.cluster->nodes. The relevant code is:

/* Add the node with a random address (NULL as first argument to
 * createClusterNode()). Everything will be fixed during the
 * handshake. */
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);

Note that because A does not yet know node B's name, createClusterNode is called with NULL as the first argument; inside the function, a random string temporarily serves as B's name. During the subsequent exchange, node B will send its real name in a PONG packet.

2: TCP connection establishment

In the cluster cron function clusterCron, every node in the dict server.cluster->nodes is iterated over. Whenever node->link is NULL, no connection has been established to that node yet (or a previous connection was dropped), so a TCP connection to its cluster port is initiated. The relevant code is:

       if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            if (fd == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;

            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }

The current node A calls anetTcpNonBlockBindConnect to initiate a non-blocking TCP connect to node B, then calls createClusterLink to create the clusterLink structure link; on this connection node B is the server and the current node is the client. A readable event is then registered on link->fd, with clusterReadHandler as the callback.

Then, depending on whether the node's flags contain REDIS_NODE_MEET, a MEET or a PING packet is sent to the node, and finally the REDIS_NODE_MEET flag is cleared. (This non-blocking connect has no explicit success/failure check: as soon as the socket is writable, the MEET or PING packet is sent directly. If the send succeeds, the connect succeeded; if it fails, the connect failed and the link is simply freed.)

When node B receives data on its cluster port, the readable event on its listening socket fires, and the callback clusterReadHandler calls read to consume the incoming data. Once a complete packet has been received, clusterProcessPacket is called to process it.

In clusterProcessPacket, the sender's name is first looked up as a key in server.cluster->nodes to find the sender node. Because node B knows nothing about node A at this point, no matching node is found.

If no sender node is found and the packet is a MEET packet, a clusterNode structure representing node A is created with the REDIS_NODE_HANDSHAKE flag, its ip and port set to node A's IP and port, and the node is inserted into the dict server.cluster->nodes. A PONG packet is then sent back to node A. The relevant code is:

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
        ...
        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link);
            node->port = ntohs(hdr->port);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
        ...
        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

Note that when node B calls createClusterNode here to create the clusterNode for A, it also passes NULL, so B does not set A's name either and likewise uses a random string; later, when node B handshakes with node A, node A will send its real name in a PONG packet.

When node A receives node B's PONG reply on its cluster port, the readable event on its listening socket fires, the callback clusterReadHandler is invoked, and clusterProcessPacket is again called to process the packet.

Likewise, the sender's name from the packet is looked up in server.cluster->nodes. Because A does not yet know B's name, no matching sender is found.

At this point in A, node B is still in the REDIS_NODE_HANDSHAKE state, so B's name attribute is updated with the name from the PONG packet, and the REDIS_NODE_HANDSHAKE flag is cleared from B's flags. Based on the role information B filled in the PONG packet, the REDIS_NODE_MASTER or REDIS_NODE_SLAVE flag is added to B's flags. The relevant code is:

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        redisLog(REDIS_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
            (void*)link->node);
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                if (sender) {
                    ...    
                }

                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                clusterRenameNode(link->node, hdr->sender);
                redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);
                link->node->flags &= ~REDIS_NODE_HANDSHAKE;
                link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }
    }     

At this point, the handshake from node A to node B is complete.

On node B, after receiving the MEET packet from A, the corresponding node was created and inserted into server.cluster->nodes. So in node B's clusterCron, B will also initiate a TCP connection to A, and once connected will send a PING packet, which starts B's handshake with A.

After A receives B's PING packet, it replies with a PONG packet. On B, this is likewise handled by clusterProcessPacket, which again looks up the sender's name in server.cluster->nodes. Because B never set A's real name, no matching sender is found.

At this point in B, node A is still in the REDIS_NODE_HANDSHAKE state, so A's name attribute is updated with the name from the PONG packet, the REDIS_NODE_HANDSHAKE flag is cleared from A's flags, and based on the role information A filled in the PONG packet, the REDIS_NODE_MASTER or REDIS_NODE_SLAVE flag is added to A's flags.

At this point, the handshake from node B to node A is also complete, and nodes A and B now know each other.

III: Gossip

One question remains: if the cluster has N nodes, does a new node joining require a "CLUSTER MEET" command for every one of those nodes before all of them know it? Of course not. Thanks to the Gossip protocol, sending the command to any single node is enough for the new node to join the cluster and become known to all the other nodes.

Gossip is a protocol widely used in distributed systems for exchanging information between nodes. As the name suggests, the algorithm is inspired by office gossip: once one person gossips, everyone knows the story within a bounded amount of time, in a "one tells ten, ten tell a hundred" fashion. Since this also resembles how a virus spreads, Gossip goes by many aliases: the gossip algorithm, the epidemic algorithm, the virus-infection algorithm, the rumor-spreading algorithm.

The characteristic of Gossip is that in a bounded network, every node communicates with randomly chosen peers; after a period of seemingly chaotic exchanges, the state of all nodes eventually converges. A node may know every other node or only a few neighbors; as long as they are connected through the network, their states eventually agree, which is also how epidemics spread.

Gossip is an eventually consistent algorithm. It cannot guarantee that all nodes agree at any given moment, but it does guarantee that they "eventually" agree, where "eventually" is a point in time that exists in practice but cannot be pinned down theoretically. The downside of Gossip is equally clear: redundant communication puts a significant load on network bandwidth and CPU.

In Redis Cluster specifically, every node periodically sends heartbeat packets to other nodes. Besides the sender's own information, a heartbeat also carries information about a number of other nodes the sender knows: this is the so-called gossip section.

When a node receives a heartbeat, it checks whether the gossip section mentions any node it does not yet know; if so, it starts a handshake with that node.

For example, suppose the cluster has four nodes A, B, C, and D, where A and B know each other and C and D know each other. Once a client sends "CLUSTER MEET nodeC_ip nodeC_port" to A, the MEET packet A sends to C also carries node B's information, so upon receiving it C learns about not only A but also B. Likewise, the PING packets C later sends to A and B carry node D's information, so A and B learn about D. After a while, all four nodes know one another.

In the source, clusterSendPing sends heartbeat or MEET packets to other cluster nodes; a heartbeat is a PING or PONG packet. PING, PONG, and MEET packets share the same format and are distinguished only by the type field in the header. The function's source is below; the type parameter specifies the packet type, and link is the TCP connection to send over:

void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;
    clusterMsg *hdr;
    int gossipcount = 0; /* Number of gossip sections added so far. */
    int wanted; /* Number of gossip sections we want to append if possible. */
    int totlen; /* Total packet length. */
    /* freshnodes is the max number of nodes we can hope to append at all:
     * nodes available minus two (ourself and the node we are sending the
     * message to). However practically there may be less valid nodes since
     * nodes in handshake state, disconnected, are not considered. */
    int freshnodes = dictSize(server.cluster->nodes)-2;

    /* How many gossip sections we want to add? 1/10 of the number of nodes
     * and anyway at least 3. Why 1/10?
     *
     * If we have N masters, with N/10 entries, and we consider that in
     * node_timeout we exchange with each other node at least 4 packets
     * (we ping in the worst case in node_timeout/2 time, and we also
     * receive two pings from the host), we have a total of 8 packets
     * in the node_timeout*2 falure reports validity time. So we have
     * that, for a single PFAIL node, we can expect to receive the following
     * number of failure reports (in the specified window of time):
     *
     * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
     *
     * PROB = probability of being featured in a single gossip entry,
     *        which is 1 / NUM_OF_NODES.
     * ENTRIES = 10.
     * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
     *
     * If we assume we have just masters (so num of nodes and num of masters
     * is the same), with 1/10 we always get over the majority, and specifically
     * 80% of the number of nodes, to account for many masters failing at the
     * same time.
     *
     * Since we have non-voting slaves that lower the probability of an entry
     * to feature our node, we set the number of entires per packet as
     * 10% of the total nodes we have. */
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

    /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*wanted);
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;

    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);
        clusterMsgDataGossip *gossip;
        int j;

        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        if (this == myself) continue;

        /* Give a bias to FAIL/PFAIL nodes. */
        if (maxiterations > wanted*2 &&
            !(this->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))
            continue;

        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 3) Nodes with the NOADDR flag set.
         * 4) Disconnected nodes if they don't have configured slots.
         */
        if (this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Tecnically not correct, but saves CPU. */
            continue;
        }

        /* Check if we already added this node */
        for (j = 0; j < gossipcount; j++) {
            if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
                    REDIS_CLUSTER_NAMELEN) == 0) break;
        }
        if (j != gossipcount) continue;

        /* Add it */
        freshnodes--;
        gossip = &(hdr->data.ping.gossip[gossipcount]);
        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
        gossip->ping_sent = htonl(this->ping_sent);
        gossip->pong_received = htonl(this->pong_received);
        memcpy(gossip->ip,this->ip,sizeof(this->ip));
        gossip->port = htons(this->port);
        gossip->flags = htons(this->flags);
        gossip->notused1 = 0;
        gossip->notused2 = 0;
        gossipcount++;
    }

    /* Ready to send... fix the totlen fiend and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

A packet carries not only the current node's information but also information about other cluster nodes that this node has recorded: the so-called gossip section. It is through the gossip section that a receiver learns about other cluster nodes and updates their state.

This raises a question: how many node entries should a packet include? Redis currently uses this rule: the gossip section should cover 1/10 of the total number of nodes, with a minimum of 3 entries. Covering 1/10 of all nodes ensures that, within the failure detection window of twice node_timeout, a node that has gone down will be reported by most of the cluster.

Where does 1/10 come from? Suppose there are N cluster nodes. Within one node_timeout, the current node receives at least 4 heartbeats from any given peer: a node sends a PING to each peer at least once every node_timeout/2, and every PING is answered with a PONG. So within node_timeout, the current node receives two PINGs from node A, plus the two PONGs answering its own PINGs to A. Within the failure detection window of node_timeout*2, that makes 8 heartbeats from any single peer, and 8*N heartbeats in total. Each heartbeat mentions a given failing node in its gossip section with probability roughly 1/10 (each entry names it with probability 1/N, and there are N/10 entries), so the expected number of failure reports received is 8*N*(1/10), that is, 80% of N, which means the current node can expect to hear from most of the cluster.

The variable freshnodes is the maximum number of nodes the gossip section could possibly include: the total node count minus 2, the 2 being the current node itself and the receiver.

The variable wanted is the number of nodes the gossip section actually aims to include: 1/10 of the total node count, with a floor of 3 and capped at freshnodes.

Next, the total size totlen of the outgoing packet is computed, and memory for the packet is allocated.

If a PING packet is being sent, the receiving node's ping_sent attribute is also updated.

Next, clusterBuildMessageHdr builds the packet header, which mainly carries the current node's own information.

Then a loop fills in the packet's gossip section; note that the loop runs at most 3*wanted iterations. In the loop:

First, a node is picked at random from the dict server.cluster->nodes.

If the node is the current node itself, it is skipped.

During the early iterations (while the countdown maxiterations is still above 2*wanted), any node not flagged as failing or possibly failing is skipped; this biases the heartbeat toward carrying information about failing nodes.

If the node is in handshake or NOADDR state, or there is no link to it and it has no slots assigned, it is skipped.

Next, the loop checks whether the node was already added to the gossip section; if so, it is skipped. Otherwise, the node's information is added to the gossip section.

Once the heartbeat is built, the length totlen is fixed up, and the gossip entry count and the total packet length are written into the header; finally, clusterSendMessage sends the packet.

When the current node receives a PING, PONG, or MEET packet from another node, clusterProcessPacket calls clusterProcessGossipSection to handle the packet's gossip section. For each node in the gossip section, if the current node already knows it, the node's state is updated from the entry; if not, a handshake with that node is started.

The code of clusterProcessGossipSection is as follows:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        ci = representRedisNodeFlags(sdsempty(), flags);
        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
        sdsfree(ci);

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section, start an
             * handshake with the (possibly) new address: this will result
             * into a node address update if the handshake will be
             * successful. */
            if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
                (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pairs.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }

        /* Next node */
        g++;
    }
}

First, sender is determined: if the current node is the client on this link and received a reply from the server, sender is the server node; otherwise, the sender is looked up by the packet's sender name in the dict server.cluster->nodes, and sender is NULL if no match is found.

Next, each node entry in the gossip section is processed in a loop. The entry's node information is first logged.

Then the node is looked up by name in the dict server.cluster->nodes. If the node is found, the logic is mainly the failure detection flow, which is covered in the next section and skipped here.

If the node is not found, and there is a sender (that is, the sender is already a trusted member of the cluster), and the entry's flags do not include REDIS_NODE_NOADDR, and the node is not blacklisted, then the node is a new cluster node, so clusterStartHandshake is called to start a handshake with it.

IV: Heartbeat messages and failure detection

1: Heartbeat messages

Every node in the cluster periodically sends PING packets to other nodes, and a node that receives a PING replies with a PONG. PING and PONG packets share the same format, distinguished by the type field in the header; both are referred to as heartbeats.

The PING-sending strategy is: every second, a node randomly picks a node from the dict server.cluster->nodes and sends it a PING. In addition, it iterates over all nodes in the dict, and if it has not sent a PING to some node for more than NODE_TIMEOUT/2, it sends one immediately.

When sending PINGs and receiving PONGs, a node updates two timestamp attributes: ping_sent and pong_received. It uses them to decide whether to PING another node and whether another node is down. Their update rules are:

node->ping_sent: set to 0 when the node is created; set to the current time when a PING is sent to the node; reset to 0 when the PONG answering that PING arrives.

node->pong_received: set to 0 when the node is created; set to the current time when the PONG answering a PING arrives.

The PING-sending logic lives in the cluster cron function clusterCron; the relevant code is:

void clusterCron(void) {
    ...
    /* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }
    
    ...
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;
        ...
        /* If we have currently no active ping in this instance, and the
         * received PONG is older than half the cluster timeout, send
         * a new ping now, to ensure all the nodes are pinged without
         * a too big delay. */
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }
        ...
    }
    dictReleaseIterator(di);
    ...
}   

iteration is a static variable counting calls to clusterCron. Since the function runs every 100 ms, divisibility by 10 marks a 1-second interval. So once per second, 5 nodes are sampled at random from the dict server.cluster->nodes, keeping only those that satisfy: the link is up and the last PING sent has already been answered with a PONG; and the node is neither the current node itself nor in handshake state.

Then, among those 5 random samples, the node whose PONG was received the longest ago is chosen and sent a PING.

Next, the dict server.cluster->nodes is iterated over; for every node that is not the current node itself and is not in the REDIS_NODE_NOADDR or handshake state, the following is done:

If the link to the node is up, the last PING sent was already answered with a PONG, and more than server.cluster_node_timeout/2 has passed since that PONG arrived, a PING is sent to the node immediately.

With this PING-sending strategy, if NODE_TIMEOUT is set to a small value while the total node count is large, the overall number of heartbeats in the cluster can be considerable, because a node PINGs any peer it has not PINGed for NODE_TIMEOUT/2. For example, with 100 nodes and NODE_TIMEOUT set to 60 seconds, every node PINGs the other 99 nodes every 30 seconds; that is, about 3.3 PINGs per second per node, or 330 PINGs per second across the 100 nodes.

Although the packet count could be reduced, no bandwidth problems have been reported so far, so this heartbeat scheme is still in use.

2: Failure detection

A Redis cluster node decides whether another node is down by whether that node answers PING packets in time. "Down" covers two states: possibly failing (PFAIL) and failing (FAIL).

If the current node has not received node A's reply to a PING for a long time, it marks node A as possibly failing. PFAIL therefore only means that, from the current node's point of view, node A is unreachable; whether A is really down still requires the opinions of the other nodes.

The gossip sections of the heartbeats exchanged between nodes carry node state. If, in the heartbeats it receives, the current node sees that a majority of nodes have marked node A as PFAIL, it concludes that node A really is down and marks it FAIL. Once A is marked FAIL, the current node immediately broadcasts a FAIL packet announcing A's failure to all other nodes, so eventually every node marks node A as FAIL.

PFAIL and FAIL are quite similar to subjective down and objective down in Sentinel.

If a node has not answered the current node's PING for more than server.cluster_node_timeout, the current node marks it as possibly failing. This logic is handled in the cron function clusterCron; the relevant code is:

void clusterCron(void) {    
    ...
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        ...

        /* If we are waiting for the PONG more than half the cluster
         * timeout, reconnect the link: maybe there is a connection
         * issue even if the node is alive. */
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            freeClusterLink(node->link);
        }
        ...
        /* Check only if we have an active ping for this instance. */
        if (node->ping_sent == 0) continue;

        /* Compute the delay of the PONG. Note that if we already received
         * the PONG, then node->ping_sent is zero, so can't reach this
         * code at all. */
        delay = now - node->ping_sent;

        if (delay > server.cluster_node_timeout) {
            /* Timeout reached. Set the node as possibly failing if it is
             * not already in this state. */
            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= REDIS_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
    ...
}   

While iterating over the dict server.cluster->nodes, for every node that is not the current node itself and is not in the REDIS_NODE_NOADDR or handshake state, the following is done:

If the link to the node is up, the connection is older than server.cluster_node_timeout, the most recent PING sent to the node has not been answered with a PONG, and more than server.cluster_node_timeout/2 has passed since that PING was sent, the link is freed. The next clusterCron call will then reconnect to the node. The reasoning is that although the network is momentarily in trouble, the node itself may still be healthy; reconnecting avoids marking the node down because of a transient network issue.

If more than server.cluster_node_timeout has passed since the last PING was sent to the node, and the node is not yet marked PFAIL or FAIL, it is marked PFAIL; the node is now in the possibly-failing state.

Once the current node A marks node B as PFAIL, the gossip sections of A's outgoing heartbeats may carry B's information. When another node C receives A's heartbeat and parses the gossip section, it sees that A has marked B as PFAIL, and it inserts a failure report structure clusterNodeFailReport containing node A into the list B->fail_reports.

The clusterNodeFailReport structure is defined as follows:

typedef struct clusterNodeFailReport {
    struct clusterNode *node;  /* Node reporting the failure condition. */
    mstime_t time;             /* Time of the last report from this node. */
} clusterNodeFailReport;

The structure contains the node that sent the failure report, and the timestamp of the most recent report from that node.

In the node structure clusterNode there is a failure report list fail_reports; each element is a clusterNodeFailReport, and the list records all the other nodes that have marked this node B as possibly failing. So when node C receives node A's failure report about node B, it inserts a clusterNodeFailReport containing node A into the list B->fail_reports.

Each time node C receives a failure report about node B, it counts the elements of B->fail_reports whose report time falls within twice server.cluster_node_timeout; if that count exceeds half of the cluster's nodes, node C can mark node B as failing (FAIL).

This logic is implemented in clusterProcessGossipSection. The code of that function is as follows:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        ci = representRedisNodeFlags(sdsempty(), flags);
        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
        sdsfree(ci);

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            /* If we already know this node, but it is not reachable, and
             * we see a different address in the gossip section, start an
             * handshake with the (possibly) new address: this will result
             * into a node address update if the handshake will be
             * successful. */
            if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
                (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        } else {
            /* If it's not in NOADDR state and we don't have it, we
             * start a handshake process against this IP/PORT pairs.
             *
             * Note that we require that the sender of this gossip message
             * is a well known node in our cluster, otherwise we risk
             * joining another cluster. */
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }

        /* Next node */
        g++;
    }
}

First the sender is resolved: if the current node received this packet as a client, i.e. as a reply from the server it connected to, sender is that server's node; otherwise the sender name in the packet header is looked up in the dictionary server.cluster->nodes, and sender is NULL if no match is found.

Next, each node record in the gossip section is processed in a loop. The gossiped node's information is first logged at debug level.

Then the node name is looked up in server.cluster->nodes. If a matching node is found, sender is not NULL, sender is a master, and node is not the current node itself, then: if the packet flags the node as FAIL or PFAIL, clusterNodeAddFailureReport is called to append sender's failure report to the list node->fail_reports, and then markNodeAsFailingIfNeeded is called, which, when its conditions are met, marks node as FAIL and broadcasts a FAIL packet to all other nodes so that they learn of the failure as quickly as possible.

If the packet does not flag the node as FAIL or PFAIL, clusterNodeDelFailureReport is called to remove sender's failure report from node->fail_reports, if one is present.

Next, if the node is already flagged as PFAIL or FAIL by the current node, and the address in the packet differs from the address the current node has recorded for it, the node may have moved to a new address, so clusterStartHandshake is called to start a handshake with that new address.

The remaining branch handles previously unknown nodes, which was covered earlier and is not repeated here.

The code of the markNodeAsFailingIfNeeded function is as follows:

void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    redisLog(REDIS_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~REDIS_NODE_PFAIL;
    node->flags |= REDIS_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

This function marks node as down (FAIL) when the following conditions hold:

node is already flagged as possibly failing (PFAIL) by the current node;

within the last 2 * server.cluster_node_timeout, more than half of the cluster's master nodes have flagged node as PFAIL or FAIL, as recorded in the failure report list node->fail_reports.

In the function, if node is not flagged as PFAIL by the current node, it returns immediately; likewise it returns if node is already flagged as FAIL.

Then clusterNodeFailureReportsCount is called to count the entries in node->fail_reports into failures. Inside clusterNodeFailureReportsCount, all reports older than 2 * server.cluster_node_timeout are first purged.

If the current node is a master, failures is incremented, because the current node has also flagged node as PFAIL.

If failures is below the required quorum, server.cluster->size / 2 + 1, the function returns; for example, with a size of 6 masters the quorum is 4.

Otherwise node is marked as FAIL: the REDIS_NODE_PFAIL bit is cleared from node's flags, the REDIS_NODE_FAIL bit is set, and node->fail_time is updated to the current time. If the current node is a master, clusterSendFail is called to broadcast a FAIL packet to the other nodes; apart from the header, a FAIL packet carries only the name of the failing node.

When another node receives the FAIL packet, the packet handler clusterProcessPacket immediately marks the named node as down (FAIL), whether or not it had already flagged that node as PFAIL. The relevant code is:

    else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            if (failing &&
                !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
            {
                redisLog(REDIS_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);
                failing->flags |= REDIS_NODE_FAIL;
                failing->fail_time = mstime();
                failing->flags &= ~REDIS_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            redisLog(REDIS_NOTICE,
                "Ignoring FAIL message from unknown node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }
    } 

If sender is not NULL, the sender is trusted, so the node name in the packet is looked up in the dictionary server.cluster->nodes. If the failing node is found, is not already flagged as FAIL, and is not the current node itself, it is marked as FAIL:

the REDIS_NODE_FAIL bit is added to its flags, failing->fail_time is set to the current time, and the REDIS_NODE_PFAIL bit is cleared;

If sender is NULL, the sender is unknown to the current node, and the packet is ignored.
