1. 程式人生 > >藉助訊息佇列解決分散式事務

藉助訊息佇列解決分散式事務

先介紹一下RabbitMQ的基本概念

核心概念

Queue:真正儲存資料的地方
Exchange接受請求,轉存資料
Bind:收到請求後儲存到哪裡
訊息生產者:傳送資料的應用
訊息消費者:取出資料處理的應用

Bind的幾種分發規則:Direct、Topic、Fanout
Fanout:與該Exchange繫結的Queue都發一份資料
Direct:給完全匹配的Queue傳送資料,routingkey與bindkey完全一致
Topic:給模糊匹配的Queue傳送資料,binding key中可以存在兩種特殊字元“”與“#”,用於做模糊匹配,其中“”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

工作流程

生產者將資料通過RabbitMQ-client傳送到RabbitMQ-server中的exchange,exchange根據路由配置,分發給Queue,消費者從Queue拿到資料
在這裡插入圖片描述

分散式事務的產生

多個系統相互配合工作,產生資料一致性問題。
例如外賣場景中,下單中心,運單中心兩個系統要配合工作,必須保證兩個系統資料一致性。
錯誤的解決方案:
使用API介面呼叫,下單中心插入資料,呼叫運單中心的API介面處理資料,並啟動事務回滾。
咋一看這場景沒有什麼問題,畢竟有事務回滾,一起成功一起失敗,但其實存在API呼叫超時的情況,此時下單中心以為呼叫失敗回滾,而運單中心只是超時仍會繼續執行程式,從而造成兩個系統資料不一致。
假設API呼叫成功,也有可能是在訂單中心提交事務時失敗了,此時訂單中心回滾,而API已經呼叫,下單中心的資料已經產生,資料不一致。

使用訊息佇列解決分散式事務

在這裡插入圖片描述
問題的核心就是保證可靠生產與可靠消費
可靠生產下單中心處理資料和狀態表更改應該保證事務一致。生產者往訊息佇列傳送資料時,在本地建立一張狀態表,看是否成功傳送給佇列。利用RabbitMQ的確認機制看是否重發還是定時掃描狀態表重發,保證可靠生產。兜底方案還是定時掃描狀態表。
程式碼

#開啟mq的訊息回執確認機制
spring.rabbitmq.publisher-confirms=true
@PostConstruct   //回撥函式
    public void setUp(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //ack為true 代表傳送成功
                if(!ack){   //傳送失敗
                    System.out.println(cause);
                    return;
                }
                System.out.println("傳送成功");
                //真實應該在本地記錄表中更改為已傳送標誌

            }
        });
    }

可靠消費:消費端開啟手動ACK,給MQ佇列傳送確認資訊。對每條資料進行記錄,一旦有異常記錄可以試著去重試重新要求MQ再發資料,但不能重試太多次。可以將資料主鍵插入資料庫,這樣同一資料就不會執行兩次,或者使用redis記錄資料操作。
程式碼

#開啟消費端手動確認機制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = "hello")
    public void process(String message, Channel channel , @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("Receiver1  : " + message);
            //保證冪等性 防止重複資料重複處理

            //回覆MQ 已經接受到資料一切正常
            channel.basicAck(tag,false);
        } catch (IOException e) {
            //出現異常,通知MQ重發資料,但要記錄下資料的異常處理次數
            //對於錯誤資料另外處理,人工干預
            channel.basicNack(tag,false,false);
        }
    }

優缺點

  • 通用性強
  • 擴充套件性強

缺點

  • 適合非同步場景
  • 處理有延遲,業務上需要適應