1. 程式人生 > >rabbitmq5-釋出訂閱模式

rabbitmq5-釋出訂閱模式

前面的幾篇博文中講解的是兩個簡單的模式,並沒有提到exchange互動及這個東西,那麼是時候該引入完整的模式了,我們將引入exchange,同時介紹exchange的幾種工作的模式

一、釋出訂閱模式概念及應用場景:

1、概念

釋出訂閱模式使用的是“fanout”扇出模式,即exchange將忽略routing key把其投遞到所有繫結的佇列中

2、應用場景:
  • 廣播:例如通知所有店小二,週五放假

二、關鍵元件“exchange”:

2.1、exchange的幾種模式:
  • direct:預設的一種模式,訊息分發規則為:訊息會被投遞到binding key和routing key完全匹配的佇列中去
  • topic:和direct類似,不過binding key和routing key的匹配規則可以使用正則表示式
  • header:將忽略routing key,是使用傳送訊息是的basicProperties物件中的headers來進行匹配的,headers是一個鍵值對型別傳送訊息的header與佇列繫結時鍵值對匹配時,佇列繫結交換機時使用x-match,在匹配是有兩種模式,①:any只要匹配其中一個就可以,②:all則表示所有的都要完全匹配
  • fanout:本節要講的一種模式。同上面一樣,訊息投遞的時候將忽略routing key,會把接收到的訊息投遞到所有與之繫結的佇列中去
2.2 留一個問題思考一下:
  • 1、publishe端的肯定很好解決
  • 2、subscriber端的,以及這個訊息什麼時候被刪除,因為不可能一直儲存在隊列當中的呀

三、把程式碼呈上來:

3.1、publisher:
public class Publisher {

    public static final String EXCHANGE_NAME = "publisher";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        Channel channel = conn.createChannel();
        BuiltinExchangeType type = BuiltinExchangeType.FANOUT;
        boolean
durable = true; boolean autoDelete = false; /** * 是否為內部交換機,如果是則不能與外部client直接,通用用於例如死信佇列 */ boolean internal = false; // 在這裡最後一個引數還是不做講解,在後面高階的部分在做講解,死信佇列、延遲佇列、優先順序佇列 channel.exchangeDeclare(EXCHANGE_NAME,type,durable,autoDelete,internal,null); // 這個裡面我們不指定 routing key,因為沒有意義,在fanout模式下會被忽視掉的 channel.basicPublish(EXCHANGE_NAME,"",null,"我假裝是你們想要傳送的訊息".getBytes()); } }
3.1、subscriber:

這個裡面和官方給的程式碼不一樣,如果不宣告佇列使用官方的形式的話,將會有一個臨時的佇列,

public class Subscriber01 {
    public static final String EXCHANGE_NAME = "publisher";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnUtils.getConn();
        final Channel channel = conn.createChannel();
        channel.queueDeclare("publisher",true,false,false,null);
        // 因為routing key會被忽略,所以還是無需指定
        // 這個地方queue:佇列的名稱是需要指定的
        channel.queueBind("publisher",EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        boolean autoAck = false;
        channel.basicConsume("publisher",autoAck,consumer);
    }
}

下面我們來回答上面的問題:
在上面的測試中,我們兩個訂閱者繫結的是同一個佇列,結果只有其中的一個subscriber收到了訊息,另外一個沒有收到,這個是正常的,因為rabbitmq當消費者應答之後,mq會自動的把訊息刪除,現在我們看一下概念,釋出訂閱模式指的是:exchange忽略routing key把訊息投遞到所有的繫結它的佇列,並不是說投遞到所有的消費者,因此我們前面的假設都是有問題的,這也是我在學習的過程中算是胡思亂想的吧,希望大家能夠明悟。

如果有疑問或者是需要交流可以在下面評論也可以掃碼進群