Setting message expiration in a RabbitMQ queue and handling expired messages

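Use case

  • In flash-sale e-commerce campaigns, users place an order and pay at different times. Give each order an expiration time; once it has expired, payment is no longer allowed.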

Reference: https://blog.csdn.net/zhu_tianwei/article/details/53563311

Code

Producer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;

public class Producer {
    private static String queue_name = "message_ttl_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.2");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        factory.setVirtualHost("/guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Holding queue: expired messages are dead-lettered to amq.direct
        HashMap<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "amq.direct");
        arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
        channel.queueDeclare("delay_queue", true, false, false, arguments);

        // Declare the queue that receives expired messages
        channel.queueDeclare(queue_name, true, false, false, null);
        // Bind the routing key
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");

        String message = "hello world!" + System.currentTimeMillis();
        // Set the delay (expiration) property, in milliseconds
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        // Delivery mode: non-persistent (1) or persistent (2)
        AMQP.BasicProperties properties = builder.expiration("5000").deliveryMode(2).build();
        // Publish with routingKey = delay_queue; the message is forwarded once it expires
        channel.basicPublish("", "delay_queue", properties, message.getBytes());
        System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
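The two settings that drive the producer above are the dead-letter arguments on delay_queue and the per-message expiration string. As a minimal sketch, they could be pulled into a small helper. DelayQueueArgs and its method names are hypothetical and not part of the RabbitMQ client; only the argument keys and the milliseconds-as-string format come from the code above.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the RabbitMQ client library.
final class DelayQueueArgs {

    private DelayQueueArgs() {}

    // Queue arguments for the holding queue: once a message expires, the broker
    // re-publishes it to deadLetterExchange using deadLetterRoutingKey.
    static Map<String, Object> deadLetter(String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return arguments;
    }

    // The per-message expiration property is a string holding milliseconds.
    static String expiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative");
        }
        return Long.toString(ttl.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(deadLetter("amq.direct", "message_ttl_routingKey"));
        System.out.println(expiration(Duration.ofSeconds(5))); // prints 5000
    }
}
```

With such a helper, the producer would pass deadLetter("amq.direct", "message_ttl_routingKey") to queueDeclare and expiration(Duration.ofSeconds(5)) to builder.expiration(...). Keeping the TTL as a Duration avoids hand-written millisecond strings.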

Consumer

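import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {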

    private static String queue_name = "message_ttl_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("192.168.31.2");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        factory.setVirtualHost("/guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(queue_name, true, false, false, null);
        // Bind the routing key
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");

//        channel.exchangeDeclare("exchange_name", "direct", true);
//        channel.queueBind("queue_name", "exchange_name", "");
        channel.basicConsume(queue_name, false,
                    new DefaultConsumer(channel) {
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope,
                                                   AMQP.BasicProperties properties,
                                                   byte[] body) throws IOException {
                            String routingKey = envelope.getRoutingKey();
                            String contentType = properties.getContentType();
                            String str = new String(body);
                            System.out.println(str + " " + System.currentTimeMillis());
                            long deliveryTag = envelope.getDeliveryTag();
                            // (process the message components here ...)
                            channel.basicAck(deliveryTag, false);
                        }
                    });


        System.out.println(channel);
    }
}

Automatic vs. manual acknowledgment

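In channel.basicConsume(queue_name, false, ...), the false argument (autoAck) selects manual acknowledgment. The consumer must then acknowledge each message itself; otherwise the message stays in the queue. The acknowledgment code is:

long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);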
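
To make the difference concrete, here is a toy in-memory model of a queue. It is only a sketch for illustration: ToyQueue and its methods are hypothetical, it does not talk to RabbitMQ, and it ignores details such as redelivery. It shows that with manual acknowledgment a delivered message is still held until ack is called, while with automatic acknowledgment it is dropped as soon as it is delivered.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model for illustration only; this is not the RabbitMQ client API.
final class ToyQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the oldest ready message and returns its delivery tag,
    // or -1 if nothing is ready. With autoAck the message is forgotten
    // immediately; otherwise it is kept until ack(tag) is called.
    long deliver(boolean autoAck) {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, message);
        }
        return tag;
    }

    // Manual acknowledgment: the queue may now discard the message.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Messages still held: ready plus delivered-but-unacknowledged.
    int held() {
        return ready.size() + unacked.size();
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish("order-1");
        long tag = queue.deliver(false);   // manual acknowledgment
        System.out.println(queue.held());  // prints 1
        queue.ack(tag);
        System.out.println(queue.held());  // prints 0
    }
}
```

In the real consumer above, channel.basicAck(deliveryTag, false) plays the role of ack(tag) in this model.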