1. 程式人生 > >RabbitMQ使用教程(四):釋出/訂閱模式—Publish/Subscribe

RabbitMQ使用教程(四):釋出/訂閱模式—Publish/Subscribe

一、釋出/訂閱模式說明

今天我們來學習一點新的東西,之前我們是將一個訊息傳送給了一個特定的消費者,今天的做法完全不同,不再發送給某一個消費者,而是將一個訊息傳送給多個消費者,這便是:釋出/訂閱模式。

我們將使用該模式來實現一個日誌系統:一個程式產生日誌,一個程式處理日誌。

二、認識交換機

在之前的教程中,我們用郵局作了比喻,回顧一下我們之前的理解:

一個生產者是使用者程式,傳送訊息

一個佇列是存放訊息的地方,可以理解為緩衝區

一個消費者使用者程式,接收訊息

關鍵問題在於之前忘了講:生產者的訊息並不是直接傳送給佇列的,生產者本身也不知道要把訊息發給哪個佇列,而是將訊息傳送給交換機,再由交換機發送給佇列。
這裡寫圖片描述

生產者的訊息並不是直接傳送給佇列的,而是傳送給交換機的,交換機再發給佇列的。

交換機理解起來非常簡單,它一端接收生產者傳送過來的訊息,一端將訊息傳送給佇列。所以,當交換機接收到一個訊息時,它必須作出相應的處理,是發給一個特定的佇列?還是發給多個佇列?還是丟棄…

處理的方式取決於交換機的型別!

這裡有四種可用的交換機型別:direct、topic、headers、fanout

我們使用最後一種fanout,建立該型別的交換機:

channel.exchangeDeclare("logs", "fanout");

fanout交換機,你很容易猜到它的作用:將接收到的訊息廣播給每一個佇列。

三、無名交換機(預設)

在之前的教程中,我們根本不知道交換機的概念,但我們還是將訊息成功地傳送了出去。
回顧一下之前的程式碼:

channel.basicPublish("", "hello", null, message.getBytes());

注意第一個引數,我們填了”“,這個”“正是預設交換機的名稱,它的作用是根據routingKey,傳送訊息。發現了routingKey:hello,便會把訊息傳送給hello佇列。到這裡我們已經明白:第一個引數是交換機名稱,第二個引數是routingKey名稱。

現在我們需要建立日誌系統所需的交換機:

channel.basicPublish
( "logs", "", null, message.getBytes());
四、臨時佇列

大家應該記得我們之前的佇列:”hello”、”task_queue”,我們的每一個佇列都有一個名稱。沒錯,佇列的名稱很重要,當我們需要特定的消費者來處理特點佇列中的訊息,消費者就是根據佇列名來找到佇列的。但是這在我們的日誌系統中不適用,消費者需要接收所有的訊息,而不是一個一部分,另外我們也只對剛剛產生的訊息進行處理,日誌一旦產生,就應該及時記錄或列印。為了做到這兩點,我們需要做兩件事:

  1. 無論何時連線到RabbitMQ的都是一個最新的、空的佇列。我們可以建立一個隨機命名的佇列,或者讓RabbitMQ幫我們隨機命名。
  2. 一旦消費者斷開連線,該佇列就馬上把刪除(不再接受新的訊息)

我們可以使用queueDeclare()方法實現上面兩點,建立一個不持久、獨佔的、自動刪除的、隨機命名的佇列。獨佔佇列是指:可以私用的佇列,指定某個特定的連線可用,連線斷開就自動刪除。隨機命名如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

String queueName=channel.queueDeclare().getQueue();
五、佇列繫結交換機

我們已經建立了一個fanout型別的交換機:”logs”,現在我們需要指定該交換機的訊息需要傳送的佇列,這個指定關係稱為繫結。
這裡寫圖片描述

channel.queueBind(queueName, "logs", "");
六、程式碼實現

我們之前是使用的預設交換機,指定routingKey為佇列名稱的方式來發送訊息。

現在我們使用了fanout型別的”logs”交換機,(臨時)佇列與交換機繫結。這時候就不要routingKey了。

EmitLog.java



import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        for (int i = 1; i <=10; i++) {
            String message = "message" + i;
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());// arg0:不再使用""預設交換機,args1:有了交換機名,不再填對routingKey名
        }
        channel.close();
        connection.close();
    }

}

ReceiveLog1.java


import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLog1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();// 產生一個不持久化、獨佔的、自動刪除的、隨機命名的佇列
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}


首先執行接收端,再執行傳送端,ReceiveLog2.java與ReceiveLog1.java的程式碼大同小異,這裡建議使用IDEA,可以很直觀的看到結果。因為,強大的IDEA不會有console覆蓋問題。

執行結果:

這裡寫圖片描述

這裡寫圖片描述

參考命令:

列出所有交換機:

rabbitmqctl list_exchanges

列出所有繫結關係:

rabbitmqctl list_bindings