3.2 Master election mechanism
3.2.1 Election algorithms
1) Bully algorithm
Core idea
- Assume every node carries a comparable ID; the master is elected by comparing these IDs
Flow
- A node sends an election message to every node whose ID is higher than its own, in effect saying "you should lead"
- If it receives a reply (alive), someone more "senior" is around; it backs off and waits for that node to run the election
- It then waits for the victory message announcing the winner
- If no victory message arrives before the timeout, it restarts the election
- If it receives no alive reply at all, it declares itself master and broadcasts a victory message to the other nodes (see the sketch below)
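To make that flow concrete, here is a minimal single-threaded Java sketch. Everything in it is illustrative (the BullyNode class and its direct method calls stand in for the election/alive/victory messages); a real implementation would use asynchronous messages with timeouts and retries.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of bully election. Direct method calls stand in for the
// election / alive / victory messages; all names here are made up.
public class BullyNode {
    final int id;
    final List<BullyNode> cluster; // every node in the cluster, including this one
    boolean up = true;
    Integer masterId = null;

    BullyNode(int id, List<BullyNode> cluster) {
        this.id = id;
        this.cluster = cluster;
    }

    // Send "election" to every node with a higher ID; an up node "replies alive"
    // by taking over the election itself.
    void startElection() {
        boolean someoneSenior = false;
        for (BullyNode n : cluster) {
            if (n.id > id && n.up) {
                someoneSenior = true; // got an alive reply: defer to the senior node
                n.startElection();
            }
        }
        if (!someoneSenior) {
            declareVictory(); // nobody senior answered, so this node wins
        }
    }

    // Broadcast "victory": every live node records the new master.
    void declareVictory() {
        for (BullyNode n : cluster) {
            if (n.up) {
                n.masterId = id;
            }
        }
    }

    public static void main(String[] args) {
        List<BullyNode> cluster = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            cluster.add(new BullyNode(i, cluster));
        }
        cluster.get(5).up = false;      // P6 goes down
        cluster.get(2).startElection(); // P3 notices and starts an election
        System.out.println("master = P" + cluster.get(0).masterId); // master = P5
    }
}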
Example
To start, we have a six-node cluster in which every node is connected to every other, and P6 is the master.
P6 goes down.
P3 detects that P6 is down and sends election messages to every node with a higher ID.
- P6 is included because it may have recovered in the meantime.
P4 and P5 both receive the message and reply that they will take over, telling P3 to stand down (they "bully" P3).
P4 takes over the election and sends election messages to P5 and P6.
Only P5 responds, so P5 takes over the election from here (bullying P4).
P5 sends its election messages.
Nobody responds to P5's election messages, so P5 becomes master and announces its victory to the other nodes.
Pros and cons
- Pros
- Simple and direct: "if my ID is higher than yours, I run the election"
- Cons
- A falsely-dead master destabilizes the cluster. Suppose P6 comes back online right after P5 announces its victory: as soon as P5 detects P6, a fresh election starts, because P6's ID is higher than P5's
- Split brain: under a network partition, the cluster may elect two masters (since communication across the partition is cut off)
2) Raft algorithm
Raft defines three roles in the system: leader, follower, and candidate. It splits consistency into several subproblems: leader election, log replication, safety, log compaction, and membership change. Here we only discuss leader election.
Core idea
- Every leader has a term; within its term it is in charge. As soon as a node sees a term higher than its own, it unconditionally steps down and follows
Election flow
- A follower that has not heard the leader's heartbeat for some time becomes a candidate
- The candidate waits for vote requests
- If it receives a vote request, it votes and waits for the election result
- If it times out instead, it votes for itself and sends out vote requests
- Once it has collected enough votes, it wins the election and starts managing the cluster as leader (see the sketch below)
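A compact sketch of those state transitions, under the usual Raft assumptions; names such as RaftNode and onElectionTimeout are illustrative, not from any particular implementation.

// Minimal sketch of the Raft leader-election state machine. In a real system
// (including ES 7.x) this runs over RPC with persisted terms and a randomized
// election timeout per node; here the transitions are plain method calls.
public class RaftNode {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role = Role.FOLLOWER;
    long currentTerm = 0;
    int votesReceived = 0;
    final int clusterSize;

    RaftNode(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    // No heartbeat from the leader within the election timeout.
    void onElectionTimeout() {
        role = Role.CANDIDATE;
        currentTerm++;     // open a new term
        votesReceived = 1; // vote for ourselves
        // ... broadcast RequestVote(currentTerm) to all peers ...
    }

    // A peer granted us its vote for the given term.
    void onVoteGranted(long term) {
        if (role == Role.CANDIDATE && term == currentTerm) {
            votesReceived++;
            if (votesReceived * 2 > clusterSize) { // strict majority
                role = Role.LEADER;                // start sending heartbeats
            }
        }
    }

    // Any message carrying a higher term makes the node step down unconditionally.
    void onHigherTermSeen(long term) {
        if (term > currentTerm) {
            currentTerm = term;
            role = Role.FOLLOWER;
            votesReceived = 0;
        }
    }
}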
3.2.2 Election implementations
1) ES 6.8
Logic flow chart
Source code
Cluster initialization
/**
 * the main function of a join thread. This function is guaranteed to join the cluster
 * or spawn a new join thread upon failure to do so.
 */
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (joinThreadControl.joinThreadActive(currentThread) == false) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }

    // If the local node is the elected master, it wins the election and starts accepting
    // join requests from the other nodes.
    // Otherwise, it goes and joins the elected master.
    // This also explains why the local node must not count itself when looking for active
    // masters: if it did, every node would consider itself the master.
    if (transportService.getLocalNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            });
    } else {
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        final boolean success = joinElectedMaster(masterNode);

        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request.
                    // we now should have a valid master.
                    logger.debug("no master node is set, despite the join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }
                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}
Master election logic
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }

    final DiscoveryNode localNode = transportService.getLocalNode();

    // add our selves
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in the pingMasters list, otherwise we may end up electing ourselves without
        // any check / verifications from other nodes in ZenDiscovery#innerJoinCluster()
        if (pingResponse.master() != null && localNode.equals(pingResponse.master()) == false) {
            activeMasters.add(pingResponse.master());
        }
    }

    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }

    // activeMasters is empty in two cases:
    //   1. every node this node can see agrees on a master, and that master is the local node itself;
    //   2. no master exists yet.
    // 1 --> the local node must win the election and announce to the others who the master is
    // 2 --> nobody has a master, so try to elect one
    // When activeMasters is non-empty, other nodes have already settled on a master,
    // and all the local node has to do is join it.
    if (activeMasters.isEmpty()) {
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough masters to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        assert activeMasters.contains(localNode) == false :
            "local node should never be elected as master when other nodes indicate an active master";
        // let's tie-break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}
Quorum check logic
// The field minimumMasterNodes holds the setting discovery.zen.minimum_master_nodes
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
if (candidates.isEmpty()) {
return false;
}
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
return candidates.size() >= minimumMasterNodes;
}
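Spelled out with assumed values, the check behaves as follows:

// Illustration (values assumed): with discovery.zen.minimum_master_nodes = 2,
// hasEnoughCandidates returns:
//   candidates = {}     -> false (never elect from an empty set)
//   candidates = {A}    -> false (1 < 2: bail out and ping again)
//   candidates = {A, B} -> true  (2 >= 2: safe to run the election)
// If minimumMasterNodes < 1 (i.e. the setting is unset), any non-empty candidate
// set passes, which is why a mis-sized setting leaves room for split brain.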
Node comparison logic
/**
 * compares two candidates to indicate which is the better master.
 * A higher cluster state version is better
 *
 * @return -1 if c1 is the better candidate, 1 if c2 is.
 */
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
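A self-contained illustration of the ordering this comparator produces. SimpleCandidate is a hypothetical stand-in for ElectMasterService.MasterCandidate, and the plain id-based tie-break only approximates the real compareNodes, which also prefers master-eligible nodes.

import java.util.ArrayList;
import java.util.List;

// Demo of the "better candidate sorts first" ordering described above.
public class CompareDemo {
    record SimpleCandidate(String nodeId, long clusterStateVersion) {}

    static int compare(SimpleCandidate c1, SimpleCandidate c2) {
        // Higher cluster state version sorts first ("better" is lower in the list) ...
        int ret = Long.compare(c2.clusterStateVersion(), c1.clusterStateVersion());
        if (ret == 0) {
            // ... and ties are broken by node id (simplified stand-in for compareNodes)
            ret = c1.nodeId().compareTo(c2.nodeId());
        }
        return ret;
    }

    public static void main(String[] args) {
        List<SimpleCandidate> candidates = new ArrayList<>(List.of(
            new SimpleCandidate("nodeB", 10),
            new SimpleCandidate("nodeA", 12),
            new SimpleCandidate("nodeC", 12)));
        candidates.sort(CompareDemo::compare);
        // The winner is the first element: nodeA (version 12, smaller id than nodeC).
        System.out.println(candidates.get(0));
    }
}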
Analysis
When does an election start?
- When the cluster first starts up
- When the master detects that another node has left
- When another node detects that the master has left
What happens if a new node joins while an election is in progress?
- ES shelves those join requests until the election finishes. If the local node wins, it accepts them; if it loses, it drops them
- Nodes discovered this way are not counted among the candidates
Each node runs its own election, so different nodes might pick different masters. How is split brain avoided?
- The number of candidates discovered during pinging must be greater than or equal to the setting discovery.zen.minimum_master_nodes
Why can split brain still happen when discovery.zen.minimum_master_nodes is configured?
- Suppose the cluster starts with 3 nodes; a setting of 2 is perfectly safe. Once the cluster grows to 7, though, the setting should become 4. The new nodes are configured with 4 while the old nodes still carry 2. If the old nodes are not updated in time, split brain is possible: imagine the master dying just as the 2 remaining old nodes are partitioned away from the 4 new ones; with the settings out of sync, each side satisfies its own quorum and elects its own master
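The arithmetic of that scenario, spelled out (node counts taken from the example above):

// Cluster grows from 3 nodes (minimum_master_nodes = 2) to 7 nodes
// (correct value: 7 / 2 + 1 = 4), but only the new nodes get the update.
// The old master dies and the network splits the survivors:
//   partition A: 2 old nodes, setting still 2 -> hasEnoughCandidates: 2 >= 2 -> elects a master
//   partition B: 4 new nodes, setting 4       -> hasEnoughCandidates: 4 >= 4 -> elects a master
// Two masters at once: split brain.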
2) ES 7.13
Flow chart
- Differences from the standard Raft algorithm
- On cluster startup, nodes start out as candidates
- Candidates impose no restrictions on voting, which can yield multiple leaders; ES keeps the most recent one (the leader with the largest term)
- A candidate casts its own vote last, to reduce the chance of tied votes
Why is the new version said to eliminate split brain?
Because the quorum size is no longer fixed by a setting: it is a dynamically maintained value. ES itself decides, from the set of live master-eligible nodes it tracks, whether enough voters have taken part (see hasQuorum below)
public boolean hasQuorum(Collection<String> votes) {
    // count only the votes cast by nodes in the current voting configuration (nodeIds)
    final HashSet<String> intersection = new HashSet<>(nodeIds);
    intersection.retainAll(votes);
    // quorum = a strict majority of the voting configuration
    return intersection.size() * 2 > nodeIds.size();
}
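A standalone illustration of that intersection-based majority check; the node ids are made up, and in ES the voting configuration (nodeIds) is maintained dynamically as master-eligible nodes join and leave.

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Standalone re-implementation of the check above, for experimentation only.
public class QuorumDemo {
    static boolean hasQuorum(Set<String> nodeIds, Collection<String> votes) {
        Set<String> intersection = new HashSet<>(nodeIds);
        intersection.retainAll(votes);
        return intersection.size() * 2 > nodeIds.size();
    }

    public static void main(String[] args) {
        Set<String> config = Set.of("n1", "n2", "n3"); // current voting configuration
        System.out.println(hasQuorum(config, List.of("n1", "n2"))); // true:  2 * 2 > 3
        System.out.println(hasQuorum(config, List.of("n1", "n4"))); // false: n4 is not in the
                                                                    // configuration, so only
                                                                    // one vote counts
    }
}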