1. 程式人生 > >使用GO實現Paxos分散式一致性協議

使用GO實現Paxos分散式一致性協議

什麼是Paxos分散式一致性協議

最初的服務往往都是通過單體架構對外提供的,即單Server-單Database模式。隨著業務的不斷擴充套件,使用者和請求數都在不斷上升,如何應對大量的請求就成了每個服務都需要解決的問題,這也就是我們常說的高併發。為了解決單臺伺服器面對高併發的蒼白無力,可以通過增加伺服器數量來解決,即多Server-單Database(Master-Slave)模式,此時的壓力就來到了資料庫一方,資料庫的IO效率決定了整個服務的效率,繼續增加Server數量將無法提升服務效能。這就衍生出了當前火熱的微服務架構。當用戶請求經由負載均衡分配到某一服務例項上後,如何保證該服務的其他例項最終能夠得到相同的資料變化呢?這就要用到Paxos分散式一致性協議,Paxos解決的就是最終一致性問題,也就是一段時間後,無論get哪一個服務例項,都能獲取到相同的資料。目前國內外的分散式產品很多都使用了Paxos協議,可以說Paxos幾乎就是一致性協議的標準和代名詞。

Paxos有兩種協議,我們常常提到的其實是Basic Paxos,另一種叫Multi Paxos,如無特殊說明,本文中提到的Paxos協議均為Basic Paxos。

Paxos協議是由圖靈獎獲得者Leslie Lamport於1998年在其論文《The Part-Time Parliament》中首次提出的,講述了一個希臘小島Paxos是如何通過決議的。但由於該論文晦澀艱深,當時的計算機界大牛們也沒幾個人能理解。於是Lamport2001年再次發表了《Paxos Made Simple》,摘要部分是這麼寫的:

The Paxos algorithm, when presented in plain English, is very simple.

翻譯過來就是:不會吧,不會吧,這麼簡單的Paxos演算法不會真的有人弄不懂吧?然而事實卻是很多人對Paxos都望而卻步,理解Paxos其實並不難,但是Paxos的難點在於工程化,如何利用Paxos協議寫出一個能過夠真正在生產環境中跑起來的服務才是Paxos最難的地方,關於Paxos的工程化可以參考微信後臺團隊撰寫的《微信自研生產級paxos類庫PhxPaxos實現原理介紹》

Paxos如何保證一致性的

Paxos協議一共有兩個階段:Prepare和Propose,兩種角色:Proposer和Acceptor,每一個服務例項既是Proposer,同時也是Acceptor,Proposer負責提議,Acceptor決定是否接收來自Proposer的提議,一旦提議被多數接受,那麼我們就可以宣稱對該提議包含的值達成了一致,而且不會再改變。

階段一:Prepare 準備

  1. Proposer生成全域性唯一ProposalID(時間戳+ServerID)
  2. Proposer向所有Acceptor(包括Proposer自己)傳送Prepare(n = ProposalID)請求
  3. Acceptor比較n和minProposal, if n > minProposal, minProposal = n,Acceptor返回已接受的提議(acceptedProposal, acceptedValue)
    • 承諾1:不再接受n <= minProposal的Prepare請求
    • 承諾2:不再接受n < minProposal的Propose請求
    • 應答1:返回此前已接受的提議
  4. 當Proposer收到大於半數的返回後
    • Prepare請求被拒絕,重新生成ProposalID併發送Prepare請求
    • Prepare請求被接受且有已接受的提議,選擇最大的ProposalID對應的值作為提議的值
    • Prepare請求被接受且沒有已接受的提議,可選擇任意提議值

階段二:Propose 提議

  1. Proposer向所有Acceptor(包括Proposer自己)傳送Accept(n=ProposalID,value=ProposalValue)請求
  2. Acceptor比較n和minProposal, if n >= minProposal, minProposal = n, acceptedValue = value,返回已接受的提議(minProposal,acceptedValue)
  3. 當Proposer收到大於半數的返回後
    • Propose請求被拒絕,重新生成ProposalID併發送Prepare請求
    • Propose請求被接受,則資料達成一致性

一旦提議被半數以上的服務接受,那麼我們就可以宣稱整個服務叢集在這一提議上達成了一致。

 

需要注意的是,在一個服務叢集中以上兩個階段是很有可能同時發生的。 例如:例項A已完成Prepare階段,併發送了Propose請求。同時例項B開始了Prepare階段,並生成了更大的ProposalID傳送Prepare請求,可能導致例項A的Propose請求被拒絕。 每個服務例項也是同時在扮演Proposer和Acceptor角色,向其他服務傳送請求的同時,可能也在處理別的服務發來的請求。

 

使用GO語言實現Paxos協議

服務註冊與發現

由於每個服務例項都是在執行相同的程式碼,那我們要如何知曉其他服務例項的入口呢(IP和埠號)?方法之一就是寫死在程式碼中,或者提供一份配置檔案。服務啟動後可以讀取該配置檔案。但是這種方法不利於維護,一旦我們需要移除或新增服務則需要在每個機器上重新休息配置檔案。

除此之外,我們可以通過一個第三方服務:服務的註冊與發現來註冊並獲知當前叢集的總服務例項數,即將本地的配置檔案改為線上的配置服務。

服務註冊:Register函式,服務例項啟動後通過呼叫這個RPC方法將自己註冊在服務管理中

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    server := args.ServerInfo
    for _, server := range s.Servers {
        if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port {
            reply.Succeed = false
            return nil
        }
    }
    reply.ServerID = len(s.Servers)
    reply.Succeed = true
    s.Servers = append(s.Servers, server)

    fmt.Printf("Current registerd servers:\n%v\n", s.Servers)

    return nil
}

