1. 程式人生 > >rubbitMq+spring實現延遲佇列(適用訂單超時未支付的問題)

rubbitMq+spring實現延遲佇列(適用訂單超時未支付的問題)

什麼是延遲佇列


通俗一點說,延遲佇列和我們生活中常用的定時器有點像,定時器會在指定的時間後響起,延遲佇列則會在指定的時間後處理訊息。延遲佇列主要的應用場景有訂單超時取消、超時自動評價等等。

實現原理


RabbitMQ給我們提供了TTL(Time-To-Live)和DLX (Dead-Letter-Exchange)這兩個特性,使用RabbitMQ實現延遲佇列利用的正是這兩個特性。TTL用於控制訊息的生存時間,如果超時,訊息將變成Dead Letter。DLX用於配置死信佇列,可以通過配置x-dead-letter-exchange和x-dead-letter-routing-key這兩個引數來指定訊息變成死信後需要路由到的交換器和佇列。

圖例

首先看看死信佇列的圖

然後就是延遲佇列,延遲佇列本身就是死信佇列的擴充套件

程式碼例項

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--建立連線工廠-->
    <rabbit:connection-factory id="mqConnectionFactory"
                               host="***.***.**.**"
                               port="****"
                               username="***"
                               password="***"
                               publisher-confirms="true"
                               virtual-host="*****"/>

    <!--通過指定下面的admin資訊,當前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
    <rabbit:admin id="orderConnectAdmin" connection-factory="mqConnectionFactory"/>
   <!-- <rabbit:admin id="delayConnectAdmin" connection-factory="delayConnectionFactory"/>-->

    <!-- 說明:將需要關閉的訂單的訊息傳送到此佇列
        durable: 為 true 則設定佇列為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊。
        autoDelete: 設定是否自動刪除。為 true 則設定佇列為自動刪除。自動刪除的前提是:至少有一個消費者連線到
        這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除
     -->
    <rabbit:queue name="shanreal_order_shutdown_queue" durable="false" auto-declare="true">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
            <entry key="x-dead-letter-exchange" value="shanreal_exchange_delay" />
            <entry key="x-dead-letter-routing-key" value="shanreal_delay_key" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!--正常交換機
         durable:true 持久化。可以將交換器存檔,在伺服器重啟 的時候不會丟失相關資訊。false 反之
         auto-declare:true 自動刪除。自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結 ,
         之後所有與這個交換器繫結的佇列或者交換器都與此解綁
    -->
    <rabbit:fanout-exchange name="shanreal_exchange_normal" durable="false" auto-delete="true" id="shanreal_exchange_normal">
        <rabbit:bindings>
            <rabbit:binding queue="shanreal_order_shutdown_queue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!--死亡佇列,用於儲存超時訊息-->
    <rabbit:queue name="shanreal_delay_queue" durable="false" auto-declare="true"/>

    <!--死亡交換機,訊息過時後會由此交換機轉發到對應的佇列-->
    <rabbit:direct-exchange name="shanreal_exchange_delay" durable="false" auto-delete="true" id="shanreal_exchange_delay">
        <rabbit:bindings>
            <rabbit:binding queue="shanreal_delay_queue" key="shanreal_delay_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--定義rabbit orderAmqpTemplate用於資料的生產 -->
    <rabbit:template id="orderAmqpTemplate" connection-factory="mqConnectionFactory" exchange="shanreal_exchange_normal"/>

    <!--定義rabbit delayAmqpTemplate用於死信佇列資料的消費 -->
    <rabbit:template
            id="delayAmqpTemplate"
            connection-factory="mqConnectionFactory"
            exchange="shanreal_exchange_delay"
            queue="shanreal_delay_queue"/>

    <!-- 配置執行緒池 -->
    <bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <!-- 執行緒池維護執行緒的最少數量 -->
        <property name ="corePoolSize" value ="5" />
        <!-- 執行緒池維護執行緒所允許的空閒時間 -->
        <property name ="keepAliveSeconds" value ="30000" />
        <!-- 執行緒池維護執行緒的最大數量 -->
        <property name ="maxPoolSize" value ="1000" />
        <!-- 執行緒池所使用的緩衝佇列 -->
        <property name ="queueCapacity" value ="200" />
    </bean>

    <bean id="delayTask" class="com.shanreal.controller.gb.rubbitMq.DelayTask" />

    <!-- Queue Listener 當有訊息到達時會通知監聽在對應的佇列上的監聽物件-->
    <rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="shanreal_delay_queue" ref="delayTask"/>
    </rabbit:listener-container>


</beans>

生產者:

這裡可以單獨對每個訊息設定TTL(訊息生存時間)

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class ProducerService {

/*    @Resource(name="mqConnectionFactory")
    private ConnectionFactory mqConnectionFactory;*/

    @Resource(name="orderAmqpTemplate")
    private AmqpTemplate orderAmqpTemplate;

    public void send(String msg) {
        orderAmqpTemplate.convertAndSend((Object) msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        });
        System.out.println("Sent: " + msg);
    }

}

消費者:

這裡可以手動消費

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class ConsumerService {
    @Resource(name="delayAmqpTemplate")
    private AmqpTemplate delayAmqpTemplate;

    public void recive() {
        System.out.println("Received: " + delayAmqpTemplate.receiveAndConvert());
    }
}

執行生產者操作

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

@Controller
@RequestMapping(value="/mqTest")
public class TestController {

    @Resource(name = "producerService")
    private ProducerService producerService;

    @Resource(name = "consumerService")
    private ConsumerService consumerService;


    @ResponseBody
    @RequestMapping(value="/producerTest")
    public void producerTest() throws Exception {
        String  s = "我是生產者測試";
        producerService.send(s);
    }

    @ResponseBody
    @RequestMapping(value="/consumerTest")
    public void consumerTest() throws Exception {
        consumerService.recive();
    }
}

通過監聽死信佇列的訊息來進行消費

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class DelayTask implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String receivedMsg = new String(message.getBody(), "UTF-8");
            System.out.println("Received : " + receivedMsg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

參考:rubbitMq實戰指南

需要此書籍的pdf文件的可以留言,我私發郵箱