1. 程式人生 > >RabbitMQ(五)—路由選擇

RabbitMQ(五)—路由選擇

前篇博文中,我們建立了一個簡單的日誌系統。可以廣播訊息給多個消費者。本篇博文,我們將新增新的特性——我們可以只訂閱部分訊息。比如:我們可以接收Error級別的訊息寫入檔案。同時仍然可以在控制檯列印所有日誌。

Bindings(繫結)

上一篇部落格中我們已經使用過繫結。類似下面的程式碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");  

繫結表示轉換器與佇列之間的關係。可以簡單的人為:佇列對該轉發器上的訊息感興趣。

繫結可以設定額外的routingKey引數。為了與避免basicPublish方法(釋出訊息的方法)的引數混淆,我們準備

把它稱作繫結鍵(binding key)。下面展示如何使用繫結鍵(binding key)來建立一個繫結:

channel.queueBind(queueName, EXCHANGE_NAME, "black");  

繫結鍵關鍵取決於轉換器的型別。對於fanout型別,忽略此引數。

Direct exchange(直接轉發)

前面講到我們的日誌系統廣播訊息給所有的消費者。我們想對其擴充套件,根據訊息的嚴重性來過濾訊息。例如:我們希望將致命錯誤的日誌訊息記錄到檔案,而不是把磁碟空間浪費在warn和info型別的日誌上。我們使用的fanout轉發器,不能給我們太多的靈活性。它僅僅只是盲目的廣播而已。我們使用direct轉發器進行代替,其背後的演算法很簡單——訊息會被推送至繫結鍵(binding key)和訊息釋出附帶的選擇鍵(routing key)完全匹配的佇列。


這裡寫圖片描述

在上圖中,我們可以看到direct型別的轉發器與2個佇列進行了繫結。第一個佇列使用的繫結鍵是orange,第二個佇列繫結鍵為black和green。這樣當訊息釋出到轉發器是,附帶orange繫結鍵的訊息將被路由到佇列Q1中去。附帶black和green繫結鍵的訊息被路由到Q2中去。其他訊息全部丟棄。

Multiple bindings(多重繫結)


這裡寫圖片描述

使用一個繫結鍵繫結多個佇列是完全合法的。如上圖,繫結鍵black綁定了2個佇列——Q1和Q2。

Emitting logs(傳送日誌)

我們將這種模式用於日誌系統,傳送訊息給direct型別的轉發器。我們將 提供日誌嚴重性做為繫結鍵。那樣,接收程式可以選擇性的接收嚴重性的訊息。首先關注傳送日誌的程式碼:

像往常一樣首先建立一個轉換器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

然後為傳送訊息做準備:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());  

為了簡化程式碼,我們假定日誌的嚴重性是‘info’,‘warning’,‘error’中之一。

Subscribing(訂閱)

接收訊息跟前面博文中的一樣。我們僅需要修改一個地方:為每一個我們感興趣的嚴重性的訊息,建立一個新的繫結。

String queueName = channel.queueDeclare().getQueue();  

for(String severity : argv){      
  channel.queueBind(queueName, EXCHANGE_NAME, severity);  
}  

完整的例子

這裡寫圖片描述

傳送端程式碼(EmitLogDirect.java)

public class EmitLogDirect {  
    private final static String EXCHANGE_NAME = "direct_logs";  

    public static void main(String[] args) throws IOException {  
        /** 
         * 建立連線連線到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        // 設定MabbitMQ所在主機ip或者主機名  
        factory.setHost("127.0.0.1");  
        // 建立一個連線  
        Connection connection = factory.newConnection();  
        // 建立一個頻道  
        Channel channel = connection.createChannel();  
        // 指定轉發——廣播  
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  

        //所有日誌嚴重性級別  
        String[] severities={"error","info","warning"};  
        for(int i=0;i<3;i++){  
            String severity = severities[i%3];//每一次傳送一條不同嚴重性的日誌  

            // 傳送的訊息  
            String message = "Hello World"+Strings.repeat(".", i+1);  
            //引數1:exchange name  
            //引數2:routing key  
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());  
            System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");  
        }  
        // 關閉頻道和連線  
        channel.close();  
        connection.close();  
    }  
}

消費者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {  
    private static final String EXCHANGE_NAME = "direct_logs";  

    public static void main(String[] argv) throws IOException, InterruptedException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("127.0.0.1");  
        // 開啟連線和建立頻道,與傳送端一樣  
        Connection connection = factory.newConnection();  
        final Channel channel = connection.createChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
        // 宣告一個隨機佇列  
        String queueName = channel.queueDeclare().getQueue();  

        //所有日誌嚴重性級別  
        String[] severities={"error","info","warning"};  
        for (String severity : severities) {  
            //關注所有級別的日誌(多重繫結)  
            channel.queueBind(queueName, EXCHANGE_NAME, severity);  
        }  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  

        // 建立佇列消費者  
        final Consumer consumer = new DefaultConsumer(channel) {  
              @Override  
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                String message = new String(body, "UTF-8");  
                System.out.println(" [x] Received '"  + envelope.getRoutingKey() + "':'" + message + "'");  
              }  
            };  
            channel.basicConsume(queueName, true, consumer);  
    }  
} 

消費者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {  
    private static final String EXCHANGE_NAME = "direct_logs";  

    public static void main(String[] argv) throws IOException, InterruptedException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("127.0.0.1");  
        // 開啟連線和建立頻道,與傳送端一樣  
        Connection connection = factory.newConnection();  
        final Channel channel = connection.createChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");  
        // 宣告一個隨機佇列  
        String queueName = channel.queueDeclare().getQueue();  

        String severity="error";//只關注error級別的日誌,然後記錄到檔案中去。  
        channel.queueBind(queueName, EXCHANGE_NAME, severity);  

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  

        // 建立佇列消費者  
        final Consumer consumer = new DefaultConsumer(channel) {  
              @Override  
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                String message = new String(body, "UTF-8");  
                //記錄日誌到檔案:  
                print2File( "["+ envelope.getRoutingKey() + "] "+message);  
              }  
            };  
            channel.basicConsume(queueName, true, consumer);  
    }  

    private static void print2File(String msg) {  
        try {  
            String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();  
            String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());  
            File file = new File(dir, logFileName + ".log");  
            FileOutputStream fos = new FileOutputStream(file, true);  
            fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());  
            fos.flush();  
            fos.close();  
        } catch (FileNotFoundException e) {  
            e.printStackTrace();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }    
} 

最終結果:

這裡寫圖片描述

羅哩羅嗦的說這麼多,其實就是說了這麼一件事:我們可以使用Direct exchange+routingKey來過濾自己感興趣的訊息。一個佇列可以繫結多個routingKey。這就是我們今天的主題——路由選擇。