1. 程式人生 > >延時訊息佇列

延時訊息佇列

在這裡插入圖片描述
下面程式碼按需要填寫
@Bean
public Queue delayQueuePerMessageTTL() {
Map<String, Object> argument = new HashMap<>();
argument.put(“x-message-ttl”, 1000 * 5);//How long a message published to a queue can live before it is discarded (milliseconds).(Sets the “x-message-ttl” argument.)
argument.put(“x-expires”, 100060);//How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the “x-expires” argument.)
argument.put(“x-max-length”, 10000);//How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the “x-max-length” argument.)
argument.put(“x-max-length-bytes”, 1024

1024*10);//Total body size for ready messages a queue can contain before it starts to drop them from its head.(Sets the “x-max-length-bytes” argument.)
argument.put(“x-dead-letter-exchange”, “exchangeName”);//Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the “x-dead-letter-exchange” argument.)
argument.put(“x-dead-letter-routing-key”, “routing-key”);//Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message’s original routing key will be used.(Sets the “x-dead-letter-routing-key” argument.)
argument.put(“x-max-priority”, 100);//Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the “x-max-priority” argument.)
argument.put(“x-queue-mode”, “lazy”);//Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the “x-queue-mode” argument.)
argument.put(“x-queue-master-locator”, “”);//Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.(Sets the “x-queue-master-locator” argument.)
return QueueBuilder.durable(“deladyQueues”).withArguments(argument).autoDelete().exclusive().build();
}
在實際的業務中我們會遇見生產者產生的訊息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支援延遲佇列功能,但是我們可以根據其特性Per-Queue Message TTL和 Dead Letter Exchanges實現延時佇列。也可以通過改特性設定訊息的優先順序。

1.Per-Queue Message TTL
RabbitMQ可以針對訊息和佇列設定TTL(過期時間)。佇列中的訊息過期時間(Time To Live, TTL)有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead message,消費者將無法再收到該訊息。
2.Dead Letter Exchanges
當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。訊息變成Dead Letter一向有以下幾種情況:
訊息被拒絕(basic.reject or basic.nack)並且requeue=false
訊息TTL過期
佇列達到最大長度
實際上就是設定某個佇列的屬性,當這個佇列中有Dead Letter時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange中去,進而被路由到另一個佇列,publish可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支援的immediate引數中的向publish確認的功能。

雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設定了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設定了 TTL 時,過期的 message 可能會排隊於未過期 message 的後面,直到這些訊息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計資訊中被計算進去(例如,queue 中存在的 message 的數量)。對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而第二種方法裡,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期時在即將投遞到消費者之前判定的,為什麼兩者得處理方法不一致?因為第一種方法裡,佇列中已過期的訊息肯定在佇列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期訊息即可,而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息,勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期,再進行刪除。

一、在佇列上設定TTL

1.建立delay.exchange

這裡Internal設定為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的繫結。

2.建立延時佇列(delay queue)

如上配置延時5min佇列(x-message-ttl=300000)

x-max-length:最大積壓的訊息個數,可以根據自己的實際情況設定,超過限制訊息不會丟失,會立即轉向delay.exchange進行投遞

x-dead-letter-exchange:設定為剛剛配置好的delay.exchange,訊息過期後會通過delay.exchange進行投遞

這裡不需要配置"dead letter routing key"否則會覆蓋掉訊息傳送時攜帶的routingkey,導致後面無法路由為剛才配置的delay.exchange

3.配置延時路由規則

需要延時的訊息到exchange後先路由到指定的延時佇列

1)建立delaysync.exchange通過Routing key將訊息路由到延時佇列

2.配置delay.exchange 將訊息投遞到正常的消費佇列

配置完成。

下面使用程式碼測試一下:

生產者:

package cn.slimsmart.study.rabbitmq.delayqueue.queue;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

private static String queue_name = "test.queue";  

public static void main(String[] args) throws IOException {  
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setHost("10.1.199.169");  
    factory.setUsername("admin");  
    factory.setPassword("123456");  
    Connection connection = factory.newConnection();  
    Channel channel = connection.createChannel();  
    // 宣告佇列  
    channel.queueDeclare(queue_name, true, false, false, null);  
    String message = "hello world!" + System.currentTimeMillis();  
    channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());  
    System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
    // 關閉頻道和連線  
    channel.close();  
    connection.close();  
}  

}

消費者:

package cn.slimsmart.study.rabbitmq.delayqueue.queue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

private static String queue_name = "test.queue";  

