1. 程式人生 > >RabbitMQ routingKey for Java【入門教程 4】

RabbitMQ routingKey for Java【入門教程 4】

routingKey :訊息路由

繫結關係         在之前的例子中也使用了類似的方式:

channel.queueBind(queueName, EXCHANGE_NAME, "");

        繫結是交換器和佇列之間的一種關係,使用者微博,微信的例子可以簡單的理解為關注,就是佇列(某屌絲)對交換器(女神)非常感興趣,關注了她,以後女神發的每條微博,屌絲都能看到。

        繫結可以使用routingkey這個引數,是為了避免所有的訊息都使用同一個路由線索帶來的麻煩。為了區分路由規則,我們建立建立一個唯一的路由線索。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

        繫結關係中使用的路由關鍵字【routingkey】是否有效取決於交換器的型別。如果交換器是分發【fanout】型別,就會忽略路由關鍵字【routingkey】的作用。如果交換器的分發為【direct】 關鍵字routingkey才會起作用

直連型別交換器         上一章的例子是通過分發【fanout】型別的交換器【logs】廣播日誌資訊,現在我們將日誌分debug、info、warn、error這幾種基本的級別,實際在生產環境中,避免磁碟空間浪費,應用只會將error級別的日誌打印出來。而分發【fanout】型別的交換器會將所有基本的日誌都發送出來,如果我們想只接收某一級別的日誌資訊,就需要使用直連【direct】型別的交換器了, 下面的圖中,佇列1通過ERROR這個routingkey繫結到E交換器,佇列2通過WARN和INFO繫結到E交換器,E交換器的型別是直連【direct】的,如果生產者【P】發出ERROR的日誌,只會有佇列1會收到,如果生產者【P】發出INFO和WARN的日誌,只有佇列2會收到,如果生產者【P】發出DEBUG級別的日誌,佇列1和佇列2都會忽略它。(理解這個圖  我們的程式碼就容易理解)

我們還是以訊息為例:先看看我們程式碼所走的流程圖

 上圖可以看出 smsConsumer只接受sms資訊 而emailConsumer接受sms和email資訊,那麼怎麼決定接受什麼型別的資訊呢?

所以這裡routingKey起到關鍵性作用

直接上程式碼 專案結構:

生產者:

package wxtest.rabbitMq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class msgProduce {

    private static String EXCHAGE_NAME = "exchange_direct";//設定一個交換機
    private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個路由
    private static String ROUTINGKEY_email = "routingkey_emai";//設定一個路由

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHAGE_NAME, "direct");//fanout屬於主題訂閱方式 設定路由無效  direct和路由配合使用

        //分發訊息
        for (int i = 0; i < 10; i++) {
            String message = "我是交換機的訊息 " + i;
            //basicPublish釋出的第一個引數屬於交換機  第二個引數屬於按照routingKey方式發放 分別向email和sms發放資訊
            //一部分訊息作為email傳送 一部分訊息作為sms傳送 我們可以看看消費者的消費情況
            if (i % 2 == 0) {
                channel.basicPublish(EXCHAGE_NAME, ROUTINGKEY_email, null, message.getBytes());
            } else {
                channel.basicPublish(EXCHAGE_NAME, ROUTINGKEY_SMS, null, message.getBytes());
            }
            System.out.println(" [x] Sent '" + message + "'");
        }
        channel.close();
        connection.close();
    }
}

emailConsumer消費者: 注意觀察路由情況

package wxtest.rabbitMq.routing;

import com.rabbitmq.client.*;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class emailConsumer {
    private static String exchage_name = "exchange_direct";//設定一個交換機
    private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個路由
    private static String ROUTINGKEY_email = "routingkey_emai";//設定一個路由


    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchage_name, "direct");//繫結交換機
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchage_name, ROUTINGKEY_SMS);//繫結交換機所對應的佇列  設定路由
        channel.queueBind(queueName, exchage_name, ROUTINGKEY_email);//繫結交換機所對應的佇列 設定路由
        System.out.println(" emailConsumer Waiting for messages. To exit press CTRL+C");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println(" emailConsumer [x] Received '" + msg + "'");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

smsConsumer消費者: 注意觀察路由情況

package wxtest.rabbitMq.routing;

import com.rabbitmq.client.*;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class smsComsumer {
    private static String exchage_name = "exchange_direct";//設定一個交換機
    private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個交換機

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchage_name, "direct");//繫結交換機
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, exchage_name, ROUTINGKEY_SMS);//繫結交換機所對應的佇列 只接受sms資訊
        System.out.println(" smsComsumer Waiting for messages. To exit press CTRL+C");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println(" smsComsumer [x] Received '" + msg + "'");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

輸出結果如下:

 

上述程式碼將交換機 路由的設定都已經在程式碼中得以體現 ,仔細敲一遍  感受更深刻哦!