1. 程式人生 > >分散式事務解決方案之訊息最終一致性(可靠訊息服務)下篇

分散式事務解決方案之訊息最終一致性(可靠訊息服務)下篇

背景:1.支付成功 通知訂單完成2.訂單完成,通知會計記賬上游訂單服務,必須開放可查詢訂單狀態介面,判斷訊息是否可以傳送下游會計消費成功後,必須回撥訊息服務,ACK操作(約束:冪等性。 例如:訊息id等)流程:訂單服務: 預儲存訊息 -> 訂單完成 -> 確認傳送訊息 會計服務:消費訂單訊息 -> 完成記賬 -> 確認訊息已消費訊息生命週期: 預存 -> 傳送中 -> 銷燬/確認消費1.Create MySql Message Table
ROP TABLE IF EXISTS `rp_transaction_message`;
CREATE TABLE `rp_transaction_message` (
  `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主鍵ID',
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本號',
  `editor` varchar(100) DEFAULT NULL COMMENT '修改者',
  `creater` varchar(100) DEFAULT NULL COMMENT '建立者',
  `edit_time` datetime DEFAULT NULL COMMENT '最後修改時間',
  `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '建立時間',
  `message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '訊息ID',
  `message_body` longtext NOT NULL COMMENT '訊息內容',
  `message_data_type` varchar(50) DEFAULT NULL COMMENT '訊息資料型別',
  `consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消費佇列',
  `message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '訊息重發次數',
  `areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡',
  `status` varchar(20) NOT NULL DEFAULT '' COMMENT '狀態',
  `remark` varchar(200) DEFAULT NULL COMMENT '備註',
  `field` varchar(1024) DEFAULT NULL COMMENT '擴充套件欄位',
  PRIMARY KEY (`id`),
  KEY `AK_Key_2` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.Message API (訊息服務API)

不需要配置MQ自身重發機制,一定時間內不成功 ,規定次數內重發,重發時間可設定梯度

public interface RpTransactionMessageService {

	/**
	 * 預儲存訊息. 
	 */
	public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
	
	
	/**
	 * 確認併發送訊息.
	 */
	public void confirmAndSendMessage(String messageId) throws MessageBizException;

	
	/**
	 * 儲存併發送訊息.
	 */
	public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

	
	/**
	 * 直接傳送訊息.
	 */
	public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
	
	
	/**
	 * 重發訊息.
	 */
	public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
	
	
	/**
	 * 根據messageId重發某條訊息.
	 */
	public void reSendMessageByMessageId(String messageId) throws MessageBizException;
	
	
	/**
	 * 將訊息標記為死亡訊息.
	 */
	public void setMessageToAreadlyDead(String messageId) throws MessageBizException;


	/**
	 * 根據訊息ID獲取訊息
	 */
	public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

	/**
	 * 根據訊息ID刪除訊息
	 */
	public void deleteMessageByMessageId(String messageId) throws MessageBizException;
	
	
	/**
	 * 重發某個訊息佇列中的全部已死亡的訊息.
	 */
	public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
	
	/**
	 * 獲取分頁資料
	 */
	PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;


}
3.Message APP (訊息確認,訊息恢復)
/**
 * 訊息定時器介面
 */
public interface MessageScheduled {

	/**
	 * 處理狀態為“待確認”但已超時的訊息.
	 */
	public void handleWaitingConfirmTimeOutMessages();

	/**
	 * 處理狀態為“傳送中”但超時沒有被成功消費確認的訊息
	 */
	public void handleSendingTimeOutMessage();

}
/**
 * message業務處理類
 */
@Component("messageBiz")
public class MessageBiz {

	private static final Log log = LogFactory.getLog(MessageBiz.class);

	@Autowired
	private RpTradePaymentQueryService rpTradePaymentQueryService;
	@Autowired
	private RpTransactionMessageService rpTransactionMessageService;

	/**
	 * 處理[waiting_confirm]狀態的訊息
	 * 
	 * @param messageMap
	 */
	public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {
		log.debug("開始處理[waiting_confirm]狀態的訊息,總條數[" + messageMap.size() + "]");
		// 單條訊息處理(目前該狀態的訊息,消費佇列全部是accounting,如果後期有業務擴充,需做佇列判斷,做對應的業務處理。)
		for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
			RpTransactionMessage message = entry.getValue();
			try {
				log.debug("結束處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息");
			} catch (Exception e) {
				log.error("處理[waiting_confirm]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e);
			}
		}
	}

	/**
	 * 處理[SENDING]狀態的訊息
	 * 
	 * @param messageMap
	 */
	public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		log.debug("開始處理[SENDING]狀態的訊息,總條數[" + messageMap.size() + "]");

		// 根據配置獲取通知間隔時間
		Map<Integer, Integer> notifyParam = getSendTime();

		// 單條訊息處理
		for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
			RpTransactionMessage message = entry.getValue();
			try {
				log.debug("開始處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息");
				// 判斷髮送次數
				int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
				log.debug("[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息,已經重新發送的次數[" + message.getMessageSendTimes() + "]");

				// 如果超過最大發送次數直接退出
				if (maxTimes < message.getMessageSendTimes()) {
					// 標記為死亡
					rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
					continue;
				}
				// 判斷是否達到傳送訊息的時間間隔條件
				int reSendTimes = message.getMessageSendTimes();
				int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes);
				long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
				long needTime = currentTimeInMillis - times * 60 * 1000;
				long hasTime = message.getEditTime().getTime();
				// 判斷是否達到了可以再次傳送的時間條件
				if (hasTime > needTime) {
					log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]訊息上次傳送時間[" + sdf.format(message.getEditTime()) + "],必須過了[" + times + "]分鐘才可以再發送。");
					continue;
				}

				// 重新發送訊息
				rpTransactionMessageService.reSendMessage(message);

				log.debug("結束處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息");
			} catch (Exception e) {
				log.error("處理[SENDING]訊息ID為[" + message.getMessageId() + "]的訊息異常:", e);
			}
		}

	}

	/**
	 * 根據配置獲取通知間隔時間
	 * 
	 * @return
	 */
	private Map<Integer, Integer> getSendTime() {
		//TODO...   config 配置 次數相對應的時間間隔
	}
}
4.Message QUEUE(訊息消費)
·消費訊息,完成業務操作後,呼叫訊息服務ACK確認訊息已經被消費5.Message WEB (訊息管理)訊息管理視覺化介面,處理死亡訊息等優化1.訂單完成後,呼叫訊息RPC確認訊息可能超時,訂單將會被回滾,但是訊息已經被確認,導致會計成功記賬方案:確認訊息設定成非同步<dubbo:reference interface="com.roncoo.pay.service.message.api.RpTransactionMessageService" id="rpTransactionMessageService" check="false"><dubbo:method name="confirmAndSendMessage" async="true" return="false" /></dubbo:reference>2.訊息儲存DB方案:redis,mongodb3.被動方業務冪等性判斷方案:業務操作成功,記錄訊息id4.服務部署叢集導致任務重複排程方案:分散式任務排程(噹噹開源elastic-job,分散式作業排程框架)5.實時訊息服務方案:rabbitMQ,rocketMQ6.效能提高,解耦方案:每個業務使用一套訊息服務