訊息中介軟體--RabbitMQ學習(十七)---高階特性之死信佇列
阿新 • • 發佈:2018-12-19
死信佇列:DLX,Dead- Letter- Exchange
- 利用DLX,當訊息在一個佇列中變成死信( dead message)之後它能被重新 publish到另一個 Exchange,這個 Exchange就是DLX
死信佇列訊息變成死信有一下幾種情況
- 訊息被拒絕( basic. reject/ basic nack)並且 requeue= false
- 訊息TTL過期
- 佇列達到最大長度
死信佇列
- DLX也是一個正常的 Exchange,和一般的 Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性
- 當這個佇列中有死信時, Rabbitmq就會自動的將這個訊息重新發佈設置的 Exchange上去,進而被路由到另一個佇列
- 可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補 rabbitmq3.0以前支援的 Immediate引數的功能
- 死信佇列設定
- 首先需要設定死信佇列的 exchange和 queue,然後進行繫結
- Exchange: dix. exchange
- Queue: dix.queue
- Routingkey:#
- 然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在佇列加上一個引數即可: arguments;put(“x- dead-letter-exchangedlx exchange”)
消費端程式碼實現
public class Consumer { public static void main(String[] args) throws Exception{ //1 建立一個connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.159"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2通過連線工場建立連線 Connection connection = connectionFactory.newConnection(); //3通過connection建立channel Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName,"topic",true,false,null); Map<String,Object> arguments = new HashMap<>(); //進行死信佇列引數的配置 arguments.put("x-dead-letter-exchange","dlx.exchange"); //將死信佇列繫結到一個佇列上 channel.queueDeclare(queueName,true,false,false,arguments); channel.queueBind(queueName,exchangeName,routingKey); //進行死信佇列的申明 channel.exchangeDeclare("dlx.exchange","topic",true,false,null); channel.queueDeclare("dlx.queue",true,false,false,null); channel.queueBind("dlx.queue","dlx.exchange","#"); //如果要使用限流方式 必須關閉自動簽收下面的false channel.basicConsume(queueName,true,new MyConsumer(channel)); } }
生產端程式碼
public class Producter { public static void main(String[] args) throws Exception{ //1 建立一個connectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.0.159"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2通過連線工場建立連線 Connection connection = connectionFactory.newConnection(); //3通過connection建立channel Channel channel = connection.createChannel(); //開啟訊息的確認模式 channel.confirmSelect(); String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.save"; //傳送訊息 // channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes()); for(int i=0;i<1;i++){ String msg = "hello"; AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties properties = builder.expiration("10000") .expiration("10000") .contentEncoding("UTF-8") .deliveryMode(2).build(); channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes()); } } }``` ### 自定義消費者
public class MyConsumer extends DefaultConsumer { private Channel channel; public MyConsumer(Channel channel) { super(channel); this.channel=channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("consumerTag: "+consumerTag); System.err.println("envelope "+envelope); System.err.println("properties "+properties); System.err.println("body "+new String(body)); } }