public static void main(String[] args) throws Exception {  
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setHost("10.1.199.169");  
    factory.setUsername("admin");  
    factory.setPassword("123456");  
    Connection connection = factory.newConnection();  
    Channel channel = connection.createChannel();  
    // 宣告佇列  
    channel.queueDeclare(queue_name, true, false, false, null);  
    QueueingConsumer consumer = new QueueingConsumer(channel);  
    // 指定消費佇列  
    channel.basicConsume(queue_name, true, consumer);  
    while (true) {  
        // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)  
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
        String message = new String(delivery.getBody());  
        System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
    }  
}  

}
二、在訊息上設定TTL

實現程式碼:

生產者:

package cn.slimsmart.study.rabbitmq.delayqueue.message;

import java.io.IOException;
import java.util.HashMap;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

private static String queue_name = "message_ttl_queue";  

public static void main(String[] args) throws IOException {  
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setHost("10.1.199.169");  
    factory.setUsername("admin");  
    factory.setPassword("123456");  
    Connection connection = factory.newConnection();  
    Channel channel = connection.createChannel();  
    HashMap<String, Object> arguments = new HashMap<String, Object>();  
    arguments.put("x-dead-letter-exchange", "amq.direct");  
    arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
    channel.queueDeclare("delay_queue", true, false, false, arguments);  

    // 宣告佇列  
    channel.queueDeclare(queue_name, true, false, false, null);  
    // 繫結路由  
    channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  

    String message = "hello world!" + System.currentTimeMillis();  
    // 設定延時屬性  
    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
    // 永續性 non-persistent (1) or persistent (2)  
    AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();  
    // routingKey =delay_queue 進行轉發  
    channel.basicPublish("", "delay_queue", properties, message.getBytes());  
    System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
    // 關閉頻道和連線  
    channel.close();  
    connection.close();  
}  

}

消費者:

package cn.slimsmart.study.rabbitmq.delayqueue.message;

import java.util.HashMap;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

private static String queue_name = "message_ttl_queue";  

public static void main(String[] args) throws Exception {  
    ConnectionFactory factory = new ConnectionFactory();  
    factory.setHost("10.1.199.169");  
    factory.setUsername("admin");  
    factory.setPassword("123456");  
    Connection connection = factory.newConnection();  
    Channel channel = connection.createChannel();  
    HashMap<String, Object> arguments = new HashMap<String, Object>();  
    arguments.put("x-dead-letter-exchange", "amq.direct");  
    arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
    channel.queueDeclare("delay_queue", true, false, false, arguments);  

    // 宣告佇列  
    channel.queueDeclare(queue_name, true, false, false, null);  
    // 繫結路由  
    channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  

    QueueingConsumer consumer = new QueueingConsumer(channel);  
    // 指定消費佇列  
    channel.basicConsume(queue_name, true, consumer);  
    while (true) {  
        // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)  
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
        String message = new String(delivery.getBody());  
        System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
    }  
}  

}

spring-rabbit整合教程

maven依賴:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.6.RELEASE</version>
    </dependency>

spring配置檔案(在檔案頭部引入rabbit的名稱空間和約束檔案):

<?xml version="1.0" encoding="UTF-8"?>

<!-- 定義Rabbit,指定連線工廠 -->
<rabbit:connection-factory id="connectionFactory"  host="你的rabbitMQ服務的ip"  virtual-host="/vhost名稱"  username="使用者名稱"  password="密碼"  port="5672"  />      

<!-- MQ的管理,包括佇列、交換器等 -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- 定義Rabbit模板,指定連線工廠以及定義exchange -->
<rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory"  />	

<!-- queue 佇列宣告 --> 
<!--    durable 是否持久化 ,exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除 ,auto-delete 當所有消費端連線斷開後,是否自動刪除佇列   -->
<rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>

<!-- 交換機定義 -->
<!-- direct-exchange 模式:訊息與一個特定的路由器完全匹配,才會轉發; topic-exchange 模式:按規則轉發訊息,最靈活 -->
<rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
	    <rabbit:bindings>
	        <!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
	        <rabbit:binding  queue="my_queue"  pattern="my_patt"/>
	    </rabbit:bindings>
</rabbit:topic-exchange>


<!-- 引入消費者 -->
<bean id="rabbitmqService" class="com.group.service.RabbitmqService"  />
    
<!-- 配置監聽 消費者   acknowledeg = manual,auto,none   -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
    <!-- queues 監聽佇列,多個用逗號分隔; ref 監聽器 -->
    <rabbit:listener queue-names="my_queue"  ref="rabbitmqService"  method="test"/>
</rabbit:listener-container>

那麼在專案中裝配amqpTemplate中就可以傳送訊息了

作者:MC-閏土
來源:CSDN
原文:https://blog.csdn.net/qq_22075041/article/details/78885113
版權宣告:本文為博主原創文章,轉載請附上博文連結!