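Raft feels like an upgraded, streamlined Paxos. It was designed from the start to be easy to understand, and after reading the material I already had a rough picture of it in my head.
With such detailed material, and even animated demonstrations, already available, there is actually not much to add. This post just records the key points I learned, as a reminder for later review.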
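In a distributed system, consistency means that the nodes of a cluster agree on their state. In practice, program crashes, network failures, hardware faults, power outages and similar events make that agreement hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.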
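Paxos, proposed by Leslie Lamport in 1990, is a message-passing consensus algorithm with a high degree of fault tolerance. It has two obvious drawbacks: first, it is hard to understand; second, it does not provide a good foundation for building real systems.
There have been many attempts to engineer Paxos into practical systems, but they changed the algorithm substantially, and the resulting implementations differ considerably from one another.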
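Raft is a consensus algorithm for managing a replicated log. Its designers made understandability one of their explicit goals, which makes it easier to build real systems on Raft and greatly reduces the engineering effort.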
1 Leader election
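Raft uses one Leader node and several Follower nodes, the familiar Leader-Follower model. Each node is in one of three states: Leader, Follower or Candidate.
The Leader handles client requests and replicates the results to the Followers as log entries.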
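The role changes covered in this section can be condensed into a small transition function. This is a minimal sketch that models leader election only (log replication is left out); `Role`, `Event` and `NextRole` are hypothetical names, not part of any library.

```cpp
#include <cassert>

// The three Raft roles described above.
enum class Role { Follower, Candidate, Leader };

// Events that drive role changes during leader election.
enum class Event {
    ElectionTimeout,   // no heartbeat arrived before the election timeout expired
    WonMajority,       // a candidate collected votes from more than half of the cluster
    DiscoveredLeader,  // a heartbeat from a leader of the current term arrived
    HigherTerm         // some message carried a term larger than our own
};

// Returns the role a node moves to when `ev` happens while it is in role `r`.
Role NextRole(Role r, Event ev) {
    if (ev == Event::HigherTerm) return Role::Follower;  // every role steps down
    switch (r) {
    case Role::Follower:
        return ev == Event::ElectionTimeout ? Role::Candidate : Role::Follower;
    case Role::Candidate:
        if (ev == Event::WonMajority) return Role::Leader;
        if (ev == Event::DiscoveredLeader) return Role::Follower;
        return Role::Candidate;  // timed out again: start a new election as a candidate
    case Role::Leader:
        return Role::Leader;     // a leader only steps down when it sees a higher term
    }
    return r;
}
```

For example, a Follower whose election timeout fires becomes a Candidate, and any role that sees a larger term falls back to Follower.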
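Two timers control the progress of Leader election in Raft.
One is the heartbeat: the Leader periodically sends heartbeat messages to the Followers.
The other is the election timeout: how long a Follower waits before it becomes a Candidate.
The election timeout is usually a random value between 150 ms and 300 ms; the randomness makes it unlikely that several nodes become Candidates at the same time.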
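Drawing a randomized election timeout takes only a few lines with `<random>`. The sketch below assumes each node owns a `std::mt19937` seeded by the caller; `ElectionTimer` is a hypothetical helper name.

```cpp
#include <cassert>
#include <random>

// Draws election timeouts uniformly from [minMs, maxMs], e.g. 150..300 ms.
// Each node has its own generator, so nodes usually end up with different timeouts.
class ElectionTimer {
public:
    ElectionTimer(int minMs, int maxMs, unsigned seed)
        : engine_(seed), dist_(minMs, maxMs) {
        assert(minMs > 0 && minMs <= maxMs);
    }
    // Returns a fresh timeout; call it whenever the timer is reset
    // (at start-up, on each valid heartbeat, after granting a vote).
    int NextTimeoutMs() { return dist_(engine_); }

private:
    std::mt19937 engine_;
    std::uniform_int_distribution<int> dist_;
};
```

A new value should be drawn on every reset. Whatever the exact numbers, the heartbeat interval has to stay well below the minimum election timeout, otherwise Followers start elections while the Leader is still healthy.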
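If a node's election timeout expires without it receiving a heartbeat from a Leader, it concludes that there is no Leader, becomes a Candidate, votes for itself, and sends vote requests to the other nodes.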
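The Follower-to-Candidate step, as a hypothetical sketch. `NodeState` and `RequestVote` are illustrative types with only the fields needed here; in Raft as published, a vote request also carries the candidate's last log index and term.

```cpp
#include <cassert>
#include <vector>

// Illustrative vote-request message.
struct RequestVote {
    int term;
    int candidateId;
    int toId;
};

struct NodeState {
    int id = 0;
    int currentTerm = 0;
    int votedFor = -1;        // -1 means "no vote cast in currentTerm"
    int votesReceived = 0;
    bool isCandidate = false;
};

// Called when the election timeout expires without a heartbeat:
// start a new term, vote for ourselves and build one request per peer.
std::vector<RequestVote> StartElection(NodeState& n, const std::vector<int>& peers) {
    n.currentTerm += 1;
    n.isCandidate = true;
    n.votedFor = n.id;        // the candidate's first vote is its own
    n.votesReceived = 1;
    std::vector<RequestVote> out;
    for (int p : peers) {
        if (p != n.id) out.push_back(RequestVote{n.currentTerm, n.id, p});
    }
    return out;
}
```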
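When another node receives a vote request, it grants the vote if the term carried in the request is equal to or greater than the term it has recorded and it has not yet voted in that term; it then resets its own election timeout.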
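The vote-granting rule can be sketched as follows. Like the description above, it only compares terms; Raft as published additionally refuses a vote when the candidate's log is less up to date than the voter's, and allows re-granting the vote to the same candidate. `Voter` and `HandleRequestVote` are illustrative names, and `-1` is an assumed encoding for "no vote yet".

```cpp
#include <cassert>

struct Voter {
    int currentTerm = 0;
    int votedFor = -1;   // -1: no vote cast in currentTerm
};

// Returns true if the vote is granted; resetTimer tells the caller to
// restart its election timeout. Log comparison is deliberately omitted.
bool HandleRequestVote(Voter& v, int candidateTerm, int candidateId, bool& resetTimer) {
    resetTimer = false;
    if (candidateTerm < v.currentTerm) return false;  // stale candidate
    if (candidateTerm > v.currentTerm) {              // newer term: the old vote no longer counts
        v.currentTerm = candidateTerm;
        v.votedFor = -1;
    }
    if (v.votedFor == -1 || v.votedFor == candidateId) {
        v.votedFor = candidateId;
        resetTimer = true;
        return true;
    }
    return false;  // already voted for someone else in this term
}
```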
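A Candidate that receives votes from more than half of the nodes becomes the Leader and starts sending heartbeats to the Followers to maintain its authority.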
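Counting a majority, assuming votes are stored the same way as `voteRecord` in CommonStruct.h (voter id -> term in which that node voted for us), so leftover votes from older terms are ignored. `HasMajority` is a hypothetical helper.

```cpp
#include <cassert>
#include <map>

// Counts votes granted in `term` and checks for a strict majority of clusterSize.
bool HasMajority(const std::map<int, int>& voteRecord, int term, int clusterSize) {
    assert(clusterSize > 0);
    int count = 0;
    for (const auto& kv : voteRecord) {
        if (kv.second == term) ++count;  // only votes from the current term count
    }
    // Integer division gives the usual thresholds: 3 of 5, 3 of 4, 2 of 3.
    return count > clusterSize / 2;
}
```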
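Now consider several candidates at once: nodes B and D start elections at the same time and each receives one vote, so neither reaches a majority (a split vote). Each then waits for a new randomized election timeout, again usually between 150 ms and 300 ms, so one of them will most likely time out first and win the next round.
Eventually, after one or more rounds (needing many rounds is unlikely), one node collects votes from more than half of the nodes and becomes the Leader.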
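To get a feel for why randomized timeouts make repeated split votes unlikely, here is a rough Monte Carlo sketch. It is a deliberate simplification: a round counts as a split when the two earliest timeouts fire within `windowMs` of each other, where `windowMs` stands in for the message delay (an assumption, not a Raft parameter). `SplitVoteRate` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <random>
#include <vector>

// Fraction of simulated rounds in which the runner-up's timeout fires less
// than windowMs after the first one, i.e. before the first candidate's vote
// request could plausibly reach it.
double SplitVoteRate(int nodes, int minMs, int maxMs, int windowMs,
                     int rounds, unsigned seed) {
    assert(nodes >= 2 && rounds > 0);
    std::mt19937 engine(seed);
    std::uniform_int_distribution<int> dist(minMs, maxMs);
    std::vector<int> t(nodes);
    int splits = 0;
    for (int r = 0; r < rounds; ++r) {
        for (int& x : t) x = dist(engine);
        std::partial_sort(t.begin(), t.begin() + 2, t.end());  // two smallest first
        if (t[1] - t[0] < windowMs) ++splits;
    }
    return static_cast<double>(splits) / rounds;
}
```

Calling it with, for example, `SplitVoteRate(5, 150, 300, 10, 100000, 42)` gives an estimate for five nodes; widening the timeout range relative to `windowMs` lowers the rate.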
- #pragma once
- #include <iostream>
- #include <fstream>
- #include <cassert>
- #include <string>
- #include <map>
- using namespace std;
- /*
- * Author: itdef
- * Reposting is welcome; please keep the text intact and credit the source.
- * Tech blog: http://www.cnblogs.com/itdef/
- * Tech discussion QQ group: 432336863
- * C, C++ and Windows driver enthusiasts and server programmers are welcome to get in touch.
- * Some older code is kept at:
- * http://www.oschina.net/code/list_by_user?id=614253
- */
- const string FILE_NAME = "config.txt";
- // Reads "key=value" lines from a text file into a map.
- // Empty lines and lines starting with '#' or "//" are skipped.
- class ReadConfig {
- public:
-     explicit ReadConfig(const string& filename = "")
-         : file_name(filename.empty() ? FILE_NAME : filename) {}
-     map<string, string> Do() {
-         tar_path.clear();
-         ifstream fin(file_name);
-         if (!fin.is_open()) {
-             std::cerr << "open file failed!!" << std::endl;
-             return tar_path;
-         }
-         string s;
-         while (getline(fin, s)) {
-             if (!s.empty() && s.back() == '\r')   // tolerate Windows line endings
-                 s.pop_back();
-             if (s.empty() || s[0] == '#' || (s.size() > 1 && s[0] == '/' && s[1] == '/'))
-                 continue;
-             size_t pos = s.find('=');
-             // skip lines without '=' or with nothing after it
-             if (pos == std::string::npos || pos + 1 >= s.size())
-                 continue;
-             string targetName = s.substr(0, pos);
-             string path = s.substr(pos + 1);
-             std::cout << targetName << " = " << path << std::endl;
-             if (path[0] != ' ')   // values starting with a space are ignored
-                 tar_path[targetName] = path;
-         }
-         return tar_path;   // fin closes the file when it goes out of scope
-     }
- private:
-     map<string, string> tar_path;
-     string file_name;
- };
ReadConfig.h
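- #pragma once
- #include <string>
- #include <mutex>
- #include <map>
- // Node roles (the enumerator values only need to be distinct).
- enum STATUS {
-     LEADER_STATUS = 0,
-     FOLLOWER_STATUS,
-     CANDIDATE_STATUS,
-     PRE_VOTE_STAUS,
- };
- // Message types exchanged between nodes.
- enum INFOTYPE {
-     DEFAULT_TYPE = 0,
-     HEART_BREAT_TYPE,
-     VOTE_LEADER_TYPE,
-     VOTE_LEADER_RESP_TYPE,
- };
- // Sent over TCP as raw bytes, so every node must be built with the same struct layout.
- typedef struct netInfo {
-     int fromID;
-     int toID;
-     INFOTYPE infotype;
-     int term;
-     int voteId;   // vote ID, only meaningful when infotype is a vote type
- } NetInfo;
- typedef struct locaInfo {
-     int id;
-     int leaderID;
-     STATUS status;
-     int term;
-     int isVote;            // 1 if this node has already voted in the current term
-     int IsRecvHeartbeat;   // 1 if a heartbeat arrived since the last timeout check
-     std::map<int, int> voteRecord;   // id -> term: an entry means node id voted for us in that term
- } LocalInfo;
- typedef struct localInfoWithLock {
-     LocalInfo locInfo;
-     std::mutex m;   // guards locInfo
- } LocalInfoWithLock;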
CommonStruct.h
- #pragma once
- #include "CommonStruct.h"
- #include "ReadConfig.h"
- #include <functional>
- #include <iostream>
- #include <memory>
- #include <boost/asio.hpp>
- using boost::asio::ip::tcp;
- using namespace std;
- class RaftManager : public enable_shared_from_this<RaftManager> {
- public:
-     // Not thread-safe: call once from main() before any worker thread starts.
-     static std::shared_ptr<RaftManager> GetInstance() {
-         if (p == nullptr)
-             p.reset(new RaftManager());   // make_shared cannot reach the private constructor
-         return p;
-     }
-     ~RaftManager() {
-         std::cout << "enter ~RaftManager()\n";
-     }
-     bool Init();
-     bool Go();
- private:
-     boost::asio::io_context io_service;   // io_service is the deprecated name of io_context
-     std::string ip;
-     int portStart = 0;
-     int nodeID = 0;
-     int electionTimeout = 0;
-     int heartbeatTime = 0;
-     LocalInfoWithLock locInfolock;
-     //=============================== send side
-     void DiapatchByStatus(int id, int& timeoutLimit);
-     void HandleLeaderSend(int id, int& timeoutLimit);
-     void HandleCandidateSend(int id, int& timeoutLimit);
-     void HandleFollowerSend(int id, int& timeoutLimit);
-     void HandlePreVoteSend(int id, int& timeoutLimit);
-     //=============================== receive side
-     void DiapatchByInfoType(const NetInfo& netinf);
-     void HandleHeartbeatTypeRecv(const NetInfo& netinf);
-     void HandleVoteTypeRecv(const NetInfo& netinf);
-     void HandleVoteRespTypeRecv(const NetInfo& netinf);
-     std::function<int()> dice;   // returns a random election timeout (ms)
-     bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
-     void Session(tcp::socket sock);
-     void SendFunc(int id);
-     RaftManager() {}
-     RaftManager(const RaftManager&) = delete;
-     RaftManager& operator=(const RaftManager&) = delete;
-     static std::shared_ptr<RaftManager> p;
- };
RaftManager.h
- #include "RaftManager.h"
- #include <cassert>
- #include <chrono>
- #include <functional>
- #include <memory>
- #include <mutex>
- #include <random>
- #include <thread>
- std::shared_ptr<RaftManager> RaftManager::p = nullptr;
- namespace {
- const int kLoopTimeMs = 200;   // polling period of each sender thread
- const int kClusterSize = 5;    // the demo assumes 5 nodes with IDs 1..5
- }
- bool RaftManager::Init() {
-     // the configuration could also be read from JSON
-     ReadConfig cfg("nodeCfg");
-     map<string, string> kv = cfg.Do();
-     if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
-         assert(false && "nodeCfg must define ip, portStart and nodeID");
-         return false;
-     }
-     ip = kv["ip"]; portStart = stoi(kv["portStart"]); nodeID = stoi(kv["nodeID"]);
-     electionTimeout = 1500;   // lower bound of the random election timeout (ms)
-     heartbeatTime = 1000;     // heartbeat interval (ms), may be overridden by the config
-     if (kv.find("heartbeatTime") != kv.end())
-         heartbeatTime = stoi(kv["heartbeatTime"]);
-     LocalInfo& li = locInfolock.locInfo;   // no other thread is running yet
-     li.id = nodeID; li.leaderID = 0; li.term = 0;
-     li.IsRecvHeartbeat = 0; li.isVote = 0;
-     li.status = FOLLOWER_STATUS;
-     li.voteRecord.clear();
-     // dice() is called from every sender thread, so the engine is guarded by a mutex
-     std::random_device rd;
-     auto engine = std::make_shared<std::default_random_engine>(rd());
-     auto mtx = std::make_shared<std::mutex>();
-     std::uniform_int_distribution<int> dis(electionTimeout, 5000);   // 1500-5000 ms
-     dice = [engine, mtx, dis]() mutable {
-         std::lock_guard<std::mutex> lk(*mtx);
-         return dis(*engine);
-     };
-     return true;
- }
- // Leader: when the heartbeat timer expires, re-arm it with the heartbeat interval.
- void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
-     if (timeoutLimit > 0)
-         timeoutLimit -= kLoopTimeMs;
-     if (timeoutLimit <= 0) {
-         // todo: send a HEART_BREAT_TYPE message to node id
-         timeoutLimit = heartbeatTime;
-     }
- }
- // Candidate: when the election times out, start over with a new random timeout.
- void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
-     if (timeoutLimit > 0)
-         timeoutLimit -= kLoopTimeMs;
-     if (timeoutLimit <= 0) {
-         // todo: send a VOTE_LEADER_TYPE request to node id
-         timeoutLimit = dice();
-     }
- }
- void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
-     if (timeoutLimit > 0)
-         timeoutLimit -= kLoopTimeMs;
-     if (timeoutLimit <= 0)
-         timeoutLimit = dice();
- }
- // Follower: if no heartbeat arrived during the whole timeout, become a candidate.
- void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
-     if (timeoutLimit > 0)
-         timeoutLimit -= kLoopTimeMs;
-     if (timeoutLimit <= 0) {
-         std::lock_guard<std::mutex> lck(locInfolock.m);
-         LocalInfo& li = locInfolock.locInfo;
-         if (li.IsRecvHeartbeat == 0) {
-             // heartbeat timed out: switch to election mode
-             li.term++;
-             li.status = CANDIDATE_STATUS;
-             li.isVote = 1;   // vote for ourselves
-             li.voteRecord.clear();
-             li.voteRecord[nodeID] = li.term;
-         }
-         else {
-             li.IsRecvHeartbeat = 0;   // the next window needs a fresh heartbeat
-         }
-         // todo: every sender thread keeps its own timer; one timer per node would avoid duplicate checks
-         timeoutLimit = dice();
-     }
- }
- //=================== receive side
- void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
-     std::lock_guard<std::mutex> lck(locInfolock.m);
-     locInfolock.locInfo.leaderID = netinf.fromID;
-     locInfolock.locInfo.IsRecvHeartbeat = 1;
-     if (locInfolock.locInfo.status == CANDIDATE_STATUS)   // a leader already exists for this term
-         locInfolock.locInfo.status = FOLLOWER_STATUS;
- }
- void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
-     std::lock_guard<std::mutex> lck(locInfolock.m);
-     if (locInfolock.locInfo.isVote == 0) {
-         // todo: reply granting the vote to netinf.fromID
-         locInfolock.locInfo.isVote = 1;   // at most one vote per term
-     }
-     else {
-         // todo: reply refusing the vote
-     }
- }
- void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
-     std::lock_guard<std::mutex> lck(locInfolock.m);
-     LocalInfo& li = locInfolock.locInfo;
-     if (li.status != CANDIDATE_STATUS || netinf.toID != nodeID)
-         return;
-     // todo: only record replies that actually grant the vote
-     li.voteRecord[netinf.fromID] = netinf.term;
-     int count = 0;
-     for (const auto& kv : li.voteRecord)   // count the votes received in the current term
-         if (kv.second == li.term)
-             count++;
-     if (count > kClusterSize / 2) {   // more than half: become leader, otherwise keep campaigning
-         li.leaderID = nodeID;
-         li.status = LEADER_STATUS;
-     }
- }
- // sender loop: act according to the current role
- void RaftManager::DiapatchByStatus(int id, int& timeoutLimit) {
-     STATUS status;
-     {
-         std::lock_guard<std::mutex> lck(locInfolock.m);
-         status = locInfolock.locInfo.status;
-     }
-     switch (status) {
-     case LEADER_STATUS:
-         HandleLeaderSend(id, timeoutLimit);
-         break;
-     case FOLLOWER_STATUS:
-         HandleFollowerSend(id, timeoutLimit);
-         break;
-     case CANDIDATE_STATUS:
-         HandleCandidateSend(id, timeoutLimit);
-         break;
-     case PRE_VOTE_STAUS:
-         HandlePreVoteSend(id, timeoutLimit);
-         break;
-     default:
-         std::cerr << "unknown status!!" << std::endl;
-     }
- }
- // receiver: apply the term rules, then dispatch by message type
- void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
-     {
-         std::lock_guard<std::mutex> lck(locInfolock.m);
-         LocalInfo& li = locInfolock.locInfo;
-         if (netinf.term < li.term)   // message from an older term: ignore it
-             return;
-         if (netinf.term > li.term) {   // newer term: adopt it and fall back to follower
-             li.term = netinf.term;
-             li.status = FOLLOWER_STATUS;
-             li.isVote = 0;
-             li.IsRecvHeartbeat = 0;
-             li.voteRecord.clear();
-         }
-     }
-     switch (netinf.infotype) {
-     case HEART_BREAT_TYPE:
-         HandleHeartbeatTypeRecv(netinf);
-         break;
-     case VOTE_LEADER_TYPE:
-         HandleVoteTypeRecv(netinf);
-         break;
-     case VOTE_LEADER_RESP_TYPE:
-         HandleVoteRespTypeRecv(netinf);
-         break;
-     default:
-         std::cerr << "Recv Unknown info type." << std::endl;
-     }
- }
- bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
-     int timeoutlimit = dice();
-     while (true) {
-         DiapatchByStatus(id, timeoutlimit);   // todo: write the outgoing NetInfo to *s
-         std::this_thread::sleep_for(std::chrono::milliseconds(kLoopTimeMs));
-     }
-     return false;
- }
- void RaftManager::SendFunc(int i) {
-     // example timing: poll every 200 ms, heartbeat every 1000 ms, election timeout 1500-5000 ms
-     const string port = to_string(portStart + i);   // node i listens on portStart + i
-     while (true) {
-         auto s = std::make_shared<tcp::socket>(io_service);
-         tcp::resolver resolver(io_service);
-         try {
-             boost::asio::connect(*s, resolver.resolve("127.0.0.1", port));
-         }
-         catch (const exception&) {
-             // peer not up yet: wait a little, then keep retrying
-             std::this_thread::sleep_for(std::chrono::milliseconds(kLoopTimeMs));
-             continue;
-         }
-         LoopCheck(i, s);
-         std::this_thread::sleep_for(std::chrono::milliseconds(kLoopTimeMs));
-     }
- }
- void RaftManager::Session(tcp::socket sock) {
-     boost::system::error_code error;
-     NetInfo netinf;
-     while (true) {
-         // read exactly one NetInfo; read_some could return a partial struct
-         size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
-         if (error == boost::asio::error::eof)
-             return;   // connection closed cleanly by peer
-         else if (error) {
-             std::cerr << boost::system::system_error(error).what() << std::endl;   // some other error
-             return;
-         }
-         if (length != sizeof(netinf)) {
-             std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
-             return;
-         }
-         DiapatchByInfoType(netinf);
-     }
- }
- bool RaftManager::Go() {
-     // Peers could be discovered by broadcast; this demo assumes 5 nodes with
-     // IDs 1 2 3 4 5 listening on ports 9921 9922 9923 9924 9925 (portStart = 9920).
-     if (ip.empty() || portStart == 0 || nodeID == 0)
-         return false;
-     try {
-         // one sender thread for each of the other 4 nodes
-         for (int i = 1; i <= kClusterSize; i++) {
-             if (i == nodeID)
-                 continue;
-             std::thread(&RaftManager::SendFunc, shared_from_this(), i).detach();
-         }
-         tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), portStart + nodeID));
-         for (;;) {
-             tcp::socket sock(io_service);
-             a.accept(sock);
-             // one receiver thread per incoming connection
-             std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
-         }
-     }
-     catch (const exception& e) {
-         std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
-         return false;
-     }
-     return true;
- }
RaftManager.cpp
- // QueueTemplate.cpp : This file contains the "main" function. Program execution begins and ends there.
- // A bounded blocking queue (producer/consumer) built on one mutex and two condition variables.
- #include <condition_variable>
- #include <iostream>
- #include <list>
- #include <mutex>
- #include <thread>
- using namespace std;
- template<typename T>
- class SyncQueue
- {
- public:
-     explicit SyncQueue(size_t maxSize) : m_maxSize(maxSize), m_needStop(false)
-     {
-     }
-     void Put(const T& x)
-     {
-         Add(x);
-     }
-     void Put(T&& x)
-     {
-         Add(std::move(x));
-     }
-     // Takes every element currently queued (blocks while the queue is empty).
-     void Take(std::list<T>& list)
-     {
-         std::unique_lock<std::mutex> locker(m_mutex);
-         m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
-         if (m_needStop)
-             return;
-         list = std::move(m_queue);
-         m_queue.clear();           // a moved-from list is only "valid but unspecified"
-         m_notFull.notify_all();    // several slots may have been freed at once
-     }
-     // Takes one element (blocks while the queue is empty).
-     void Take(T& t)
-     {
-         std::unique_lock<std::mutex> locker(m_mutex);
-         m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); });
-         if (m_needStop)
-             return;
-         t = std::move(m_queue.front());
-         m_queue.pop_front();
-         m_notFull.notify_one();
-     }
-     void Stop()
-     {
-         {
-             std::lock_guard<std::mutex> locker(m_mutex);
-             m_needStop = true;
-         }
-         m_notFull.notify_all();
-         m_notEmpty.notify_all();
-     }
-     bool Empty()
-     {
-         std::lock_guard<std::mutex> locker(m_mutex);
-         return m_queue.empty();
-     }
-     bool Full()
-     {
-         std::lock_guard<std::mutex> locker(m_mutex);
-         return m_queue.size() == m_maxSize;
-     }
-     size_t Size()
-     {
-         std::lock_guard<std::mutex> locker(m_mutex);
-         return m_queue.size();
-     }
-     int Count()
-     {
-         std::lock_guard<std::mutex> locker(m_mutex);
-         return static_cast<int>(m_queue.size());
-     }
- private:
-     // Both predicates are called with m_mutex held.
-     bool NotFull() const
-     {
-         bool full = m_queue.size() >= m_maxSize;
-         if (full)
-             cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
-         return !full;
-     }
-     bool NotEmpty() const
-     {
-         bool empty = m_queue.empty();
-         if (empty)
-             cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
-         return !empty;
-     }
-     template<typename F>
-     void Add(F&& x)
-     {
-         std::unique_lock<std::mutex> locker(m_mutex);
-         m_notFull.wait(locker, [this] { return m_needStop || NotFull(); });
-         if (m_needStop)
-             return;
-         m_queue.push_back(std::forward<F>(x));
-         m_notEmpty.notify_one();
-     }
- private:
-     std::list<T> m_queue;                 // buffer
-     std::mutex m_mutex;                   // mutex shared by both condition variables
-     std::condition_variable m_notEmpty;   // signaled when the queue is not empty
-     std::condition_variable m_notFull;    // signaled when the queue is not full
-     size_t m_maxSize;                     // maximum size of the queue
-     bool m_needStop;                      // stop flag
- };
- int main()
- {
-     std::cout << "Hello World!\n";
-     SyncQueue<int> q(5);   // capacity 5
-     q.Put(1);
-     int a = 0;
-     q.Take(a);
-     q.Put(2);
-     q.Take(a);
-     q.Stop();
- }
syncqueue.h
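I tried to build a simplified demo of Raft leader election myself.
The implementation defines 2-5 nodes; each node reads its IP, port and node ID from a configuration file.
Networking uses the synchronous Boost.Asio API.
One thread receives and four threads send.
1 The receiving thread inspects each incoming message to decide whether it is a heartbeat, a vote request or a vote reply, and accordingly updates the node's logical clock (term), whether it has voted (isVote), and which nodes voted for it in the latest term (map<int,int> // nodeid, term).
2 Each sending thread polls every 200 ms and decrements its waiting time according to the node's current state (the waiting time is either the 1000 ms heartbeat interval or a random election timeout of 1500-5000 ms).
Depending on the current state, it sends a heartbeat, a vote request, or a vote reply.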
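Step 2 can be sketched as a pure function that a sending thread calls once per 200 ms tick. Everything here (`Tick`, `Action`, and passing the random timeout generator in as `nextElectionMs`) is a hypothetical illustration of the design described above, not the demo's actual code.

```cpp
#include <cassert>
#include <functional>

enum class Role { Follower, Candidate, Leader };
enum class Action { None, SendHeartbeat, StartElection };

// One polling step. remainingMs is the countdown owned by the caller;
// nextElectionMs returns a random election timeout (e.g. 1500..5000 ms).
Action Tick(Role role, bool& sawHeartbeat, int& remainingMs, int tickMs,
            int heartbeatMs, const std::function<int()>& nextElectionMs) {
    remainingMs -= tickMs;
    if (remainingMs > 0) return Action::None;  // still waiting
    if (role == Role::Leader) {
        remainingMs = heartbeatMs;             // leaders beat on a fixed interval
        return Action::SendHeartbeat;
    }
    remainingMs = nextElectionMs();            // followers and candidates re-arm randomly
    if (role == Role::Follower && sawHeartbeat) {
        sawHeartbeat = false;                  // the next window needs a fresh heartbeat
        return Action::None;
    }
    return Action::StartElection;              // no heartbeat, or the candidate timed out
}
```

Passing the generator in keeps `Tick` deterministic under test: a fixed lambda such as `[] { return 2000; }` replaces the random timeout.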
To be continued.
References:
《etcd技術內幕》 (a book on etcd internals)
http://thesecretlivesofdata.com/raft/#intro