1. 程式人生 > >RabbitMQ學習第四記:路由模式(direct)

RabbitMQ學習第四記:路由模式(direct)

exceptio 指定 手動 ack key static ech 保存日誌 sta

1、什麽是路由模式(direct)

  路由模式是在使用交換機的同時,生產者指定路由發送數據,消費者綁定路由接受數據。與發布/訂閱模式不同的是,發布/訂閱模式只要是綁定了交換機的隊列都會收到生產者向交換機推送過來的數據。而路由模式下加了一個路由設置,生產者向交換機發送數據時,會聲明發送給交換機下的那個路由,並且只有當消費者的隊列綁定了交換機並且聲明了路由,才會收到數據。下圖取自於官方網站(RabbitMQ)的路由模式的圖例

P:消息的生產者

X:交換機

紅色:隊列

C1,C2:消息消費者

error,info,warning:路由

  舉個日誌處理例子:系統需要針對日誌做分析,首先所有的日誌級別的日誌都需要保存,其次error日誌級別的日誌需要單獨做處理。這時就可以使用路由模式來處理了,聲明交換機使用路由模式,每個日誌級別的日誌對應一個路由(error,info,warning)。聲明一個保存日誌隊列用於接受所有日誌,綁定交換機並綁定所有路由。聲明第二個隊列用於處理error級別日誌,綁定交換機且只綁定error路由。以下是代碼講解。(先運行兩個消費者,在運行生產者。如果沒有提前將隊列綁定到交換機,那麽直接運行生產者的話,消息是不會發到任何隊列裏的)

2、生產者(Send)代碼

復制代碼
public class Send
{
//交換機名稱
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名稱warning
private final static String ROUTING_KEY_WARNING = "warning";

//路由名稱info
private final static String ROUTING_KEY_INFO    = "info";

//路由名稱error
private final static String ROUTING_KEY_ERROR   = "error";

public static void main(String[] args)
{
    try
    {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //從連接中獲取一個通道
        Channel channel = connection.createChannel();
        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message = "this is warning log";
        //發送消息(warning級別日誌)
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_WARNING, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        //發送消息(info級別日誌)
        message = "this is info log";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_INFO, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        //發送消息(error級別日誌)
        message = "this is error log";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_ERROR, null, message.getBytes("utf-8"));
        System.out.println("[send]:" + message);
        channel.close();
        connection.close();
    }
    catch (IOException | TimeoutException e)
    {
        e.printStackTrace();
    }
}

}

運行結果:
[send]:this is warning log
[send]:this is info log
[send]:this is error log

復制代碼
3、消費者1(ReceiveAllLog)

復制代碼
public class ReceiveAllLog
{
//交換機名稱
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名稱warning
private final static String ROUTING_KEY_WARNING = "warning";

//路由名稱info
private final static String ROUTING_KEY_INFO    = "info";

//路由名稱error
private final static String ROUTING_KEY_ERROR   = "error";

//隊列名稱
private static final String QUEUE_NAME          = "test_queue_save_all_log";

public static void main(String[] args)
{
    try
    {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //從連接中獲取一個通道
        final Channel channel = connection.createChannel();
        //聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //將隊列綁定到交換機(指定路由info,error,warning)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_INFO);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_WARNING);
        //保證一次只分發一個  
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            //當消息到達時執行回調方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException
            {
                String message = new String(body, "utf-8");
                System.out.println("[test_queue_save_all_log] Receive message:" + message);
                try
                {
                    //消費者休息2s處理業務
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    //手動應答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //設置手動應答
        boolean autoAck = false;
        //監聽隊列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}

}

運行結果:
[test_queue_save_all_log] Receive message:this is warning log
[test_queue_save_all_log] Receive message:this is info log
[test_queue_save_all_log] Receive message:this is error log

復制代碼
4、消費者2(ReceiveErrorLog)

復制代碼
public class ReceiveErrorLog
{
//交換機名稱
private final static String EXCHANGE_NAME = "test_exchange_direct";

//路由名稱error
private final static String ROUTING_KEY_ERROR = "error";

//隊列名稱
private static final String QUEUE_NAME        = "test_queue_handel_error";

public static void main(String[] args)
{
    try
    {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //從連接中獲取一個通道
        final Channel channel = connection.createChannel();
        //聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //將隊列綁定到交換機(指定路由error)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_ERROR);
        //保證一次只分發一個  
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            //當消息到達時執行回調方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException
            {
                String message = new String(body, "utf-8");
                System.out.println("[test_queue_handel_error] Receive message:" + message);
                try
                {
                    //消費者休息2s處理業務
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    //手動應答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //設置手動應答
        boolean autoAck = false;
        //監聽隊列
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
}

}

運行結果:
[test_queue_handel_error] Receive message:this is error log
復制代碼
總結:

  1.兩個隊列消費者設置的路由不一樣,接收到的消息就不一樣。路由模式下,決定消息向隊列推送的主要取決於路由,而不是交換機了。

  2.該模式必須設置交換機,且聲明路由模式:channel.exchangeDeclare(EXCHANGE_NAME, "direct");

RabbitMQ學習第四記:路由模式(direct)