1. 程式人生 > >架構設計|非同步請求如何同步處理?

架構設計|非同步請求如何同步處理?

本文創意來自一次業務需求,這次需要接入一個第三方外部服務。由於這個服務只提供非同步 API,為了不影響現有系統同步處理的方式,接入該外部服務時,應用對外遮蔽這種差異,內部實現非同步請求同步。

全文摘要:

  • 非同步給現有架構帶來的問題
  • Dubbo 非同步轉同步解決方法
  • 非同步轉同步架構設計方案

0x00. 前言

現有一個系統,整體架構如下所示:

這是一個很常見的同步設計方案,上游系統需要等待下游系統介面返回呼叫結果。

現在需要接入另外一個第三方服務 B,該服務與服務 A 最大區別在於,這是一個非同步 API。呼叫之後,僅僅返回受理成功,處理結果後續通過非同步通知返回。

接入之後,整體架構如下所示:

由於網路隔離策略,通知接收程式與通訊服務需要單獨分開部署。若沒此要求,可以將通訊服務 B 與通知接收程式合併成一個應用。

另外圖中所有應用採用雙節點部署。

為了不影響 OpenAPI 上游系統同步處理邏輯,通訊服務 B 呼叫第三方服務之後,不能立刻返回,需要等待結果通知,拿到具體返回結果。這就需要通訊服務 B 內部將非同步轉為同步。

這就是一個典型的非同步轉同步問題,整個過程涉及兩個問題。

  1. 通訊服務 B 業務執行緒如何進入等待狀態?又如何喚醒正確等待執行緒?
  2. 由於通訊服務 B 雙節點部署,通知接收程式如何將結果轉發到正在等待處理的節點?

問題 1 的解決方案參考了 Dubbo 設計思路。

我們在使用 Dubbo 呼叫遠端服務時,預設情況下,這是一種阻塞式呼叫方式,即 Consumer 端程式碼一直阻塞等待,直到 Provider 端返回為止。

由於 Dubbo 底層基於 Netty 傳送網路請求,這其是一個非同步的過程。為了讓業務執行緒能同步等待,這個過程就需要將非同步轉為同步。

0x01. Dubbo 非同步轉同步解決辦法

1.1 業務執行緒同步阻塞

Dubbo 發起遠端呼叫程式碼位於 DubboInvoker#doInvoke

Dubbo 版本為:2.6.X 版本。2,7.X 重構 DefaultFuture ,但是本質原理還是一樣。

預設情況下,Dubbo 支援同步呼叫方式,這裡將會建立 DefaultFuture

物件。

這裡有個非常重要邏輯,每個請求生成一個唯一 ID,然後將 IDDefaultFuture 對映關係,存入 Map 中。

這個請求 ID 在之所以這麼重要,是因為消費者併發呼叫服務傳送請求,同時將會有多個業務執行緒進入阻塞。當收到響應之後,我們需要喚醒正確的等待執行緒,並將處理結果返回。

通過 ID 這個唯一對映的關係,很自然可以找到其對應 DefaultFuture,喚醒其對應的業務執行緒。

業務執行緒呼叫 DefaultFuture#get方法進入阻塞。這段程式碼比較簡單,通過呼叫 Condition#await 阻塞線層。

1.2 喚醒業務執行緒

當消費者接收到服務提供者的返回結果,將會呼叫 DefaultFuture#received 方法。

通過響應物件中的唯一 ID,找到其對應 DefaultFuture 物件,從而將結果設定 DefaultFuture 物件中,然後喚醒的相應的業務執行緒。

這裡實際有個優化點,使用 done#signalAll 代替 done#signal。使用 condition 等待通知機制的時候需要注意這一點。

詳情參考:https://github.com/apache/dubbo/issues/3678

1.3 設計注意點

正常情況下,當消費者接收到響應之後,將會從 FUTURES 這個 Map 移除 DefaultFuture

但是在異常情況下,服務提供者若處理緩慢,不能及時返回響應結果,消費者業務執行緒將會因為超時甦醒。這種情況下 FUTURES 積壓了無效 DefaultFuture 物件。如果不及時清理,極端情況下,將會發生 OOM 。

