1. 程式人生 > >《RabbitMQ官方指南》路由

《RabbitMQ官方指南》路由

Routing

上一節我們建立了一個簡單的日誌系統,已經能夠傳播日誌資訊給接收者了。

在這一節我們將給它增加一個特性-訂閱部分訊息。比如說,我們能夠從控制檯列印的所有日誌資訊中將至關重要的錯誤資訊指向日誌檔案(儲存在硬盤裡)。

Bindings

在前面的例子中我們已經創造了bindings,你可以這樣呼叫它

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個bindings就是queue和exchange之間的一種對映關係(多對多關係),將queue繫結到exchange上,也可以這樣理解: queue對來自此exchange的訊息感興趣(傳遞訊息)。

Bindings建立的時候也可以同時附帶一個額外的名為 routingKey 的引數,為了避免與 basic_publish 引數混淆,我們把他叫做 binding key .我們可以這樣建立一個帶有binding key的 bindings。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

這意味著一個 binding key 的值取決於於exchange的型別,像前面提到的 fanout 型別的 exchange, 就會忽略它的值。

Direct exchange

在前幾節中我們的日誌系統傳播所有的資訊給所有的消費者,我們想基於它實現根據資訊的嚴重程度過濾資訊的功能,比如說我們想要一個程式只會將error級別的日誌寫入硬碟,warning和info級別的日誌就不浪費硬碟空間了。

我們前面使用的是 fanout 型別的exchange,它並沒有給我們更多的靈活性 – 它只會無意識的傳播所有它接收到的資訊

這裡我們將使用一個 direct 型別的exchange 替換它, direct exchange的路由演算法是非常簡單的,只有當目標queue的 binding key 和 訊息的routing key完全相等時才會進行路由。

為了詳細說明它,考慮下面的流程:

在這個流程裡面,我們可以看到一個名為x的 direct exchange綁定了兩個queue,第一個queue通過名為orange 的 binding key 繫結,第二個佇列有兩個 bindings, 一個叫 black

,另一個叫green.

在上述流程中一個routing key為 orange 的訊息將被路由到Q1這個queue上面,routing key為black或者green的訊息將被路由到Q2這個queue上。其他的訊息將會被丟棄。

Multiple bindings

其實通過相同的binding key繫結多個queue也是完全可以的,在我們的示例中我們可以將xQ1也通過black binding key繫結在一起,此時這個direct exchange將和fanout exchange傳播資訊的行為一樣,都會將資訊傳播給相匹配的queue。所以routing key為black的訊息將被傳遞到Q1和Q2兩個queue裡面.

傳送日誌

下面我們將為我們的日誌系統使用這個模型。用direct exchange 替換掉fanout exchange來發送訊息,我們將把日誌的嚴重程度(severity)作為一個routing key.然後接收程式就可以根據訊息的嚴重程度(severity)來接收。首先,讓我們來看看傳送日誌部分。
像往常一樣,首先我們需要建立一個exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

傳送訊息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

為了簡化事情我們假定’severity’ 就是’info’,’warning’或者’error’,即日誌訊息的嚴重程度有info,warning或者error三種級別。

訂閱

接收訊息部分除了建立binding時附帶上severity之外和前面章節都相同:

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

看看效果圖:

傳送訊息程式碼類EmitLogDirect.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }
}

接收訊息程式碼類ReceiveLogsDirect.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

將java程式碼編譯成class檔案(看第一節對編譯和類路徑選擇的建議),執行的時候為了方便我們用 $CP 環境變數(windows 上是 %CP%)代替類路徑.

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

如果你僅僅想將‘warning’和‘error’(不包括’info’)級別的訊息儲存進一個日誌檔案,只需開啟一個控制檯輸入:

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想輸出所有級別的日誌資訊在你的螢幕上,只需開啟終端這樣輸入:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

傳送一個error級別的日誌訊息:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

第五節檢視如何基於模式去監聽訊息