1. 程式人生 > >訊息佇列MQ, rabbitMQ和rocketMQ的實現方式

訊息佇列MQ, rabbitMQ和rocketMQ的實現方式

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求.

MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。

在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的

請求響應時間,從而提高了系統的吞吐量。

基本概念:

Broker:簡單來說就是訊息佇列伺服器實體。
Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
producer:訊息生產者,就是投遞訊息的程式。
consumer:訊息消費者,就是接受訊息的程式。
channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。

rocket MQ的實現:

第一階段Prepared訊息,會拿到訊息的地址。
第二階段執行本地事務,
第三階段通過第一階段拿到的地址去訪問訊息,並修改狀態。在業務方法內要向訊息佇列提交兩次請求,一次傳送訊息和一次確認訊息。如果確認訊息傳送失敗了RocketMQ會定期掃描訊息叢集中的事務訊息,這時候發現了Prepared訊息,它會向訊息傳送者確認,所以生產方需要實現一個check介面,RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。

這是rocketmq的實現,但是參考官方問題,極端情況下需要手工介入的。

rabbitmq實現方式:

訊息傳送一致性:
1.執行本地業務事務,寫訊息表
2.獲取訊息表的內容,傳送訊息
3.訊息傳送確認成功,則再起事務更新訊息表,如果不成功,則不更新。這個會存在這樣一個問題:訊息已經發送成功,但是rabbit mq沒有返回,則無法更新資料庫;導致下一次重複傳送。這個需要訊息接收方要做冪等性檢查。

訊息接收一致性:
1.rabbit mq傳送訊息
2.訊息接收方接收訊息處理,最後返回ack
3.rabbit mq接收到ack確認後,更新訊息傳送狀態。這裡有個問題,如果訊息接收方成功處理訊息,但是由於特殊情況沒有返回ack, rabbit mq接收到ack,這條訊息狀態已經改變不會再發送,需要手工處理。

使用mq本來就是解決前面流量大後端流量小的問題,所有的mq都強調事後一致性,就算是阿里的rocketmq,號稱支援分散式事務,但是實際上也不是

最需要考慮的兩個問題:

1.訊息消費的順序問題:傳送訊息指定佇列,訊息消費者指定佇列可以解決,消費者只能一個。
2.訊息消費的重複問題:每次消費訊息時候建立一訊息表,在消費訊息前先查詢該表,如果訊息存在就說明已經消費

消費者程式碼示例:

System.out.println("你好現在是 " + new Date() +"");
    System.out.println("HelloSender傳送內容 : " + users.toString());


    /**
     * ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥。
     * ReturnCallback介面用於實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥。
     */
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        //Users users1 = (Users)message.getBody().toString();
        //String correlationId = message.getMessageProperties().getCorrelationId();

        System.out.println("Message : " + new String(message.getBody()));
        //System.out.println("Message : " + new String(message.getBody()));
        System.out.println("replyCode : " + replyCode);
        System.out.println("replyText : " + replyText);  //錯誤原因
        System.out.println("exchange : " + exchange);
        System.out.println("routingKey : " + routingKey);//queue名稱

    });

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            CorrelationDataEx c = (CorrelationDataEx)correlationData;
            System.out.println("傳送訊息: " + c.getMsg());
            System.out.println("HelloSender 訊息傳送成功 :" + correlationData.toString() );
            /**
             * 通過設定correlationData.id為業務主鍵,訊息傳送成功後去繼續做候選業務。
             */
        } else {
            System.out.println("HelloSender訊息傳送失敗" + cause);
        }
    });

    /**
     * CorrelationDataEx繼承CorrelationData, 把需要傳送訊息的關鍵欄位加入
     * 這樣confirmcallback可以返回帶有關鍵欄位的correlationData,我們可以通過這個來確定傳送的是那條業務記錄
     */
    CorrelationDataEx c = new CorrelationDataEx();
    c.setId(users.getId().toString());
    c.setMsg(users.toString());

    /**
     * 加上這個,可以從returncallback引數中讀取傳送的json訊息,否則是二進位制bytes
     * 比如:如果returncallback觸發,則表明訊息沒有投遞到佇列,則繼續業務操作,比如將訊息記錄標誌位未投遞成功,記錄投遞次數
     */
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

    rabbitTemplate.convertAndSend(EXCHANGE, QUEUE_TWO_ROUTING, users, c);

}

生產者向mq傳送一條訊息,mq接收成功會給生產者返回一個ack,表示成功接收,生產者可以提交事務。 但是這裡面也是有一個問題,如果資料庫提交失敗,那麼傳送成功的訊息是多餘的,這個就要靠消費者消費的時候檢查訊息的冪等性來限制。

消費者程式碼示例

@RabbitHandler
@RabbitListener(queues = QUEUE_ONE_ROUTING) //containerFactory = "rabbitListenerContainerFactory", concurrency = "2")
public void process(Users users, Channel channel, Message message) throws IOException {
    System.out.println("HelloReceiver收到  : " + users.toString() + "收到時間" + new Date());

    try {
        //告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉 這樣以後就不會再發了
        // 否則訊息伺服器以為這條訊息沒處理掉 後續還會在發
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("receiver success");
    } catch (IOException e) {
        e.printStackTrace();
        //丟棄這條訊息,則不會重新發送了
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        System.out.println("receiver fail");
    }
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

消費者成功消費了,需要呼叫basicAck來向mq發一個ack,這樣mq就會知道這個訊息已經消費了,刪除之。如果不發ack,mq還有這個訊息。

這是rocket mq和rabbit mq不同而已。這是rocket mq和rabbit mq不同而已。mq需要做持久化,這樣宕機後起來會把未消費的訊息重新讀入。但是不管kafka 等等,最後還都是建議手工介入,如果碰上極端情況下。