Raft official website

Raft paper (Chinese translation)

Raft paper (English original)

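To me Raft feels like an upgraded, streamlined Paxos. It was designed from the start to be easy to understand, and after reading the material above I already had a rough picture of it in my head.

With such detailed material available, including an animated walkthrough, there is honestly not much left to add. This post only records the key points I learned, as a reminder for later review.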

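In a distributed system, consistency means that the nodes of a cluster agree on their state. In practice, program crashes, network failures, hardware faults, power outages, and so on make consistency between nodes hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.

Paxos, proposed by Leslie Lamport in 1990, is a message-passing consensus algorithm with a high degree of fault tolerance. It has two obvious drawbacks, though: the algorithm is hard to understand, and it does not provide a good foundation for building real systems.

There have been many attempts to engineer Paxos into real systems, but they made substantial changes to the algorithm itself, and the resulting implementations differ considerably from one another.

Raft is a consensus algorithm for managing a replicated log. Its designers made understandability one of the goals, which makes Raft easier to build real systems on and greatly reduces the engineering effort.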

1 Leader election

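Raft uses one Leader node and several Follower nodes, the familiar Leader-Follower model. At any moment each node is in one of three states: Leader, Follower, or Candidate.

The Leader handles client requests and replicates the results to the Followers as log entries.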
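The three states and the moves between them can be written down as a small transition function. This is only a sketch: the names `Role`, `Event`, and `nextRole` are made up for illustration and do not come from the Raft paper or from the demo code in this post.

```cpp
// Hypothetical names for illustration only.
enum class Role { Follower, Candidate, Leader };

enum class Event {
    ElectionTimeout,  // no heartbeat arrived before the election timeout expired
    WonMajority,      // votes received from more than half of the cluster
    HeardFromLeader,  // valid heartbeat from the Leader of the current term
    SawHigherTerm     // any message carrying a newer term
};

// Returns the role a node moves to when `event` happens while it is in `role`.
Role nextRole(Role role, Event event) {
    switch (event) {
    case Event::ElectionTimeout:
        // A Follower starts an election; a Candidate starts a new one.
        return role == Role::Leader ? Role::Leader : Role::Candidate;
    case Event::WonMajority:
        // Only a Candidate can be promoted.
        return role == Role::Candidate ? Role::Leader : role;
    case Event::HeardFromLeader:
        // A Candidate that hears from a valid Leader goes back to Follower.
        return role == Role::Leader ? Role::Leader : Role::Follower;
    case Event::SawHigherTerm:
        // Every role steps down when it sees a newer term.
        return Role::Follower;
    }
    return role;
}
```

For example, `nextRole(Role::Follower, Event::ElectionTimeout)` yields `Role::Candidate`, and a Leader that sees a higher term returns to `Role::Follower`.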

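In Raft, two timers drive the pace of leader election.

The first is the heartbeat interval: the Leader sends heartbeat messages to the Followers at a fixed interval.

The second is the election timeout: how long a Follower waits without hearing from a Leader before it becomes a Candidate.

The election timeout is usually a random value between 150 ms and 300 ms; the randomness makes it unlikely that several nodes become Candidates at the same moment.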
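A randomized election timeout can be drawn with the standard `<random>` facilities. The sketch below assumes a hypothetical helper class `ElectionTimer`; the optional `seed` parameter is there only so that runs can be reproduced.

```cpp
#include <chrono>
#include <random>

// Hypothetical helper: draws a fresh election timeout uniformly from [minMs, maxMs].
class ElectionTimer {
public:
    ElectionTimer(int minMs, int maxMs, unsigned seed = std::random_device{}())
        : engine_(seed), dist_(minMs, maxMs) {}

    // Call this every time the timeout is (re)armed so each wait is independent.
    std::chrono::milliseconds next() {
        return std::chrono::milliseconds(dist_(engine_));
    }

private:
    std::mt19937 engine_;
    std::uniform_int_distribution<int> dist_;
};
```

A node would create `ElectionTimer timer(150, 300);` once and call `timer.next()` whenever it resets its election timeout.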

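If a node receives no heartbeat from a Leader before its election timeout expires, it assumes there is no Leader and becomes a Candidate: it starts a new term, votes for itself, and sends vote requests to the other nodes.

When another node receives a vote request, it grants the vote if the term carried in the request is equal to or greater than its own recorded term and it has not yet voted in that term; it then resets its own election timeout.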
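The voting rule above can be sketched as a single function. The types `VoteRequest` and `VoterState` and the function `handleVoteRequest` are hypothetical names for illustration. The sketch covers only the term and one-vote-per-term checks described here; the full Raft protocol also compares how up to date the candidate's log is, which is left out.

```cpp
// Hypothetical message and state types for illustration.
struct VoteRequest {
    int candidateId;
    int term;
};

struct VoterState {
    int currentTerm = 0;
    int votedFor = -1;  // -1 means "no vote cast in currentTerm"
};

// Returns true if the vote is granted. The caller should reset its
// election timeout whenever this returns true.
bool handleVoteRequest(VoterState& state, const VoteRequest& req) {
    if (req.term < state.currentTerm)
        return false;  // stale request from an older term

    if (req.term > state.currentTerm) {
        // A newer term starts with a clean slate: no vote cast yet.
        state.currentTerm = req.term;
        state.votedFor = -1;
    }

    if (state.votedFor == -1 || state.votedFor == req.candidateId) {
        state.votedFor = req.candidateId;
        return true;
    }
    return false;  // already voted for someone else in this term
}
```

Granting the same candidate again within one term is harmless, so a repeated request from that candidate gets the same answer.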

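A Candidate that receives votes from more than half of the nodes becomes the Leader and starts sending heartbeats to the other nodes (now its Followers) to maintain its authority.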
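The "more than half" test is a strict majority over the whole cluster, not over the nodes that happened to answer. A minimal sketch, assuming votes are kept in a map from node ID to the term in which that node voted for us (the function name `hasMajority` is hypothetical):

```cpp
#include <map>

// voteRecord maps node id -> term in which that node voted for this candidate.
// Only votes cast in `term` count; clusterSize is the total number of nodes.
bool hasMajority(const std::map<int, int>& voteRecord, int term, int clusterSize) {
    int count = 0;
    for (const auto& entry : voteRecord) {
        if (entry.second == term)
            ++count;
    }
    return count > clusterSize / 2;  // strictly more than half
}
```

With five nodes, three votes are needed. With four nodes, two votes are not enough, which is exactly the split-vote situation.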

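Now consider an election with more than one Candidate. Nodes B and D start an election at the same time and each receives one vote, so neither reaches a majority. Both then wait out a new randomized election timeout (again a value between 150 ms and 300 ms, which makes it unlikely that they time out together again) and retry.

Eventually one node collects votes from more than half of the cluster and becomes the Leader; needing many rounds is unlikely.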
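Why randomization resolves a split vote can be seen in a deliberately simplified model: ignore network delay and assume the node whose timeout expires first wins the round. Only an exact tie for the smallest timeout produces competing Candidates. The function `firstToTimeOut` is a hypothetical helper for this model, not part of Raft itself.

```cpp
#include <algorithm>
#include <vector>

// Simplified model (network delay ignored): returns the index of the node whose
// timeout is strictly smallest, or -1 if the smallest value is shared (a tie,
// meaning several nodes would become Candidates together) or the list is empty.
int firstToTimeOut(const std::vector<int>& timeoutsMs) {
    if (timeoutsMs.empty())
        return -1;
    auto minIt = std::min_element(timeoutsMs.begin(), timeoutsMs.end());
    if (std::count(timeoutsMs.begin(), timeoutsMs.end(), *minIt) > 1)
        return -1;
    return static_cast<int>(minIt - timeoutsMs.begin());
}
```

When the result is -1, every node simply draws a new random timeout and the round is repeated; with values spread over 150 ms to 300 ms, repeated ties become increasingly unlikely.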

    #pragma once
    #include <cassert>
    #include <fstream>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>
    using namespace std;
    /*
     * Author: itdef
     * Reposting is welcome; please keep the text intact and credit the source.
     * Tech blog: http://www.cnblogs.com/itdef/
     * Tech discussion group number: 432336863
     * C/C++ and Windows driver enthusiasts and server programmers are welcome to get in touch.
     * Some older code is stored at:
     * http://www.oschina.net/code/list_by_user?id=614253
     */
    const string FILE_NAME = "config.txt";

    // Reads "key=value" lines from a text file into a map.
    class ReadConfig {
    public:
        ReadConfig(string filename = "") {
            file_name = filename.empty() ? FILE_NAME : filename;
        }
        ~ReadConfig() {}
        map<string, string> Do() {
            tar_path.clear();
            ifstream fin;
            fin.open(file_name);
            if (false == fin.is_open()) {
                std::cerr << "open file failed!!" << std::endl;
                return tar_path;
            }
            string s;
            while (getline(fin, s))
            {
                // Skip blank lines and comment lines starting with '#' or "//".
                if (s.empty() || '#' == s[0] || ('/' == s[0] && '/' == s[1]))
                    continue;
                size_t pos = s.find_first_of("=");
                // Skip lines that have no '=' or nothing after it.
                if (pos == std::string::npos || pos + 1 >= s.size())
                    continue;
                string targetName = s.substr(0, pos);
                string path = s.substr(pos + 1);
                std::cout << targetName << " = " << path << std::endl;
                // Values that begin with a space are ignored.
                if (path[0] != ' ')
                    tar_path[targetName] = path;
            }
            fin.close();
            return tar_path;
        }
    private:
        map<string, string> tar_path;
        string file_name;
    };

ReadConfig.h

    #pragma once
    #include <string>
    #include <mutex>
    #include <map>

    // Node states. The first enumerator's value is missing from the published
    // listing; 1 is an assumed value.
    enum STATUS {
        LEADER_STATUS = 1,
        FOLLOWER_STATUS,
        CANDIDATE_STATUS,
        PRE_VOTE_STAUS,
    };

    // Message types. The first enumerator's value is missing from the published
    // listing; 0 is an assumed value.
    enum INFOTYPE {
        DEFAULT_TYPE = 0,
        HEART_BREAT_TYPE,
        VOTE_LEADER_TYPE,
        VOTE_LEADER_RESP_TYPE,
    };

    // Message exchanged between nodes.
    typedef struct netInfo {
        int fromID;
        int toID;
        INFOTYPE infotype;
        int term;
        int voteId;  // candidate ID; only valid when infotype is a vote type
    }NetInfo;

    // State kept by each node.
    typedef struct locaInfo {
        int id;
        int leaderID;
        STATUS status;
        int term;
        int isVote;
        int IsRecvHeartbeat;
        std::map<int, int> voteRecord;  // id -> term: an entry means node `id` voted for us in `term`
    }LocalInfo;

    // Local state plus the mutex that guards it.
    typedef struct localInfoWithLock {
        LocalInfo locInfo;
        std::mutex m;
    }LocalInfoWithLock;

CommonStruct.h

    #pragma once
    #include "CommonStruct.h"
    #include "ReadConfig.h"
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <boost/asio.hpp>

    using boost::asio::ip::tcp;
    using namespace std;

    class RaftManager :public enable_shared_from_this<RaftManager> {
    public:
        static std::shared_ptr<RaftManager> GetInstance() {
            // std::make_shared cannot be used here because the constructor is private.
            if (p == nullptr)
                p.reset(new RaftManager());
            return p;
        }
        ~RaftManager() {
            std::cout << "enter ~RaftManager()\n";
        }
        bool Init();
        bool Go();

    private:
        boost::asio::io_context io_service;  // named io_service before Boost 1.66
        std::string ip; int portStart;
        int nodeID;
        int electionTimeout;
        int heartbeatTime;
        LocalInfoWithLock locInfolock;

        //=============================== send
        void DiapatchByStatus(int id, int& timeoutLimit);
        void HandleLeaderSend(int id, int& timeoutLimit);
        void HandleCandidateSend(int id, int& timeoutLimit);
        void HandleFollowerSend(int id, int& timeoutLimit);
        void HandlePreVoteSend(int id, int& timeoutLimit);

        //=================== recv
        void DiapatchByInfoType(const NetInfo& netinf);
        void HandleHeartbeatTypeRecv(const NetInfo& netinf);
        void HandleVoteTypeRecv(const NetInfo& netinf);
        void HandleVoteRespTypeRecv(const NetInfo& netinf);

        std::function<int()> dice;  // returns a random election timeout in ms

        bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
        void Session(tcp::socket sock);
        void SendFunc(int id);

        RaftManager() {}
        RaftManager(const RaftManager&) = delete;
        RaftManager& operator=(const RaftManager&) = delete;
        static std::shared_ptr<RaftManager> p;
    };

RaftManager.h

    #include "RaftManager.h"
    #include <chrono>
    #include <functional>
    #include <random>
    #include <thread>

    // The numeric literals are missing from the published listing. Timing values
    // here follow the description in this post (200 ms polling, 1000 ms heartbeat,
    // 1500-5000 ms election timeout); values marked "assumed" are placeholders.
    static const int NODE_COUNT = 5;      // the demo assumes node IDs 1..5
    static const int LOOP_TIME_MS = 200;  // polling interval of the sender threads

    std::shared_ptr<RaftManager> RaftManager::p = nullptr;

    bool RaftManager::Init() {
        // The configuration could also be read from JSON.
        ReadConfig cfg("nodeCfg");
        map<string, string> kv = cfg.Do();

        if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
            assert(0);
            return false;
        }
        ip = kv["ip"]; portStart = stoi(kv["portStart"]); nodeID = stoi(kv["nodeID"]);
        electionTimeout = 5000;  // upper bound of the random election timeout, in ms
        heartbeatTime = 1000;    // heartbeat interval, in ms
        if (kv.find("heartbeatTime") != kv.end())
            heartbeatTime = stoi(kv["heartbeatTime"]);

        locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
        locInfolock.locInfo.term = 0;
        locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
        locInfolock.locInfo.status = FOLLOWER_STATUS;
        locInfolock.locInfo.voteRecord.clear();

        std::random_device rd;
        std::default_random_engine engine(rd());
        std::uniform_int_distribution<> dis(1500, electionTimeout);
        // dice() is called from several sender threads, so guard the engine.
        dice = [dis, engine]() mutable -> int {
            static std::mutex m;
            std::lock_guard<std::mutex> lck(m);
            return dis(engine);
        };

        return true;
    }

    void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
        if (timeoutLimit > 0) {
            timeoutLimit -= LOOP_TIME_MS;
        }
        if (timeoutLimit <= 0) {
            // todo: send a heartbeat to node `id`
            timeoutLimit = heartbeatTime;
        }
    }

    void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
        if (timeoutLimit > 0) {
            timeoutLimit -= LOOP_TIME_MS;
        }
        if (timeoutLimit <= 0) {
            // todo: send a vote request to node `id`
            timeoutLimit = dice();
        }
    }

    void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
        if (timeoutLimit > 0) {
            timeoutLimit -= LOOP_TIME_MS;
        }
        if (timeoutLimit <= 0) {
            // todo: send a pre-vote request to node `id`
            timeoutLimit = dice();
        }
    }

    void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
        if (timeoutLimit > 0) {
            timeoutLimit -= LOOP_TIME_MS;
        }
        if (timeoutLimit <= 0) {
            {
                // Check and update the state under one lock, so that two sender
                // threads cannot both start an election.
                std::lock_guard<std::mutex> lck(locInfolock.m);
                if (locInfolock.locInfo.status == FOLLOWER_STATUS &&
                    locInfolock.locInfo.IsRecvHeartbeat == 0) {
                    // Heartbeat timed out: switch to election mode and vote for ourselves.
                    locInfolock.locInfo.term++;
                    locInfolock.locInfo.status = CANDIDATE_STATUS;
                    locInfolock.locInfo.isVote = 1;
                    locInfolock.locInfo.voteRecord.clear();
                    locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term;
                }
                // todo: IsRecvHeartbeat is never cleared after a heartbeat arrives,
                // so the loss of an existing leader is not detected yet.
            }
            timeoutLimit = dice();
        }
    }

    //===================
    void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (netinf.fromID != locInfolock.locInfo.leaderID)
            locInfolock.locInfo.leaderID = netinf.fromID;
        locInfolock.locInfo.IsRecvHeartbeat = 1;
    }

    void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (locInfolock.locInfo.isVote == 0) {
            // todo: reply to netinf.fromID granting the vote
            locInfolock.locInfo.isVote = 1;  // mark that we have voted in this term
        }
        else {
            // todo: reply to netinf.fromID refusing the vote
        }
    }

    void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) {
            // Record the vote in the local map.
            locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
        }
        int count = 0;
        std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin();
        // Check whether the votes for the current term exceed half of the cluster.
        while (it != locInfolock.locInfo.voteRecord.end()) {
            if (it->second == locInfolock.locInfo.term)
                count++;
            it++;
        }
        if (count > NODE_COUNT / 2) {
            // Majority reached: become leader. Otherwise the election continues.
            locInfolock.locInfo.leaderID = nodeID;
            locInfolock.locInfo.IsRecvHeartbeat = 0;  // assumed value
            locInfolock.locInfo.status = LEADER_STATUS;
        }
    }

    // loop send
    void RaftManager::DiapatchByStatus(int id, int& timeoutLimit) {
        STATUS status;
        {
            // Read the current state under the lock to decide what to send.
            std::lock_guard<std::mutex> lck(locInfolock.m);
            status = locInfolock.locInfo.status;
        }
        switch (status) {
        case LEADER_STATUS:
            HandleLeaderSend(id, timeoutLimit);
            break;
        case FOLLOWER_STATUS:
            HandleFollowerSend(id, timeoutLimit);
            break;
        case CANDIDATE_STATUS:
            HandleCandidateSend(id, timeoutLimit);
            break;
        case PRE_VOTE_STAUS:
            HandlePreVoteSend(id, timeoutLimit);
            break;
        default:
            std::cerr << "unknown status!!" << std::endl;
            break;
        }
    }

    // handle recv
    void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            // Ignore messages from an older term.
            if (netinf.term < locInfolock.locInfo.term)
                return;
            // A newer term resets this node to follower with no vote cast.
            if (netinf.term > locInfolock.locInfo.term) {
                locInfolock.locInfo.term = netinf.term;
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.isVote = 0;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.voteRecord.clear();
            }
        }
        //========================================
        switch (netinf.infotype) {
        case HEART_BREAT_TYPE:
            HandleHeartbeatTypeRecv(netinf);
            break;
        case VOTE_LEADER_TYPE:
            HandleVoteTypeRecv(netinf);
            break;
        case VOTE_LEADER_RESP_TYPE:
            HandleVoteRespTypeRecv(netinf);
            break;
        default:
            std::cerr << "Recv Unknown info type." << std::endl;
            break;
        }
    }

    bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
        int timeoutlimit = dice();
        while (1) {
            DiapatchByStatus(id, timeoutlimit);
            std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_TIME_MS));
        }

        return false;
    }

    void RaftManager::SendFunc(int i) {
        // todo
        // The original comment gave example timings of a 200 ms scan, a 5000 ms
        // heartbeat, and a 1001-4000 ms election timeout. A heartbeat interval has
        // to be shorter than the election timeout, so this listing uses the values
        // from the description in this post instead.
        string port = std::to_string(portStart + i);  // same rule as the listening port in Go()
        while (1) {
            std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>(io_service);
            tcp::resolver resolver(io_service);
            try {
                boost::asio::connect(*s, resolver.resolve("127.0.0.1", port));
            }
            catch (exception&) {
                // Keep trying to connect, pausing so the loop does not spin.
                std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_TIME_MS));
                continue;
            }
            LoopCheck(i, s);
            std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_TIME_MS));
        }

        return;
    }

    void RaftManager::Session(tcp::socket sock) {
        boost::system::error_code error;
        NetInfo netinf;
        while (1) {
            // read() blocks until a whole NetInfo has arrived; read_some() may
            // return after only part of the message.
            size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
            if (error == boost::asio::error::eof)
                return;  // Connection closed cleanly by peer.
            else if (error) {
                std::cerr << boost::system::system_error(error).what() << std::endl;  // Some other error.
                return;
            }
            if (length != sizeof(netinf)) {
                std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
                return;
            }

            DiapatchByInfoType(netinf);
        }
    }

    bool RaftManager::Go() {
        // Set up the network. Broadcast could be used to discover and notify the other nodes;
        // this demo assumes 5 nodes with IDs 1 2 3 4 5 on ports 9921 9922 9923 9924 9925.
        if (ip == "" || portStart == 0 || nodeID == 0)
            return false;
        try {
            // Start 4 threads, each sending to one of the other nodes.
            for (int i = 1; i <= NODE_COUNT; i++) {
                if (i == nodeID)
                    continue;
                std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
                t.detach();
            }

            int port = portStart + nodeID;
            tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
            // Accept connections forever, handling each one on its own thread.
            for (;;)
            {
                tcp::socket sock(io_service);
                a.accept(sock);
                std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
            }
        }
        catch (exception& e) {
            std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
            return false;
        }

        return true;
    }

RaftManager.cpp

    // QueueTemplate.cpp: this file contains the "main" function. Program execution begins and ends there.
    //

    #include <condition_variable>
    #include <iostream>
    #include <list>
    #include <mutex>
    #include <thread>
    #include <utility>
    using namespace std;

    // A bounded, thread-safe queue: Put blocks while full, Take blocks while empty.
    template<typename T>
    class SyncQueue
    {
    public:
        SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
        {
        }

        void Put(const T& x)
        {
            Add(x);
        }

        void Put(T&& x)
        {
            Add(std::forward<T>(x));
        }

        // Takes every queued element at once.
        void Take(std::list<T>& list)
        {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

            if (m_needStop)
                return;
            list = std::move(m_queue);
            m_queue.clear();          // leave the moved-from list in a known empty state
            m_notFull.notify_all();   // the whole queue was drained, so wake every producer
        }

        // Takes a single element from the front.
        void Take(T& t)
        {
            std::unique_lock<std::mutex> locker(m_mutex);
            m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

            if (m_needStop)
                return;
            t = m_queue.front();
            m_queue.pop_front();
            m_notFull.notify_one();
        }

        void Stop()
        {
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                m_needStop = true;
            }
            m_notFull.notify_all();
            m_notEmpty.notify_all();
        }

        bool Empty()
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            return m_queue.empty();
        }

        bool Full()
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            return m_queue.size() == static_cast<size_t>(m_maxSize);
        }

        size_t Size()
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            return m_queue.size();
        }

        int Count()
        {
            return static_cast<int>(Size());  // Size() takes the lock
        }
    private:
        // NotFull and NotEmpty must be called with m_mutex held.
        bool NotFull() const
        {
            bool full = m_queue.size() >= static_cast<size_t>(m_maxSize);
            if (full)
                cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
            return !full;
        }

        bool NotEmpty() const
        {
            bool empty = m_queue.empty();
            if (empty)
                cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
            return !empty;
        }

        template<typename F>
        void Add(F&& x)
        {
            std::unique_lock< std::mutex> locker(m_mutex);
            m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
            if (m_needStop)
                return;

            m_queue.push_back(std::forward<F>(x));
            m_notEmpty.notify_one();
        }

    private:
        std::list<T> m_queue;                // buffer
        std::mutex m_mutex;                  // mutex used together with the condition variables
        std::condition_variable m_notEmpty;  // signalled when the queue is not empty
        std::condition_variable m_notFull;   // signalled when the queue is not full
        int m_maxSize;                       // maximum size of the queue

        bool m_needStop;                     // stop flag
    };

    int main()
    {
        std::cout << "Hello World!\n";

        // The literal values are missing from the published listing;
        // the capacity and the numbers put into the queue are assumed.
        SyncQueue<int> q(1);
        q.Put(1);

        int a = 0;
        q.Take(a);

        q.Put(2);
        q.Take(a);

        q.Stop();

    }

syncqueue.h

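The code above is my own attempt at a simplified demo of Raft leader election.

Two to five nodes are defined in advance; each reads its IP, port, and node ID from a configuration file.

Networking uses Boost.Asio's synchronous (blocking) API.

One thread receives and four threads send.

1 The receiving thread inspects each incoming message to decide whether it is a heartbeat, a vote request, or a vote response, and updates the node's logical clock (term), whether it has voted (isVote), and the record of which nodes voted for it in the latest term, map<int,int> // nodeid, term

2 Each sending thread polls every 200 ms and counts down the remaining wait time according to the node's current state (the wait is either the 1000 ms heartbeat interval or a random election timeout of 1500-5000 ms, depending on the state)

Based on the current state it then decides whether to send a heartbeat, a vote request, or a vote response.

To be continued; the gaps are still to be filled in.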
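The polling countdown described in step 2 can be isolated into one small function. This is a sketch of the idea rather than the demo's exact code: `tick` is a hypothetical name, and the caller chooses `reloadMs` (the heartbeat interval for a Leader, a fresh random election timeout otherwise).

```cpp
// Subtracts one polling interval from the remaining wait.
// Returns true when the wait has expired, in which case the countdown is
// reloaded with reloadMs and the caller should act (send a heartbeat,
// start an election, and so on).
bool tick(int& remainingMs, int pollMs, int reloadMs) {
    remainingMs -= pollMs;
    if (remainingMs > 0)
        return false;
    remainingMs = reloadMs;
    return true;
}
```

With a 200 ms poll and 500 ms remaining, the first two calls return false (300 ms, then 100 ms left) and the third returns true and reloads the countdown.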

References:

《etcd技術內幕》

http://thesecretlivesofdata.com/raft/#intro