1. 程式人生 > >RabbitMQ使用教程(六):更強大的交換機—Topics

RabbitMQ使用教程(六):更強大的交換機—Topics

一、Topics交換機

之前我們學習了釋出/訂閱模式、路由模式,其中一個使用了最簡單的fanout交換機,一個使用了帶個性化的direct交換機,儘管direct在一定程度上提供了個性化操作入口,改善了我們的日誌系統,但是還遠遠不夠,因為需求總是千奇百怪的,direct的限制在於:不支援多重標準。

還是以我們的日誌系統為例,我們不僅需要根據嚴重級別來處理日誌,還可能需要根據不同的來源作出不同的處理,這時候就需要使用更大的交換機:Topics。Topics交換機可以實現更加複雜的訊息傳送規則,即傳送訊息時,指定更為複雜的routingKey。

二、Topics的配置規則

使用Topics的交換機,我們就不能使用任意名字的routingKey,其必須遵守其規則:以一串words命名,使用逗號分隔。

比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.

可以隨意地增加routingKey的words個數,但是最大長度為255個位元組。

Topics的邏輯跟direct大同小異,它與direct一樣,可以發給多個特定的消費者,但是其有兩點值得關注的:

①:*可以代表任何一個單詞

②:#可以代表0個或多個單詞

如下圖:
這裡寫圖片描述
我們的生產者打算髮送關於動物的訊息:

Q1佇列綁定了:* . orange . *
Q2佇列綁定了:* . * . rabbit、lazy . #

這裡說明Q1對橘色的動物感興趣,而Q2對兔子、懶的動物感興趣。
一下是一些訊息指定的routingKey,以及它將會發送到的佇列:

quick.orange.rabbit:二者

“lazy.orange.elephant”:二者

“quick.orange.fox”:Q1

“lazy.brown.fox” :Q2

“quick.brown.fox”:無(該訊息將會被丟棄)

“orange”:無

“quick.orange.male.rabbit”:無(位數過長,訊息將被丟棄)

“lazy.orange.male.rabbit”:Q2

三、特殊的Topics

Topics交換機非常的強大,它也可以實現fanout與direct那樣的使用效果,當一個佇列的routingKey繫結為:“#”時,它接收所有訊息;當其繫結為“*”、“#”都沒有使用時,其相當於direct。

四、程式碼實現

Topics交換機非常的強大,它也可以實現fanout與direct那樣的使用效果,當一個佇列的routingKey繫結為:“#”時,它接收所有訊息;當其繫結為“*”、“#”都沒有使用時,其相當於direct。

我們將error日誌分層處理,sql異常的儲存在sql異常相關的磁碟,service異常儲存在service相關的目錄,為了簡單,我們使用列印語句代替將要執行的操作。

EmitTopicLog.java:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitTopicLog {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //debug日誌
        for (int i = 1; i <= 3; i++) {
            String message = "debug_message" + i;
            channel.basicPublish(EXCHANGE_NAME, "debug", null, message.getBytes());
        }
        //info日誌
        for (int i = 1; i <= 3; i++) {
            String message = "info_message" + i;
            channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
        }
        //sql error日誌
        for (int i = 1; i <= 3; i++) {
            String message = "error_message" + i;
            channel.basicPublish(EXCHANGE_NAME, "userQuery.sql.error", null, message.getBytes());
        }
        //sql error日誌
        for (int i = 1; i <= 3; i++) {
            String message = "error_message" + i;
            channel.basicPublish(EXCHANGE_NAME, "userUpdate.sql.error", null, message.getBytes());
        }
        //service error日誌
        for (int i = 1; i <= 3; i++) {
            String message = "error_message" + i;
            channel.basicPublish(EXCHANGE_NAME, "userLogin.service.error", null, message.getBytes());
        }
        channel.close();
        connection.close();
    }
}

ReceiveLog1.java:


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLog1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

ReceiveLog2.java:


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLog2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(message);
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

ReceiveLog3.java:


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLog3 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "*.sql.error");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("將"+message+"記錄到sql異常相關的目錄");
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

ReceiveLog4.java:


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLog4 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "*.service.error");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                FileUtil.save(message);
                System.out.println("將"+message+"記錄到service異常相關的目錄");
            }
        };
        channel.basicConsume(queueName,true,consumer);
    }
}

執行結果:
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述

五、總結

將釋出/訂閱模式、路由模式、Topic模式進行對比,很容易發現以下結論:

Publish/Subcribe、Routing、Topics都是圍繞交換機進行的,差別僅是交換機的型別不同。

下篇將和大家一起來探索遠端呼叫RPC