1. 程式人生 > >訊息中介軟體--RabbitMQ學習(十七)---高階特性之死信佇列

訊息中介軟體--RabbitMQ學習(十七)---高階特性之死信佇列

死信佇列:DLX,Dead- Letter- Exchange

  • 利用DLX,當訊息在一個佇列中變成死信( dead message)之後它能被重新 publish到另一個 Exchange,這個 Exchange就是DLX

死信佇列訊息變成死信有一下幾種情況

  • 訊息被拒絕( basic. reject/ basic nack)並且 requeue= false
  • 訊息TTL過期
  • 佇列達到最大長度

死信佇列

  • DLX也是一個正常的 Exchange,和一般的 Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性
  • 當這個佇列中有死信時, Rabbitmq就會自動的將這個訊息重新發佈設置的 Exchange上去,進而被路由到另一個佇列
  • 可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補 rabbitmq3.0以前支援的 Immediate引數的功能
  • 死信佇列設定
  • 首先需要設定死信佇列的 exchange和 queue,然後進行繫結
  • Exchange: dix. exchange
  • Queue: dix.queue
  • Routingkey:#
  • 然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在佇列加上一個引數即可: arguments;put(“x- dead-letter-exchangedlx exchange”)

消費端程式碼實現

public class Consumer {

    public static void main(String[] args) throws Exception{
        //1 建立一個connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.159");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2通過連線工場建立連線
        Connection connection = connectionFactory.newConnection();
        //3通過connection建立channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";

        channel.exchangeDeclare(exchangeName,"topic",true,false,null);
        Map<String,Object> arguments = new HashMap<>();
        //進行死信佇列引數的配置
        arguments.put("x-dead-letter-exchange","dlx.exchange");
        //將死信佇列繫結到一個佇列上
        channel.queueDeclare(queueName,true,false,false,arguments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //進行死信佇列的申明
        channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
        channel.queueDeclare("dlx.queue",true,false,false,null);
        channel.queueBind("dlx.queue","dlx.exchange","#");


        //如果要使用限流方式  必須關閉自動簽收下面的false
        channel.basicConsume(queueName,true,new MyConsumer(channel));
    }
}

生產端程式碼

public class Producter {
    public static void main(String[] args) throws Exception{
        //1 建立一個connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.159");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2通過連線工場建立連線
        Connection connection = connectionFactory.newConnection();
        //3通過connection建立channel
        Channel channel = connection.createChannel();
        //開啟訊息的確認模式
        channel.confirmSelect();
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.save";

        //傳送訊息

//        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes());
        for(int i=0;i<1;i++){
            String msg = "hello";
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            AMQP.BasicProperties properties = builder.expiration("10000")
                    .expiration("10000")
                    .contentEncoding("UTF-8")
                    .deliveryMode(2).build();
            channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes());
        }
    }
}```
### 自定義消費者

public class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel=channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("consumerTag: "+consumerTag); System.err.println("envelope "+envelope); System.err.println("properties "+properties); System.err.println("body "+new String(body)); } }