DefaultFuture 內部將會開啟一個非同步執行緒,定時輪詢 FUTURES 判斷 DefaultFuture 超時時間,及時清理已經無效(超時)的 DefaultFuture

0x02. 轉發方案設計

根據 Dubbo 解決思路,問題 1 解決辦法就比較簡單了。具體流程如下:

  1. 通訊服務 B 內部生成一個唯一請求 ID ,發給第三方服務
  2. 若請求成功,內部版使用 Map 儲存對應關係,並使業務執行緒阻塞等待
  3. 通訊服務 B 收到非同步通知結果,通過 ID 查詢對應業務執行緒,喚醒的相應的執行緒

這個設計過程需要注意設定合理的超時時間,這個超時時間需要考慮遠端服務呼叫耗時,可以參考如下公式:

業務執行緒等待時間=通訊服務 B 介面的超時時間 - 呼叫第三方服務 B 介面消耗時間

這裡就不貼出具體的程式碼,詳細程式碼參考 Dubbo DefaultFuture

接下來重點看下通知服務如何將結果轉發給正確的通訊服務 B 的節點。這裡想到兩種方案:

  1. SocketServer 方案
  2. MQ 方案

2.1 SocketServer

通訊服務 B 使用 SocketServer 構建一個服務接收程式,當通知接收程式收到第三方服務 B 通知時,通過 Socket 將結果轉發給通訊服務 B。

整個系統架構如下所示:

由於生產服務雙節點部署,通知接收程式就不能寫死轉發地址。這裡我們將請求 ID 與通訊服務 B socket 服務地址關係存入 Redis 中,然後通知接收程式通過 ID 找到正確的地址。

這個方案說實話有點複雜。

第一 SocketServer 編碼難度較大,編寫一個高效 SocketServer 就比較難,一不小心可能產生各種 Bug。

第二通訊服務 B 服務地址配置在配置檔案中,由於兩個節點地址不同,這就導致同一應用存在不同配置。這對於後面維護就很不友好。

第三額外引入 Redis 依賴,系統複雜度變高。

2.2 MQ 方案

相對 SocketServer 方案,MQ 方案相對簡單,這裡採用 MQ 廣播消費的方式,架構如圖所示:

通知接收程式收到非同步通知之後,直接將結果傳送到 MQ

通訊服務 B 開啟廣播消費模式,拉取 MQ 訊息。

通訊服務 B_1 拉取訊息,通過請求 ID 對映關係,沒找到內部等待的執行緒,知道這不是自己的等待訊息,於是 B_1 直接丟棄即可。

通訊服務 B_2 拉取訊息,通過請求 ID 對映關係,順利找到正在等待的執行緒,然後可以喚醒等待執行緒,返回最後的結果。

對比 SocketServer 方案,MQ 方案整體流程比較簡單,程式設計難度低,也沒用存在特殊的配置。

不過這個方案十分依賴 MQ 訊息實時性,若 MQ 訊息投遞延遲很高,這就會導致通訊服務 B 業務執行緒超時甦醒,業務異常返回。

這裡我們選擇使用 RocketMQ,長輪詢 Pull 方式,可保證訊息非常實時,

綜上,這裡採用 MQ 的方案。

0x03. 總結

非同步轉同步我們需要解決同步阻塞,以及如何喚醒的問題。

阻塞/喚醒可以分別使用 Condition#await/signalAll。不過這個過程我們需要生成一個唯一請求 ID,並且儲存這個 ID 與業務執行緒對映關係。後續等到結果返回我們才能通過唯一 ID 喚醒正確等待執行緒。

只要瞭解上面幾點,非同步轉同步的問題就就可以迎刃而解。

另外,如果你也有碰到非同步轉同步問題,本文的方案希望對你有幫助。如果你有其他設計方案,歡迎留言,一起討論~

參考資料

  1. http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html
  2. http://dubbo.apache.org/zh-cn/blog/dubbo-invoke.html

最會說一句 (求關注)

這篇文章其實寫了挺久的,寫的挺難得。之前很早想到寫這篇文章,但是沒想好到底咋寫,艱難產出。

看到這裡,點個關注呀,點個讚唄。別下次一定啊,大哥。寫文章很辛苦的,需要來點正反饋。

才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。

感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注

歡迎關注我的公眾號:程式通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn