1. 程式人生 > >[從原始碼學設計]螞蟻金服SOFARegistry之訊息匯流排非同步處理

[從原始碼學設計]螞蟻金服SOFARegistry之訊息匯流排非同步處理

# [從原始碼學設計]螞蟻金服SOFARegistry之訊息匯流排非同步處理 [toc] ## 0x00 摘要 SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。 本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。 本文為第五篇,介紹SOFARegistry訊息匯流排的非同步處理。 ## 0x01 為何分離 前文我們講述了SOFARegistry的訊息匯流排,本文我們講講一個變種 DataChangeEventCenter。 DataChangeEventCenter 是被獨立出來的,專門處理資料變化相關的訊息。 為什麼要分離呢?因為: - 從架構說,DataChangeEventCenter 是專門處理資料變化訊息,這是一種解耦; - 從技術上來說,DataChangeEventCenter 也和 EventCenter 有具體實現技巧的不同,所以需要分開處理; - 但更深入的原因是業務場景不同,下面分析中我們可以看出,DataChangeEventCenter 和業務耦合的相當緊密; ## 0x02 業務領域 ### 2.1 應用場景 DataChangeEventCenter 的獨特業務場景如下: - 需要提供歸併功能。即短期內會有多個通知來到,不需要逐一處理,只處理最後一個即可; - 非同步處理訊息; - 需要保證訊息順序; - 有延遲操作; - 需要提高處理能力,並行處理; 因此,DataChangeEventCenter 程式碼和業務聯絡相當緊密,前文的 EventCenter 已經不適合了。 ### 2.2 延遲和歸併 關於延遲和歸併操作,我們單獨說明下。 #### 2.2.1 業務特點 螞蟻金服業務的一個**特點**是:通過連線敏感的特性對服務宕機做到秒級發現。 因此 SOFARegistry 在健康檢測的設計方面決定“服務資料與服務釋出者的實體連線繫結在一起,斷連馬上清資料”,簡稱此特點叫做連線敏感性。連線敏感性是指在 SOFARegistry 裡所有 Client 都與 SessionServer 保持長連線,每條長連線都設定基於 SOFABolt 的連線心跳,如果長連線斷連客戶端立即發起重新建連,時刻保持 Client 與 SessionServer 之間可靠的連線。 #### 2.2.2 問題 但帶來了一個問題就是:可能因為網路問題,短期內會出現大量重新建連操作。比如只是網路問題導致連線斷開,實際的服務程序沒有宕機,此時客戶端立即發起重新連線 SessionServer 並且重新註冊所有服務資料。 但是 假如此過程耗時足夠短暫(例如 500ms 內發生斷連和重連),服務訂閱者**應該
**感受不到服務下線。從而 SOFARegistry 內部應該做相應處理。 #### 2.2.3 解決 SOFARegistry 內部做了歸併和延遲操作來保證使用者不受影響。比如 DataServer 內部的資料通過 mergeDatum 延遲合併變更的 Publisher 服務資訊,version 是合併後最新的版本號。 對於 DataChangeEventCenter,就是通過訊息的延遲和歸併來協助完成這個功能。 ### 2.3 螞蟻金服實現 下面是 DataChangeEventCenter 總體的功能描述: - 當有資料釋出者 publisher 上下線時,會分別觸發 publishDataProcessor 或 unPublishDataHandler; - Handler 首先會判斷當前節點的狀態: - 若是非工作狀態則返回請求失敗; - 若是工作狀態,Handler 會往 dataChangeEventCenter 中新增一個數據變更事件,則觸發資料變化事件中心 DataChangeEventCenter 的 onChange 方法。用於非同步地通知事件變更中心資料的變更; - 事件變更中心收到該事件之後,會往佇列中加入事件。此時 dataChangeEventCenter 會根據不同的事件型別非同步地對上下線資料進行相應的處理; - 與此同時,DataChangeHandler 會把這個事件變更資訊通過 ChangeNotifier 對外發布,通知其他節點進行資料同步; ## 0x03 DataChangeEventCenter ### 3.1 總述 DataChangeEventCenter具體分成四部分: - Event Center:組織成訊息中心; - Event Queue:用於多路分別處理,增加處理能力; - Event Task:每一個Queue內部啟動一個執行緒,用於非同步處理,增加處理能力; - Event Handler:用於處理內部ChangeData; 接下來我們一一介紹,因為 DataChangeEventCenter 和業務結合緊密,所以我們會深入結合業務進行講解。 ### 3.2 DataChangeEventCenter #### 3.2.1 定義 DataChangeEventCenter 中維護著一個 DataChangeEventQueue 佇列陣列,這是核心。陣列中的每個元素是一個事件佇列。具體定義如下: ```java public class DataChangeEventCenter { /** * count of DataChangeEventQueue */ private int queueCount; /** * queues of DataChangeEvent */ private DataChangeEventQueue[] dataChangeEventQueues; @Autowired private DataServerConfig dataServerConfig; @Autowired private DatumCache datumCache; } ``` #### 3.2.2 訊息型別 DataChangeEventCenter 專門處理 IDataChangeEvent 型別訊息,其具體實現為三種: - public class ClientChangeEvent implements IDataChangeEvent - public class DataChangeEvent implements IDataChangeEvent - public class DatumSnapshotEvent implements IDataChangeEvent 這些不同型別的訊息可以放入同一個佇列,具體放入哪個佇列,是根據特定判別方式來決定
,比如根據Publisher的DataInfoId來做hash,以此決定放入哪個Queue。 即,當對應 handler 的 onChange 方法被觸發時,會計算該變化服務的 dataInfoId 的 Hash 值,從而進一步確定出該服務註冊資料所在的佇列編號,進而把該變化的資料封裝成一個數據變化物件,傳入到佇列中。 #### 3.2.3 初始化 在初始化函式中,構建了EventQueue,每一個Queue啟動了一個執行緒,用來處理訊息。 ```java @PostConstruct public void init() { if (isInited.compareAndSet(false, true)) { queueCount = dataServerConfig.getQueueCount(); dataChangeEventQueues = new DataChangeEventQueue[queueCount]; for (int idx = 0; idx < queueCount; idx++) { dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache); dataChangeEventQueues[idx].start(); } } } ``` #### 3.2.4 Put 訊息 put訊息比較簡單,具體如何判別應該把Event放入哪一個Queue是根據具體方式來判斷,比如根據Publisher的DataInfoId來做hash,以此決定放入哪個Queue: ```java int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); ``` #### 3.2.5 如何處理訊息 具體是通過 dataChangeEventQueues.onChange 來做處理,比如如下幾個函式,分別處理不同的訊息型別。具體都是找到queue,然後呼叫: ```java public void onChange(Publisher publisher, String dataCenter) { int idx = hash(publisher.getDataInfoId()); Datum datum = new Datum(publisher, dataCenter); if (publisher instanceof UnPublisher) { datum.setContainsUnPub(true); } if (publisher.getPublishType() != PublishType.TEMPORARY) { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum)); } else { dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum)); } } public void onChange(ClientChangeEvent event) { for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } } public void onChange(DatumSnapshotEvent event) { for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) { dataChangeEventQueue.onChange(event); } } public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) { int idx = hash(datum.getDataInfoId()); DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum); dataChangeEventQueues[idx].onChange(event); } ``` ### 3.3 DataChangeEvent 因為 DataChangeEvent 最常用,所以我們單獨拿出來說明。 DataChangeEvent會根據DataChangeTypeEnum和DataSourceTypeEnum來進行區分,就是處理型別和訊息來源。 DataChangeTypeEnum具體分為: - MERGE,如果變更型別是MERGE,則會更新快取中需要更新的新Datum,並且更新版本號; - COVER,如果變更型別是 COVER,則會覆蓋原有的快取; DataSourceTypeEnum 具體分為: - PUB :pub by client; - PUB_TEMP :pub temporary data; - SYNC:sync from dataservers in other datacenter; - BACKUP:from dataservers in the same datacenter; - CLEAN:local dataInfo check,not belong this node schedule remove; - SNAPSHOT:Snapshot data, after renew finds data inconsistent; 具體定義如下: ```java public class DataChangeEvent implements IDataChangeEvent { /** * type of changed data, MERGE or COVER */ private DataChangeTypeEnum changeType; private DataSourceTypeEnum sourceType; /** * data changed */ private Datum datum; } ``` ### 3.4 DataChangeEventQueue DataChangeEventQueue 是這個子模組的核心,用於多路分別處理,增加處理能力
。每一個Queue內部啟動一個執行緒,用於非同步處理,也能增加處理能力。 #### 3.4.1 核心變數 這裡的核心是: - Block