RabbitMQ routingKey for Java【入門教程 4】
routingKey :訊息路由
繫結關係 在之前的例子中也使用了類似的方式:
channel.queueBind(queueName, EXCHANGE_NAME, "");
繫結是交換器和佇列之間的一種關係,使用者微博,微信的例子可以簡單的理解為關注,就是佇列(某屌絲)對交換器(女神)非常感興趣,關注了她,以後女神發的每條微博,屌絲都能看到。
繫結可以使用routingkey這個引數,是為了避免所有的訊息都使用同一個路由線索帶來的麻煩。為了區分路由規則,我們建立建立一個唯一的路由線索。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
繫結關係中使用的路由關鍵字【routingkey】是否有效取決於交換器的型別。如果交換器是分發【fanout】型別,就會忽略路由關鍵字【routingkey】的作用。如果交換器的分發為【direct】 關鍵字routingkey才會起作用
直連型別交換器 上一章的例子是通過分發【fanout】型別的交換器【logs】廣播日誌資訊,現在我們將日誌分debug、info、warn、error這幾種基本的級別,實際在生產環境中,避免磁碟空間浪費,應用只會將error級別的日誌打印出來。而分發【fanout】型別的交換器會將所有基本的日誌都發送出來,如果我們想只接收某一級別的日誌資訊,就需要使用直連【direct】型別的交換器了, 下面的圖中,佇列1通過ERROR這個routingkey繫結到E交換器,佇列2通過WARN和INFO繫結到E交換器,E交換器的型別是直連【direct】的,如果生產者【P】發出ERROR的日誌,只會有佇列1會收到,如果生產者【P】發出INFO和WARN的日誌,只有佇列2會收到,如果生產者【P】發出DEBUG級別的日誌,佇列1和佇列2都會忽略它。(理解這個圖 我們的程式碼就容易理解)
我們還是以訊息為例:先看看我們程式碼所走的流程圖
上圖可以看出 smsConsumer只接受sms資訊 而emailConsumer接受sms和email資訊,那麼怎麼決定接受什麼型別的資訊呢?
所以這裡routingKey起到關鍵性作用
直接上程式碼 專案結構:
生產者:
package wxtest.rabbitMq.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import wxtest.rabbitMq.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class msgProduce { private static String EXCHAGE_NAME = "exchange_direct";//設定一個交換機 private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個路由 private static String ROUTINGKEY_email = "routingkey_emai";//設定一個路由 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHAGE_NAME, "direct");//fanout屬於主題訂閱方式 設定路由無效 direct和路由配合使用 //分發訊息 for (int i = 0; i < 10; i++) { String message = "我是交換機的訊息 " + i; //basicPublish釋出的第一個引數屬於交換機 第二個引數屬於按照routingKey方式發放 分別向email和sms發放資訊 //一部分訊息作為email傳送 一部分訊息作為sms傳送 我們可以看看消費者的消費情況 if (i % 2 == 0) { channel.basicPublish(EXCHAGE_NAME, ROUTINGKEY_email, null, message.getBytes()); } else { channel.basicPublish(EXCHAGE_NAME, ROUTINGKEY_SMS, null, message.getBytes()); } System.out.println(" [x] Sent '" + message + "'"); } channel.close(); connection.close(); } }
emailConsumer消費者: 注意觀察路由情況
package wxtest.rabbitMq.routing; import com.rabbitmq.client.*; import wxtest.rabbitMq.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class emailConsumer { private static String exchage_name = "exchange_direct";//設定一個交換機 private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個路由 private static String ROUTINGKEY_email = "routingkey_emai";//設定一個路由 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchage_name, "direct");//繫結交換機 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchage_name, ROUTINGKEY_SMS);//繫結交換機所對應的佇列 設定路由 channel.queueBind(queueName, exchage_name, ROUTINGKEY_email);//繫結交換機所對應的佇列 設定路由 System.out.println(" emailConsumer Waiting for messages. To exit press CTRL+C"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println(" emailConsumer [x] Received '" + msg + "'"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queueName, true, consumer); } }
smsConsumer消費者: 注意觀察路由情況
package wxtest.rabbitMq.routing; import com.rabbitmq.client.*; import wxtest.rabbitMq.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; public class smsComsumer { private static String exchage_name = "exchange_direct";//設定一個交換機 private static String ROUTINGKEY_SMS = "routingkey_sms";//設定一個交換機 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchage_name, "direct");//繫結交換機 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchage_name, ROUTINGKEY_SMS);//繫結交換機所對應的佇列 只接受sms資訊 System.out.println(" smsComsumer Waiting for messages. To exit press CTRL+C"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println(" smsComsumer [x] Received '" + msg + "'"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queueName, true, consumer); } }
輸出結果如下:
上述程式碼將交換機 路由的設定都已經在程式碼中得以體現 ,仔細敲一遍 感受更深刻哦!