Redis原始碼閱讀(五)叢集-故障遷移(上)
Redis原始碼閱讀(五)叢集-故障遷移(上)
故障遷移是叢集非常重要的功能;直白的說就是在叢集中部分節點失效時,能將失效節點負責的鍵值對遷移到其他節點上,從而保證整個集群系統在部分節點失效後沒有丟失資料,仍能正常提供服務。這裡先拋開Redis實際的做法,我們可以自己想下對於Redis叢集應該怎麼做故障遷移,哪些關鍵點是必須要實現的。然後再去看Redis原始碼中具體的實現,是否覆蓋了我們想到的關鍵點,有哪些設計是我們沒有想到的,這樣看程式碼的效果會比較好。
我在思考故障遷移這個功能時,首先想到的是節點發生故障時要很快被叢集中其他節點發現,儘量縮短叢集不可用的時間;其次就是要選出失效節點上的資料可以被遷移到哪個節點上;在選擇遷移節點時最好能夠考慮節點的負載,避免遷移造成部分節點負載過高。另外,失效節點的資料在其失效前就應該實時的複製到其他節點上,因為一般情況下節點失效有很大概率是機器不可用,如果沒有事先執行過資料複製,節點資料就丟失了。最後,就是遷移的執行,除了要將失效節點原有的鍵值對資料遷移到其他節點上,還要將失效節點原來負責的槽也遷移到其他節點上,而且槽和鍵值對應該同步遷移,要避免槽被分配到節點A而槽所對應的鍵值對被分配到節點B的情況。
總結起來有實現叢集故障遷移要實現下面關鍵點:
1. 節點失效事件能被集群系統很快的發現
2. 遷移時要能選擇合適的節點
3. 節點資料需要實時複製,在失效後可以直接使用複製的資料進行遷移
4. 遷移要注意將槽和鍵值對同步遷移
看過Redis原始碼後,發現Redis的故障遷移也是以主備複製為基礎的,也就是說需要給每個叢集主節點配置從節點,這樣主節點的資料天然就是實時複製的,在主節點出現故障時,直接在從節點中選擇一個接替失效主節點,將該從節點升級為主節點並通知到叢集中所有其他節點即可,這樣就無需考慮上面提到的第三點和第四點。如果叢集中有節點沒有配置從節點,那麼就不支援故障遷移。
故障檢測
Redis的叢集是無中心的,無法通過中心定時向各個節點發送心跳來判斷節點是否故障。在Redis原始碼中故障的檢測分三步:
1. 節點互發ping訊息,將Ping超時的節點置為疑似下線節點
在這一步中,每個節點都會向其他節點發送Ping訊息,來檢測其他節點是否和自己的連線有異常。但要注意的是即便檢測到了其他節點Ping訊息超時,也不能簡單的認為其他節點是失效的,因為有可能是這個節點自己的網路異常,無法和其他節點通訊。所以在這一步只是將檢測到超時的節點置為疑似下線。例如:節點A向節點B傳送Ping發現超時,則A會將節點B的狀態置為疑似下線並儲存在自己記錄的叢集節點資訊中,儲存的疑似下線資訊就是之前提過的clusterState.nodes裡對應的失效節點的flags狀態值。
// 預設節點超時時限
#define REDIS_CLUSTER_DEFAULT_NODE_TIMEOUT 15000
2. 向其他節點共享疑似下線節點
在檢測到某個節點為疑似下線之後,會將這個節點的疑似下線情況分享給叢集中其他的節點,分享的方式也是通過互發Ping訊息,在ping訊息中會帶上叢集中隨機的三個節點的狀態,前面在分析叢集初始化時,曾介紹過利用gossip協議擴散叢集節點狀態給整個叢集,這裡節點的疑似下線狀態也是通過這種方式傳播給其他節點的。每條ping訊息會帶最多三個隨機節點的狀態資訊
void clusterSendPing(clusterLink *link, int type) { //隨機算去本節點所在叢集中的任意兩個其他node節點(不包括link本節點和link對應的節點)資訊傳送給link對應的節點 unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; int gossipcount = 0, totlen; /* freshnodes is the number of nodes we can still use to populate the * gossip section of the ping packet. Basically we start with the nodes * we have in memory minus two (ourself and the node we are sending the * message to). Every time we add a node we decrement the counter, so when * it will drop to <= zero we know there is no more gossip info we can * send. */ int freshnodes = dictSize(server.cluster->nodes)-2; //除去本節點和接收本ping資訊的節點外,整個叢集中有多少其他節點 // 如果傳送的資訊是 PING ,那麼更新最後一次傳送 PING 命令的時間戳 if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime(); // 將當前節點的資訊(比如名字、地址、埠號、負責處理的槽)記錄到訊息裡面 clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ // 從當前節點已知的節點中隨機選出兩個節點 // 並通過這條訊息捎帶給目標節點,從而實現 gossip 協議 // 每個節點有 freshnodes 次傳送 gossip 資訊的機會 // 每次向目標節點發送 3 個被選中節點的 gossip 資訊(gossipcount 計數) while(freshnodes > 0 && gossipcount < 3) { // 從 nodes 字典中隨機選出一個節點(被選中節點) dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); clusterMsgDataGossip *gossip; ////pingpong meet訊息體部分用該結構 int j; if (this == myself || this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* otherwise we may loop forever. */ continue; } /* Check if we already added this node */ // 檢查被選中節點是否已經在 hdr->data.ping.gossip 數組裡面 // 如果是的話說明這個節點之前已經被選中了 // 不要再選中它(否則就會出現重複) for (j = 0; j < gossipcount; j++) {//這裡是避免前面隨機選擇clusterNode的時候重複選擇相同的節點 if (memcmp(hdr->data.ping.gossip[j].nodename,this->name, REDIS_CLUSTER_NAMELEN) == 0) break; } if (j != gossipcount) continue; /* Add it */ // 這個被選中節點有效,計數器減一 freshnodes--; // 指向 gossip 資訊結構 gossip = &(hdr->data.ping.gossip[gossipcount]); // 將被選中節點的名字記錄到 gossip 資訊 memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN); // 將被選中節點的 PING 命令傳送時間戳記錄到 gossip 資訊 gossip->ping_sent = htonl(this->ping_sent); // 將被選中節點的 PING 命令回覆的時間戳記錄到 gossip 資訊 gossip->pong_received = htonl(this->pong_received); // 將被選中節點的 IP 記錄到 gossip 資訊 memcpy(gossip->ip,this->ip,sizeof(this->ip)); // 將被選中節點的埠號記錄到 gossip 資訊 gossip->port = htons(this->port); // 將被選中節點的標識值記錄到 gossip 資訊 gossip->flags = htons(this->flags); // 這個被選中節點有效,計數器增一 gossipcount++; } // 計算資訊長度 totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); // 將被選中節點的數量(gossip 資訊中包含了多少個節點的資訊) // 記錄在 count 屬性裡面 hdr->count = htons(gossipcount); // 將資訊的長度記錄到資訊裡面 hdr->totlen = htonl(totlen); // 傳送資訊 clusterSendMessage(link,buf,totlen); }
收到ping訊息的節點,如果發現ping訊息中帶的某個節點屬於疑似下線狀態,則找到自身記錄該節點的ClusterNode結構,並向該結構的下線報告連結串列中插入一條上報記錄,上報源頭為發出Ping的節點。例如:節點A向節點C傳送了ping訊息, ping訊息中帶上B節點狀態,並且B節點狀態為疑似下線,那麼C節點收到這個Ping訊息之後,就會查詢自身記錄節點B的clusterNode,向這個clusterNode的fail_reports連結串列中插入來自A的下線報告。
3. 收到叢集中超過半數的節點認為某節點處於疑似下線狀態,則判定該節點下線,並廣播
判定的時機是在每次收到一條ping訊息的時候,當發現ping訊息中帶有某節點的疑似下線狀態後,除了加入該節點的下線報告以外,還會呼叫markNodeAsFailingIfNeeded函式來嘗試判斷該節點是否已經被超過半數的節點判斷為疑似下線,如果是的話,就將該節點狀態置為下線,並呼叫clusterSendFail函式將下線狀態廣播給所有已知節點。這裡廣播不是通過訂閱分發的方式,而是遍歷所有節點,並給每個節點單獨傳送訊息。
void clusterSendFail(char *nodename) { //如果超過一半的主節點認為該nodename節點下線了,則需要把該節點下線資訊同步到整個cluster叢集 unsigned char buf[sizeof(clusterMsg)]; clusterMsg *hdr = (clusterMsg*) buf; // 建立下線訊息 clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); // 記錄命令 memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN); // 廣播訊息 clusterBroadcastMessage(buf,ntohl(hdr->totlen)); }
void clusterBroadcastMessage(void *buf, size_t len) { //buf裡面的內容為clusterMsg+clusterMsgData dictIterator *di; dictEntry *de; // 遍歷所有已知節點 di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); // 不向未連線節點發送資訊 if (!node->link) continue; // 不向節點自身或者 HANDSHAKE 狀態的節點發送資訊 if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; // 傳送資訊 clusterSendMessage(node->link,buf,len); } dictReleaseIterator(di);
從節點判斷自己所屬的主節點下線,則開始進入故障轉移流程。如果主節點下只有一個從節點,那麼很自然的可以直接進行切換,但如果主節點下的從節點不只一個,那麼還需要選出一個新的主節點。這裡的選舉過程使用了比較經典的分散式一致性演算法Raft,下一篇會介紹Redis中選舉新主節點的過程。