1. 程式人生 > >輕松搞定RabbitMQ(五)——路由選擇

輕松搞定RabbitMQ(五)——路由選擇

byte[] view 轉發器 ews 磁盤空間 表示 info 直接 net

轉自 http://blog.csdn.net/xiaoxian8023/article/details/48733249

翻譯地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

在前篇博文中,我們建立了一個簡單的日誌系統。可以廣播消息給多個消費者。本篇博文,我們將添加新的特性——我們可以只訂閱部分消息。比如:我們可以接收Error級別的消息寫入文件。同時仍然可以在控制臺打印所有日誌。


Bindings(綁定)

在上一篇博客中我們已經使用過綁定。類似下面的代碼:

[java] view plain copy
  1. channel.queueBind(queueName, EXCHANGE_NAME, "");

綁定表示轉換器與隊列之間的關系。可以簡單的人為:隊列對該轉發器上的消息感興趣。

綁定可以設定額外的routingKey參數。為了與避免basicPublish方法(發布消息的方法)的參數混淆,我們準備把它稱作綁定鍵(binding key)。下面展示如何使用綁定鍵(binding key)來創建一個綁定:

[java] view plain copy
  1. channel.queueBind(queueName, EXCHANGE_NAME, "black");

綁定鍵關鍵取決於轉換器的類型。對於fanout類型,忽略此參數。


Direct exchange(直接轉發)

前面講到我們的日誌系統廣播消息給所有的消費者。我們想對其擴展,根據消息的嚴重性來過濾消息。例如:我們希望將致命錯誤的日誌消息記錄到文件,而不是把磁盤空間浪費在warn和info類型的日誌上。我們使用的fanout轉發器,不能給我們太多的靈活性。它僅僅只是盲目的廣播而已。我們使用direct轉發器進行代替,其背後的算法很簡單——消息會被推送至綁定鍵(binding key)和消息發布附帶的選擇鍵(routing key)完全匹配的隊列。

技術分享

在上圖中,我們可以看到direct類型的轉發器與2個隊列進行了綁定。第一個隊列使用的綁定鍵是orange,第二個隊列綁定鍵為black和green。這樣當消息發布到轉發器是,附帶orange綁定鍵的消息將被路由到隊列Q1中去。附帶black和green綁定鍵的消息被路由到Q2中去。其他消息全部丟棄。


Multiple bindings(多重綁定)

技術分享

使用一個綁定鍵綁定多個隊列是完全合法的。如上圖,綁定鍵black綁定了2個隊列——Q1和Q2。


Emitting logs(發送日誌)

我們將這種模式用於日誌系統,發送消息給direct類型的轉發器。我們將 提供日誌嚴重性做為綁定鍵。那樣,接收程序可以選擇性的接收嚴重性的消息。首先關註發送日誌的代碼:

像往常一樣首先創建一個轉換器:

[java] view plain copy
  1. channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然後為發送消息做準備:

[java] view plain copy
  1. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

為了簡化代碼,我們假定日誌的嚴重性是‘info’,‘warning’,‘error’中之一。


Subscribing(訂閱)

接收消息跟前面博文中的一樣。我們僅需要修改一個地方:為每一個我們感興趣的嚴重性的消息,創建一個新的綁定。

[java] view plain copy
  1. String queueName = channel.queueDeclare().getQueue();
  2. for(String severity : argv){
  3. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  4. }

完整的例子

技術分享

發送端代碼(EmitLogDirect.java)

[java] view plain copy
  1. public class EmitLogDirect {
  2. private final static String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] args) throws IOException {
  4. /**
  5. * 創建連接連接到MabbitMQ
  6. */
  7. ConnectionFactory factory = new ConnectionFactory();
  8. // 設置MabbitMQ所在主機ip或者主機名
  9. factory.setHost("127.0.0.1");
  10. // 創建一個連接
  11. Connection connection = factory.newConnection();
  12. // 創建一個頻道
  13. Channel channel = connection.createChannel();
  14. // 指定轉發——廣播
  15. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  16. //所有日誌嚴重性級別
  17. String[] severities={"error","info","warning"};
  18. for(int i=0;i<3;i++){
  19. String severity = severities[i%3];//每一次發送一條不同嚴重性的日誌
  20. // 發送的消息
  21. String message = "Hello World"+Strings.repeat(".", i+1);
  22. //參數1:exchange name
  23. //參數2:routing key
  24. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
  25. System.out.println(" [x] Sent ‘" + severity +"‘:‘"+ message + "‘");
  26. }
  27. // 關閉頻道和連接
  28. channel.close();
  29. connection.close();
  30. }
  31. }

消費者1(ReceiveLogs2Console.java)

[java] view plain copy
  1. public class ReceiveLogs2Console {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] argv) throws IOException, InterruptedException {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("127.0.0.1");
  6. // 打開連接和創建頻道,與發送端一樣
  7. Connection connection = factory.newConnection();
  8. final Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10. // 聲明一個隨機隊列
  11. String queueName = channel.queueDeclare().getQueue();
  12. //所有日誌嚴重性級別
  13. String[] severities={"error","info","warning"};
  14. for (String severity : severities) {
  15. //關註所有級別的日誌(多重綁定)
  16. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  17. }
  18. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  19. // 創建隊列消費者
  20. final Consumer consumer = new DefaultConsumer(channel) {
  21. @Override
  22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  23. String message = new String(body, "UTF-8");
  24. System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘");
  25. }
  26. };
  27. channel.basicConsume(queueName, true, consumer);
  28. }
  29. }

消費者2(ReceiveLogs2File.java)

[java] view plain copy
  1. public class ReceiveLogs2File {
  2. private static final String EXCHANGE_NAME = "direct_logs";
  3. public static void main(String[] argv) throws IOException, InterruptedException {
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("127.0.0.1");
  6. // 打開連接和創建頻道,與發送端一樣
  7. Connection connection = factory.newConnection();
  8. final Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  10. // 聲明一個隨機隊列
  11. String queueName = channel.queueDeclare().getQueue();
  12. String severity="error";//只關註error級別的日誌,然後記錄到文件中去。
  13. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  14. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  15. // 創建隊列消費者
  16. final Consumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. String message = new String(body, "UTF-8");
  20. //記錄日誌到文件:
  21. print2File( "["+ envelope.getRoutingKey() + "] "+message);
  22. }
  23. };
  24. channel.basicConsume(queueName, true, consumer);
  25. }
  26. private static void print2File(String msg) {
  27. try {
  28. String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
  29. String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
  30. File file = new File(dir, logFileName + ".log");
  31. FileOutputStream fos = new FileOutputStream(file, true);
  32. fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
  33. fos.flush();
  34. fos.close();
  35. } catch (FileNotFoundException e) {
  36. e.printStackTrace();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

最終結果:

技術分享
羅哩羅嗦的說這麽多,其實就是說了這麽一件事:我們可以使用Direct exchange+routingKey來過濾自己感興趣的消息。一個隊列可以綁定多個routingKey。這就是我們今天的主題——路由選擇。

輕松搞定RabbitMQ(五)——路由選擇