分散式事務解決方案之訊息最終一致性(可靠訊息服務)下篇
阿新 • • 發佈:2018-12-31
背景:1.支付成功 通知訂單完成2.訂單完成,通知會計記賬上游訂單服務,必須開放可查詢訂單狀態介面,判斷訊息是否可以傳送下游會計消費成功後,必須回撥訊息服務,ACK操作(約束:冪等性。 例如:訊息id等)流程:訂單服務: 預儲存訊息 -> 訂單完成 -> 確認傳送訊息 會計服務:消費訂單訊息 -> 完成記賬 -> 確認訊息已消費訊息生命週期: 預存儲 -> 傳送中 -> 銷燬/確認消費1.Create MySql Message Table
·消費訊息,完成業務操作後,呼叫訊息服務ACK確認訊息已經被消費5.Message WEB (訊息管理)訊息管理視覺化介面,處理死亡訊息等優化1.訂單完成後,呼叫訊息RPC確認訊息可能超時,訂單將會被回滾,但是訊息已經被確認,導致會計成功記賬方案:確認訊息設定成非同步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>2.訊息儲存DB方案:redis,mongodb3.被動方業務冪等性判斷方案:業務操作成功,記錄訊息id4.服務部署叢集導致任務重複排程方案:分散式任務排程(噹噹開源elastic-job,分散式作業排程框架)5.實時訊息服務方案:rabbitMQ,rocketMQ6.效能提高,解耦方案:每個業務使用一套訊息服務
2.Message API (訊息服務API)ROP TABLE IF EXISTS `rp_transaction_message`; CREATE TABLE `rp_transaction_message` ( `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主鍵ID', `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本號', `editor` varchar(100) DEFAULT NULL COMMENT '修改者', `creater` varchar(100) DEFAULT NULL COMMENT '建立者', `edit_time` datetime DEFAULT NULL COMMENT '最後修改時間', `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '建立時間', `message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '訊息ID', `message_body` longtext NOT NULL COMMENT '訊息內容', `message_data_type` varchar(50) DEFAULT NULL COMMENT '訊息資料型別', `consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消費佇列', `message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '訊息重發次數', `areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡', `status` varchar(20) NOT NULL DEFAULT '' COMMENT '狀態', `remark` varchar(200) DEFAULT NULL COMMENT '備註', `field` varchar(1024) DEFAULT NULL COMMENT '擴充套件欄位', PRIMARY KEY (`id`), KEY `AK_Key_2` (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
不需要配置MQ自身重發機制,一定時間內不成功 ,規定次數內重發,重發時間可設定梯度
3.Message APP (訊息確認,訊息恢復)public interface RpTransactionMessageService { /** * 預儲存訊息. */ public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 確認併發送訊息. */ public void confirmAndSendMessage(String messageId) throws MessageBizException; /** * 儲存併發送訊息. */ public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 直接傳送訊息. */ public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 重發訊息. */ public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException; /** * 根據messageId重發某條訊息. */ public void reSendMessageByMessageId(String messageId) throws MessageBizException; /** * 將訊息標記為死亡訊息. */ public void setMessageToAreadlyDead(String messageId) throws MessageBizException; /** * 根據訊息ID獲取訊息 */ public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException; /** * 根據訊息ID刪除訊息 */ public void deleteMessageByMessageId(String messageId) throws MessageBizException; /** * 重發某個訊息佇列中的全部已死亡的訊息. */ public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException; /** * 獲取分頁資料 */ PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException; }
/**
* 訊息定時器介面
*/
public interface MessageScheduled {
/**
* 處理狀態為“待確認”但已超時的訊息.
*/
public void handleWaitingConfirmTimeOutMessages();
/**
* 處理狀態為“傳送中”但超時沒有被成功消費確認的訊息
*/
public void handleSendingTimeOutMessage();
}
4.Message QUEUE(訊息消費)/** * message業務處理類 */ @Component("messageBiz") public class MessageBiz { private static final Log log = LogFactory.getLog(MessageBiz.class); @Autowired private RpTradePaymentQueryService rpTradePaymentQueryService; @Autowired private RpTransactionMessageService rpTransactionMessageService; /** * 處理[waiting_confirm]狀態的訊息 * * @param messageMap */ public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) { log.debug("開始處理[waiting_confirm]狀態的訊息,總條數[" + messageMap.size() + "]"); // 單條訊息處理(目前該狀態的訊息,消費佇列全部是accounting,如果後期有業務擴充,需做佇列判斷,做對應的業務處理。) for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("結束處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息"); } catch (Exception e) { log.error("處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e); } } } /** * 處理[SENDING]狀態的訊息 * * @param messageMap */ public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.debug("開始處理[SENDING]狀態的訊息,總條數[" + messageMap.size() + "]"); // 根據配置獲取通知間隔時間 Map<Integer, Integer> notifyParam = getSendTime(); // 單條訊息處理 for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) { RpTransactionMessage message = entry.getValue(); try { log.debug("開始處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息"); // 判斷髮送次數 int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times")); log.debug("[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息,已經重新發送的次數[" + message.getMessageSendTimes() + "]"); // 如果超過最大發送次數直接退出 if (maxTimes < message.getMessageSendTimes()) { // 標記為死亡 rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId()); continue; } // 判斷是否達到傳送訊息的時間間隔條件 int reSendTimes = message.getMessageSendTimes(); int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes); long currentTimeInMillis = Calendar.getInstance().getTimeInMillis(); long needTime = currentTimeInMillis - times * 60 * 1000; long hasTime = message.getEditTime().getTime(); // 判斷是否達到了可以再次傳送的時間條件 if (hasTime > needTime) { log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]訊息上次傳送時間[" + sdf.format(message.getEditTime()) + "],必須過了[" + times + "]分鐘才可以再發送。"); continue; } // 重新發送訊息 rpTransactionMessageService.reSendMessage(message); log.debug("結束處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息"); } catch (Exception e) { log.error("處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e); } } } /** * 根據配置獲取通知間隔時間 * * @return */ private Map<Integer, Integer> getSendTime() { //TODO... config 配置 次數相對應的時間間隔 } }
·消費訊息,完成業務操作後,呼叫訊息服務ACK確認訊息已經被消費5.Message WEB (訊息管理)訊息管理視覺化介面,處理死亡訊息等優化1.訂單完成後,呼叫訊息RPC確認訊息可能超時,訂單將會被回滾,但是訊息已經被確認,導致會計成功記賬方案:確認訊息設定成非同步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>2.訊息儲存DB方案:redis,mongodb3.被動方業務冪等性判斷方案:業務操作成功,記錄訊息id4.服務部署叢集導致任務重複排程方案:分散式任務排程(噹噹開源elastic-job,分散式作業排程框架)5.實時訊息服務方案:rabbitMQ,rocketMQ6.效能提高,解耦方案:每個業務使用一套訊息服務