服務發現:GetServers函式,服務通過呼叫該RPC方法獲取所有服務例項的資訊(IP和埠號)

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
	// return all servers
	reply.ServerInfos = s.Servers

	return nil
}

Prepare階段

Proposer,向所有的服務傳送Prepare請求,並等待直到半數以上的服務返回結果,這裡也可以等待所有服務返回後再處理,但是Paxos協議可以容忍小於半數的服務宕機,因此我們只等待大於N/2個返回即可。當返回的結果有任何一個請求被拒絕,那Proposer即認為這次的請求被拒絕,返回重新生成ProposalID併發送新一輪的Prepare請求。

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
	returnedReplies := make([]PrepareReply, 0)

	for _, otherS := range allServers {
		// use a go routine to call every server
		go func(otherS ServerInfo) {
			delay := rand.Intn(10)
			time.Sleep(time.Second * time.Duration(delay))
			args := PrepareArgs{s.Info, proposal.ID}
			reply := PrepareReply{}
			fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
			if Call(otherS, "Server.Prepare", &args, &reply) {
				if reply.HasAcceptedProposal {
					fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
				} else {
					fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
				}
				s.mu.Lock()
				returnedReplies = append(returnedReplies, reply)
				s.mu.Unlock()
			}
		}(otherS)
	}

	for {
		// wait for responses from majority
		if len(returnedReplies) > (len(allServers))/2.0 {
			checkReplies := returnedReplies

			// three possible response
			// 1. deny the prepare, and return an empty/accepted proposal
			//    as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
			// 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
			// 3. accept the prepare, and return an accepted proposal

			// check responses from majority
			// find the response with max proposal id
			acceptedProposal := NewProposal()

			for _, r := range checkReplies {
				// if any response refused the prepare, this server should resend prepare
				if !r.PrepareAccepted {
					return r
				}

				if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
					acceptedProposal = r.AcceptedProposal
				}
			}

			// if some other server has accepted proposal, return that proposal with max proposal id
			// if no other server has accepted proposal, return an empty proposal
			return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
		}

		//fmt.Printf("Waiting for response from majority...\n")
		time.Sleep(time.Second * 1)
	}
}

Acceptor,通過比較ProposalID和minProposal,如果ProposalID小於等於minProposal,則拒絕該Prepare請求,否則更新minProposal為ProposalID。最後返回已接受的提議

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// 2 promises and 1 response
	// Promise 1
	// do not accept prepare request which ProposalID <= minProposalID
	// Promise 2
	// do not accept propose request which ProposalID < minProposalID
	// Response 1
	// respond with accepted proposal if any

	if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
		// ready to accept the proposal with Id s.minProposalID
		s.minProposalID = args.ProposalID
	}
	reply.HasAcceptedProposal = s.readAcceptedProposal()
	reply.AcceptedProposal = s.Proposal

	return nil
}

Propose階段

Proposer,同樣首先向所有的服務傳送Propose請求,並等待知道半數以上的服務返回結果。如果返回的結果有任何一個請求被拒絕,則Proposer認為這次的請求被拒絕,返回重新生成ProposalID併發送新一輪的Prepare請求

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
	returnedReplies := make([]ProposeReply, 0)

	for _, otherS := range allServers {
		go func(otherS ServerInfo) {
			delay := rand.Intn(5000)
			time.Sleep(time.Millisecond * time.Duration(delay))
			args := ProposeArgs{otherS, proposal}
			reply := ProposeReply{}
			fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
			if Call(otherS, "Server.Propose", &args, &reply) {
				fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
				s.mu.Lock()
				returnedReplies = append(returnedReplies, reply)
				s.mu.Unlock()
			}
		}(otherS)
	}

	for {
		// wait for responses from majority
		if len(returnedReplies) > (len(allServers))/2.0 {
			checkReplies := returnedReplies

			for _, r := range checkReplies {
				if !r.ProposeAccepted {
					return r
				}
			}

			return checkReplies[0]
		}

		time.Sleep(time.Second * 1)
	}
}

Acceptor,通過比較ProposalID和minProposal,如果ProposalID小於minProposal,則拒絕該Propose請求,否則更新minProposal為ProposalID,並將提議持久化到本地磁碟中。

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
	if s.minProposalID <= args.Proposal.ID {
		s.mu.Lock()
		s.minProposalID = args.Proposal.ID
		s.Proposal = args.Proposal
		s.SaveAcceptedProposal()
		s.mu.Unlock()

		reply.ProposeAccepted = true
	}

	reply.ProposalID = s.minProposalID

	return nil
}

 

執行

執行結果:

這裡我一共開啟了3個服務例項,並在每次請求之前加入了隨機的延遲,模擬網路通訊中的延遲,因此每個服務的每個請求並不是同時發出的

動圖一張:

靜態結果一張:

可以看到3個服務儘管一開始會嘗試以他們自己的埠號(5001,5002,5003)作為提議值,在Prepare/Propose失敗後,都會重新生成更大的ProposalID並開啟新一輪的提議過程(Prepare,Propose),且最後都以5003達成一致。

小結

至此,我們就用GO實現了Paxos協議的核心邏輯。但顯而易見的是,這段程式碼仍然存在很多問題,完全無法滿足生產環境的需求

  • 通過channel而不是mutex鎖來共享資料
  • 如何處理服務例項的移除和增加
  • 如何避免陷入活鎖

下一篇我們將探討如何解決上述問題。

 

最後歡迎關注我的個人公眾號:SoBrian,期待與大家共同交流,共同成長!

 

 

&n