1. 程式人生 > >RabbitMQ入門教程(六):路由選擇Routing

RabbitMQ入門教程(六):路由選擇Routing

簡介

本節主要演示使用直連線型別,將多個路由鍵繫結到同一個佇列上。也可以將同一個鍵繫結到多個佇列上(多重繫結multiple bindings),此時滿足鍵的佇列都能收到訊息,不滿足的直接被丟棄。

這裡寫圖片描述

這裡寫圖片描述

生產者

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1"
); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Routing 的路由規則使用直連線 String EXCHANGE_NAME = "exchange.direct.routing"
; String[] routingKeys = {"debug", "info", "warning", "error"}; for (int i = 0; i < 20; i++){ int random = (int)(Math.random() * 4); String routingKey = routingKeys[random]; String message = "Hello RabbitMQ - " + routingKey + " - " + i; channel.basicPublish(EXCHANGE_NAME, routingKey, null
, message.getBytes("UTF-8")); } // 關閉資源 channel.close(); connection.close(); } }

消費者1

public class Consumer1 {
    @Test
    public void testBasicConsumer1() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.direct.routing";
        // 生成一個隨機的名稱,queueDeclare()方法沒有任何引數,當最後一個消費者斷開時就會刪除掉該佇列,當消費者結束後可以看到佇列就刪除了
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 在消費者端佇列繫結

        // 將一個對列繫結多個路由鍵
        String[] routingKeys = {"debug", "info"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}

消費者2

public class Consumer2 {
    @Test
    public void testBasicConsumer2() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();


        String EXCHANGE_NAME = "exchange.direct.routing";
        // 生成一個隨機的名稱
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 在消費者端佇列繫結

        // 將一個對列繫結多個路由鍵
        String[] routingKeys = {"warning", "error"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received '" + message + "', 處理業務中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}

執行結果

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述