1. 程式人生 > >曹工雜談:分散式事務解決方案之基於本地訊息表實現最終一致性

曹工雜談:分散式事務解決方案之基於本地訊息表實現最終一致性

# 曹工雜談:分散式事務解決方案之基於本地訊息表實現最終一致性 # 前言 為什麼寫這個?其實我這邊的業務場景,嚴格來說,不算是典型的分散式事務,需求是這樣說的:因為我這邊負責的一個服務消費者consumer,是使用者登入的入口;正常情況下,登入時候要走使用者中心,這是個單獨的服務;如果使用者中心掛了,我這邊自然是沒法登入的。 現在的需求就是說,假設使用者中心掛了,也要可以正常登入。因為我這個consumer其實也是快取了使用者的資料的,在本地登入也可以的,如果在我本地登入的話,我就得後續等使用者中心恢復後,再把相關狀態同步過去。 基於這樣一個需求,我這邊的實現方案是: 1.配置檔案裡維護一個開關,表示是否開啟:故障轉移模式。暫不考慮動態修改開關(如果要做,簡單做就提供個介面來改;複雜做,就放到配置中心裡,我們現在用的nacos,可以改了後推送到服務端) 2.如果開關是開啟的,表示需要進行故障轉移,則登入、退出登入等各種需要訪問使用者中心的請求,都儲存到資料庫中;資料庫會有一張表,用來存放這類請求。大致如下: ```sql CREATE TABLE `cached_http_req_to_resend` ( `http_req_id` bigint(20) NOT NULL COMMENT '主鍵', `req_type` tinyint(4) NOT NULL COMMENT '請求型別,1:推送待處置結果給第三方系統', `third_sys_feign_name` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '第三方系統的名稱,和feignClient的保持一致', `http_req_body` varchar(4000) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '請求體', `current_state` tinyint(4) DEFAULT NULL COMMENT '該請求當前狀態,1:成功;2:失敗;3:待處理;4:失敗次數過多,放棄嘗試', `fail_count` tinyint(4) DEFAULT NULL COMMENT '截止目前,失敗次數;超過指定次數後,將跳過該請求', `success_time` datetime DEFAULT NULL COMMENT '請求成功傳送的時間', `create_time` datetime DEFAULT NULL COMMENT '建立時間', `related_entity_id` bigint(21) DEFAULT NULL COMMENT '相關的實體的id,比如在推送待處置警情時,這個id為處警id', PRIMARY KEY (`http_req_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ``` 3.單獨開一個schedule執行緒,定時去掃這個表,發現有需要處理的,就去重新發送請求就行了,成功了的,直接更新狀態為success。 這個模式,其實就算是分散式事務中的:本地訊息表方案了。 本地訊息表,有一個注意的點,就是要把儲存訊息的操作和業務相關操作,放到同一個事務中,這樣可以確保,業務成功了,訊息肯定是落庫了的,很可靠。然後再開啟個定時任務,去掃描訊息表即可。 我這邊不是發訊息,而是發請求,道理是類似的。 下面開始基於程式碼demo來講解。 # 程式碼結構 這邊就是簡單的幾個module,基於spring cloud開發了一個服務提供者和一個服務消費者。服務提供者對外暴露的介面,通過api.jar的形式,提供給消費者,這種算是強耦合了,有優點,也有缺點,這裡就不討論了。 消費者通過feign呼叫服務提供者。有人會問,不需要eureka這些東西嗎,其實是可以不需要的,我們直接在ribbon的配置中,把服務對應的:ip和埠寫死就完了。 ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728123955592-362215636.png) 我們這裡就是,消費者訪問服務提供者,正常情況下直接訪問就行了;但我們這裡,模擬的就是服務A訪問不了的情況,所以會直接把請求落庫,後續由定時執行緒去處理。 # 服務提供者-api 我們看看服務提供者api,裡面僅有一個介面: ```java public interface FeignServiceA { /** * * @return */ @RequestMapping("/login") public Message login(@RequestBody LoginReqVO loginReqVO); } ``` ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728124540899-758684138.png) # 服務提供者的邏輯 ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728124633520-665374919.png) 其中,邏輯如下: ```java @RestController @Slf4j public class DemoController extends BaseController implements FeignServiceA { // 1 @Override public Message login(@RequestBody LoginReqVO loginReqVO) { log.info("login is ok,param:{}", loginReqVO); LoginRespVO vo = new LoginRespVO(); vo.setUserName(loginReqVO.getUserName()); vo.setAge(11); vo.setToken(UUID.randomUUID().toString()); return successResponse(vo); } } ``` 這裡1處就是提供了一個介面,接口裡返回一點點資訊。測試一下: ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728125459074-960416039.png) # 服務消費者之正常請求服務提供者 ## pom.xml中依賴服務提供者的api ```xml com.example
service-provider-A-api 0.0.1-SNAPSHOT
``` ## feign client程式碼 我們需要寫一個介面,繼承其feign api。 ```java @FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { } ``` 要呼叫的時候,怎麼弄呢? 直接注入該介面,然後呼叫對應的方法就行了,這樣就可以了。 ```java @Autowired private RpcServiceForServiceA rpcServiceForServiceA; Message message = rpcServiceForServiceA.login(reqVO); ``` 但是,我們好像沒有配置註冊中心之類的東西,這個我們可以繞過,因為最終發起呼叫的是,ribbon這個元件。 ribbon提供了幾個介面,其中一個,就是用來獲取服務對應的例項列表。 這裡要說的,就是下面這個介面: ```java package com.netflix.loadbalancer; import java.util.List; /** * Interface that defines the methods sed to obtain the List of Servers * @author stonse * * @param */ public interface ServerList { public List getInitialListOfServers(); /** * Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle * */ public List getUpdatedListOfServers(); } ``` 這個介面,有多個實現,ribbon自帶了幾個實現,然後eureka 、nacos的客戶端,都自己進行了實現。 ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728130312211-622034785.png) ribbon自帶的實現中,有一個叫做: ```java public class ConfigurationBasedServerList extends AbstractServerList { private IClientConfig clientConfig; ... @Override public List getUpdatedListOfServers() { // 1 String listOfServers = clientConfig.get("listOfServers"); return derive(listOfServers); } ``` 1處可以看到,它獲取服務對應的例項,就是通過去配置檔案裡獲取`listOfServers`這個key中配置的那些。 總之,最終我們向下面這樣配置就行了: ```properties SERVICE-A.ribbon.ReadTimeout=3000 SERVICE-A.ribbon.listOfServers=localhost:8082 SERVICE-A.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList ``` 這裡的字首,`SERVICE-A`和之前下面這個地方一致就行了: ```java @FeignClient(value = "SERVICE-A") public interface RpcServiceForServiceA extends FeignServiceA { } ``` 正常情況下,就說完了,直接呼叫就行,和httpclient呼叫沒啥本質差別。只不過ribbon提供了負載均衡、重試等各種功能。 # 設計表結構,在使用故障轉移模式時,儲存請求 表結構我前面已經貼了,這裡就展示下資料吧(可點選放大檢視): ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728130906629-300148431.png) 儲存請求的程式碼很簡單: ```java @Override public LoginRespVO login(LoginReqVO reqVO) { boolean failOverModeOn = isFailOverModeOn(); /** * 故障轉移沒有開啟,則正常呼叫服務 */ if (!failOverModeOn) { ... return ...; } /** * 1 使用本地資料進行服務,並將請求儲存到資料庫中 */ iCachedHttpReqToResendService.saveLoginReqWhenFailOver(reqVO); /** * 返回一個 dummy 資料 */ return new LoginRespVO(); } ``` 上面的1處,就會儲存請求到資料庫。 # 定時執行緒消費邏輯 ##概覽 定時執行緒這邊,我設計得比較複雜一點。因為實際場景中,上面的表中,會儲存多個第三方服務的請求;比如service-A,service-B。 所以,這裡的策略是: ![](https://img2020.cnblogs.com/blog/519126/202007/519126-20200728132141899-27725954.png) 簡單來說,就是定時執行緒,拿到任務後,按照第三方服務的名字來進行group by操作,比如,要傳送到service-A的請求放一起,按時間排好序;要傳送給service-B的放一起,排好序。 然後找到service-A,service-B各自對應的處理器,然後把資料丟給這些處理器;處理器拿到後,就會放到阻塞佇列裡; 然後此時worker執行緒就會被阻塞佇列給喚醒,喚醒後,就去開始處理這些請求,包括髮起feign呼叫,並且更新結果到資料庫中。 ## 定時執行緒入口 ```java @Scheduled(cron = "0/30 * * * * ? ") public void sendCachedFeignReq() { Thread.currentThread().setName("SendCachedFeignReqTask"); log.info("start sendCachedFeignReq"); /** * 1、獲取鎖 */ boolean success = iCommonDistributedLockService.tryLock(DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.lockName, DISTRIBUTED_LOCK_ENUM.SEND_CACHED_FEIGN_REQ_TO_REMOTE_SERVER.expireDurationInSeconds); /** * 進行業務邏輯處理 */ iCachedHttpReqToResendService.processCachedFeignReqForLoginLogout(); ... } ``` 這裡還加了個分散式鎖的操作,用資料庫實現的,還沒經過充分測試,可能會有點小問題,不過不是重點。 下面看看業務邏輯: ```java @Override public void processCachedFeignReqForLoginLogout() { // 1 String[] feignClients = {EFeignClient.SERVICE_A.getName()}; // 2 for (String feignClient : feignClients) { /** * 3 從資料庫獲取要傳送到該服務的請求 */ List recordsFromDb = getRecordsFromDb(feignClient); if (CollectionUtils.isEmpty(recordsFromDb)) { continue; } /** * 4 根據feign client,找到對應的處理器 */ CachedHttpReqProcessor cachedHttpReqProcessor = cachedHttpReqProcessors.stream().filter(item -> item.support(feignClient)).findFirst().orElse(null); if (cachedHttpReqProcessor == null) { throw new RuntimeException(); } /** * 5 利用對應的處理器,處理該部分請求 */ cachedHttpReqProcessor.process(recordsFromDb); } } ``` * 1,定義一個數組,陣列中包括所有要處理的第三方系統 * 2,遍歷 * 3,根據該serviceName,比如,根據service-A,去資料庫查詢對應的請求(這裡可能和前面的圖有點出入,以這裡的程式碼為準) * 4,根據該service-A,找到對應的處理器 * 5,利用第四步找到的處理器,來處理第三步中查到的資料 ## 怎麼找到service-A對應的處理器 我們先看看處理器這個介面: ```java public interface CachedHttpReqProcessor { /** * 該處理器是否支援處理該service * @param feignClientName * @return */ boolean support(String feignClientName); /** * 具體的處理邏輯 * @param list */ void process(Collection list); /** * worker執行緒的名字 * @return */ String getThreadName(); } ``` 然後看看針對service-A的處理器,是怎麼實現的: ```java @Service public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor { // 1 @Override public boolean support(String feignClientName) { return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName); } @Override public String getThreadName() { return "CachedHttpReqProcessorForServiceA"; } ``` 1處,判斷傳入的feign客戶端,是否等於`EFeignClient.SERVICE_A`,如果是,說明找到了對應的處理器。 我們這裡將這個service,註冊為了bean;在有多個serviceA,serviceB的時候,就會有多個CachedHttpReqProcessor處理器。 我們在之前的上層入口那裡,就注入了一個集合: ```java @Autowired private List cachedHttpReqProcessors; ``` 然後在篩選對應的處理器時,就是通過遍歷這個集合,找到合適的處理器。 具體的,大家可以把程式碼拉下來看看。 ## CachedHttpReqProcessor的處理邏輯 對於serviceA,serviceB,service C,由於處理邏輯很大部分是相同的,我們這裡提取了一個抽象類。 ```java @Slf4j public abstract class AbstractCachedHttpReqProcessor implements CachedHttpReqProcessor { private LinkedBlockingQueue blockingQueue = new LinkedBlockingQueue<>(500); private AtomicBoolean workerInited = new AtomicBoolean(false); Thread workerThread; @Override public void process(Collection list) { if (CollectionUtils.isEmpty(list)) { return; } /** * 1 直到有任務要處理時(該方法被呼叫時),才去初始化執行緒 */ if (workerInited.compareAndSet(false, true)) { // 2 workerThread = new Thread(new InnerWorker()); workerThread.setDaemon(true); workerThread.setName(getThreadName()); workerThread.start(); } /** * 放到阻塞佇列裡 */ blockingQueue.addAll(list); } ``` 我們這裡1處,給每個處理器,定義了一個工作執行緒,且只在本方法被呼叫時,才去初始化該執行緒;為了防止併發,使用了AtomicBoolean,保證只會初始化一次。 2處,給執行緒設定了Runnable,它會負責實際的業務處理。 然後3處,直接把要處理的任務,丟到阻塞佇列即可。 ## Worker的處理邏輯 任務已經是到了阻塞隊列了,那麼,誰去處理呢,就是worker了。如果大家忘了整體的設計,可以回去看看那張圖。 ```java public abstract boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend); /** * 從佇列取資料;取到後,呼叫子類的方法去處理; * 子類處理後,返回處理結果 * 根據結果,設定成功或者失敗的狀態 */ public class InnerWorker implements Runnable { @Override public void run() { while (true) { // 1 boolean interrupted = Thread.currentThread().isInterrupted(); if (interrupted) { log.info("interrupted ,break out"); break; } // 2 CachedHttpReqToResend cachedHttpReqToResend; try { cachedHttpReqToResend = blockingQueue.take(); } catch (InterruptedException e) { log.info("interrupted,e:{}", e); break; } // 3 Integer reqType = cachedHttpReqToResend.getReqType(); if (reqType == null) { continue; } try { /** * 4 使用模板方法設計模式,交給子類去實現 */ boolean success = doProcess(reqType, cachedHttpReqToResend); // 5 if (!success) { cachedHttpReqToResend.setFailCount(cachedHttpReqToResend.getFailCount() + 1); } else { cachedHttpReqToResend.setCurrentState(CachedHttpReqToResend.CURRENT_STATE_SUCCESS); cachedHttpReqToResend.setSuccessTime(new Date()); } // 6 boolean count = iCachedHttpReqToResendService.updateById(cachedHttpReqToResend); if (count) { log.debug("update sucess"); } } catch (Throwable throwable) { log.error("e:{}", throwable); continue; } } } } ``` * 1,判斷是否被中斷了,這樣可以在程式關閉時,感知到;避免執行緒洩漏 * 2,從阻塞佇列中,獲取任務 * 3,判斷請求型別是否為null,這個是必須要的 * 4,使用模板方法設計模式,具體邏輯,具體怎麼發請求,誰去發,交給子類實現 * 5、6,根據結果,更新這條資料的狀態。 ## 子類中的具體邏輯 我們這裡貼個全貌: ```java @Service @Slf4j public class CachedHttpReqProcessorForServiceA extends AbstractCachedHttpReqProcessor { @Autowired private FeignServiceA feignServiceA; @Autowired private ObjectMapper objectMapper; @Override public boolean support(String feignClientName) { return Objects.equals(EFeignClient.SERVICE_A.getName(), feignClientName); } @Override public String getThreadName() { return "CachedHttpReqProcessorForServiceA"; } /** * 1 根據請求type欄位,我們就知道是要傳送哪一個請求 * @param reqType * @param cachedHttpReqToResend * @return */ @Override public boolean doProcess(Integer reqType, CachedHttpReqToResend cachedHttpReqToResend) { switch (reqType) { // 2 case CachedHttpReqToResend.REQ_TYPE_LOGIN_TO_SERVICE_A: { // 3 String httpReqBody = cachedHttpReqToResend.getHttpReqBody(); try { // 4 LoginReqVO loginReqVO = objectMapper.readValue(httpReqBody, LoginReqVO.class); /** * 5 發起登入 */ Message message = feignServiceA.login(loginReqVO); boolean success = FeignMsgUtils.isSuccess(message); return success; } catch (Throwable e) { log.error("e:{}", e); return false; } } } return true; } } ``` * 1,這個類就是實現了父類中的抽象方法,這裡體現的就是模板方法設計模式 * 2,根據請求type,判斷要訪問哪個介面 * 3,4,將請求體進行反序列化 * 5,發起請求,呼叫feign。 # 程式碼如何使用 具體的程式碼,我放在了:
建表語句: 服務提供者A的訪問入口: ```shell curl -i -X POST \ -H "Content-Type:application/json" \ -d \ '{ "userName": "zhangsan", "password":"123" }' \ 'http://localhost:8082/login' ``` 服務消費者的application.properties中: ```properties failover.mode=true ``` 這個為true時,就是故障轉移模式,訪問如下介面時,請求會落庫 為false的話,就會直接進行feign呼叫。 # 程式碼中的bug 其實這個程式碼是有bug的,因為我們是定時執行緒,假設每隔30s執行,那假設我一開始取了10條出來,假設全部放到隊列了,阻塞佇列此時有10條,假設worker處理特別慢,30s內也沒執行完的話,定時執行緒會再次取出狀態沒更新的那個任務,又丟到佇列裡。 任務就被重複消費了。 大家可以想想怎麼處理這個問題,通過這個bug,我也發現,blockingqueue是一種比較徹底的解耦方式,但是,我們這裡的業務,解耦了嗎,如果業務不是解耦的,用這個方式,其實是有點問題。 過兩天我再更新這部分的方案,生產者和消費者,這裡還是需要通訊的,才能避免任務重複消費的問題。 # 總結 要實現一個本地訊息表最終一致性方案,有一定開發量,而且我這裡,消費過程中,強行引入了多執行緒和生產者、消費者模式,增加了部分複雜度。 不過,程式碼不就是要多折