1. 程式人生 > >MQTT---HiveMQ原始碼詳解(二十)Cluster-Replicate/VectorClock

MQTT---HiveMQ原始碼詳解(二十)Cluster-Replicate/VectorClock

MQTT交流群:221405150

17章中我們講解了整個HiveMQ的Cluster的原理以及實現方式,值得一提的當然是資料的Replicate,以及當Replicate資料與本地資料存在衝突時,HiveMQ是如何實現的。

Replicate

  • 在每一條被 running node持久化的資料都會使用Primary環 Replicate。

  • 當node從JOINING狀態變更為RUNNING狀態前都會使用Primary環 Replicate。

  • 當node變更為MERGE_MINORITY成功後都會使用Primary環 Replicate。

  • 當node變更為MERGE_MINORITY成功後都會使用Minority環 Replicate。

VectorClock

幾乎所有做Cluster不可避免的就是需要解決衝突,各種解法比較多,其中VectorClock做法比較流行,下面我們看看HiveMQ如何實現即可,具體使用VectorClock的原因、以及原理我們就不過多描述了。

這裡寫圖片描述

注意:VectorClocks上的成員變數為Map<String, VectorClock>,
貌似是idea uml外掛顯示問題。

  • VectorClock持有vectors的一個node與vector對應關係,提供遞增、合併、比較這幾種功能/服務,以記錄一個key在一個node上的Vector。

  • VectorClocks持有每一個key的VectorClock,為每一個key提供新增、刪除、get等方法/服務, 通過VectorClocks就可以獲得到每個key的完整的VectorClock。

這裡寫圖片描述

  • 在node獲得到Replicate要求時,當本地資料與備份資料存在衝突時,就會使用VectorClock來進行解決衝突。

  • 在每個ClusterPersistence中都會持有一個VectorClocks用以解決衝突。

示例

下面我們就列舉一段ClientSessionClusterPersistenceImpl處理Replica請求時,當存在衝突解決衝突的程式碼。

public ListenableFuture<Void> handleReplica(@NotNull String clientId, @NotNull ClientSession clientSession, long
requestTimestamp, VectorClock requestVectorClock) { Preconditions.checkNotNull(clientId, "Client id must not be null"); Preconditions.checkNotNull(clientSession, "Client session must not be null"); return getExecutor(clientId).add(() -> { VectorClock localVectorClock = vectorClocks.get(clientId); //當請求的向量時鐘在比本地向量時鐘之前或者相當,則忽略本次備份 if (requestVectorClock.before(localVectorClock) || requestVectorClock.equals(localVectorClock)) { return null; } //當本地向量時鐘在請求向量時鐘之前,則直接儲存即可 if (localVectorClock.before(requestVectorClock)) { vectorClocks.put(clientId, requestVectorClock); clientSessionLocalPersistence.persistent(clientId, clientSession, requestTimestamp); } else { //當兩個向量時鐘一致,則合併解決衝突 localVectorClock.merge(requestVectorClock); localVectorClock.increment(clusterConnection.getClusterId()); vectorClocks.put(clientId, localVectorClock); ClientSession localClientSession = clientSessionLocalPersistence.get(clientId); if (!localClientSession.isConnected() && clientSession.isConnected()) { clientSessionLocalPersistence.persistent(clientId, clientSession, requestTimestamp); } } return null; }); }