1. 程式人生 > >RabbitMQ訊息分發模式----"Publish/Subscribe"釋出/訂閱模式

RabbitMQ訊息分發模式----"Publish/Subscribe"釋出/訂閱模式

介紹

我們都是基於一個佇列傳送和接受訊息。 

前面講的幾種,不管是生產者端還是消費者端都必須知道一個指定的QueueName才能傳送、獲取訊息。  而RabbitMQ訊息模型的核心思想是生產者不會將訊息直接傳送給佇列。現在介紹一下完整的訊息傳遞模式:

如果同一個訊息,要求每個消費者都處理的話,就需要RabbitMQ提供的訊息分發模式中的------"Publish/Subscribe"釋出/訂閱模式。

因為,生產者通常不會知道訊息會被哪些消費者接收。

下面將的模式,生產者雖然不是直接將訊息傳送給Queue,但是會交給Exchange,所以需要定義Exchange的訊息分發模式 ,之前的程式中,有如下一行程式碼:

channel.basicPublish("", queueName , null , msg.getBytes());

第一個引數為空字串,其實第一個引數就是用來指定轉發器名稱----ExchangeName的,這裡用空字串,就表示訊息會交給預設的Exchange。

Exchange----轉發器

它的作用是一方面它接受生產者的訊息,另一方面向佇列推送訊息。

轉發器必須清楚的知道如何處理接收到的訊息。附加一個特定的佇列嗎?附加多個佇列?或者是否丟棄?這些規則通過轉發器的型別進行定義。

Exchange的型別有:Direct、Topic(主題模式)、Headers和Fanout。

1.Fanout--廣播模式


我們可以通過以下程式碼建立一個指定型別的轉發器:

channel.exchangeDeclare("logs", "fanout"); 
fanout轉發器非常簡單,從名字就可以看出,它是廣播接受到的訊息給所有的佇列。
現在我們可以指定轉發器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());  

你可能還記得之前我們用佇列時,會指定一個名字。佇列有名字對我們來說是非常重要的——我們需要為消費者指定同一個佇列。

但這並不是我們的日誌系統所關心的。我們要監聽所有日誌訊息,而不僅僅是一類日誌。我們只對對當前流動的訊息感興趣。解決這些問題,我們需要完成兩件事。

首先,每當我盟連線到RabbitMQ時,需要一個新的空佇列。為此我們需要建立一個隨機名字的空佇列,或者更好的,讓伺服器選好年則一個隨機名字的空佇列給我們。

其次,一旦消費者斷開連線,佇列將自動刪除。

我們提供一個無參的queueDeclare()方法,建立一個非持久化、獨立的、自動刪除的佇列,且名字是隨機生成的。

String queueName = channel.queueDeclare().getQueue();  

Binding----繫結

我們建立了一個廣播的轉發器和隨機佇列。我們需要告訴轉發器轉發訊息的佇列。這個關聯轉發器和佇列的我們叫它Binding。


channel.queueBind(queueName, "logs", "");  
完整程式碼:
傳送端:
package cn.rabbitmq.publish;
import java.io.IOException;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendTest {
    private final static String EXCHANGE_NAME = "logs";  
    public static void main(String[] args) throws IOException {  
        /** 
         * 建立連線連線到MabbitMQ 
         */  
        Connection connection = ConnectionUtil.getConnection();
        // 建立一個頻道  
        Channel channel = connection.createChannel();  
        // 指定轉發——廣播  
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
          for(int i=0;i<3;i++){  
            // 傳送的訊息  
            String message = "Hello World!";  
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }          
        // 關閉頻道和連線  
        channel.close();  
        connection.close();  
    }  
}
接收端--(兩個一樣):
package cn.rabbitmq.publish;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import cn.rabbitmq.util.ConnectionUtil;
public class FanoutRecv1 {
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 宣告一個隨機佇列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定接收者,第二個引數為自動應答,無需手動應答
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

2.driect---直接指定模式


任何傳送到Direct Exchange的訊息都會被轉發到RouteKey中指定的Queue

1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字串,下文稱其為default Exchange)。

2.這種模式下不需要將Exchange進行任何繫結(binding)操作

3.訊息傳遞時需要一個“RouteKey”,可以簡單的理解為要傳送到的佇列名字。

4.如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。

生產者端程式碼:

package cn.rabbitmq.publish;
import java.io.IOException;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class SendTest {
    private final static String EXCHANGE_NAME = "logs1";  
        public static void main(String[] args) throws IOException {  
        /** 
         * 建立連線連線到MabbitMQ 
         */  
        Connection connection = ConnectionUtil.getConnection();
        // 建立一個頻道  
        Channel channel = connection.createChannel();  
        // 指定轉發——廣播  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
  
        for(int i=0;i<3;i++){  
            // 傳送的訊息  
            String message = "Hello World!";  
            channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");  
        }  
        // 關閉頻道和連線  
        channel.close();  
        connection.close();  
    }  
}
消費者端程式碼:

消費者一:routKey與生產者相同

package cn.rabbitmq.publish;
import java.io.IOException;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class FanoutRecv2 {
    private static final String EXCHANGE_NAME = "logs1";
    private final static String QUEUE_NAME = "logs1";
    public static void main(String[] argv) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 宣告一個隨機佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定接收者,第二個引數為自動應答,無需手動應答
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}<strong>
</strong>
消費者二:routKey與生產者不同
package cn.rabbitmq.publish;
import java.io.IOException;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class FanoutRecv2 {
    private static final String EXCHANGE_NAME = "logs1";
    private final static String QUEUE_NAME = "logs1";
    public static void main(String[] argv) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 宣告一個隨機佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 指定接收者,第二個引數為自動應答,無需手動應答
        channel.basicConsume(QUEUE_NAME, true, consumer);
<span style="white-space:pre">	</span>while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
注意:啟動時先啟動一次生產者,然後再啟動消費者。如果沒有接收到訊息再啟動一次生產者即可達到想要的效果。

http://blog.csdn.net/lmj623565791/article/details/37657225