1. 程式人生 > >RabbitMQ學習筆記五:RabbitMQ之優先級消息隊列

RabbitMQ學習筆記五:RabbitMQ之優先級消息隊列

-c virtual 調用 itl 3.5 rri color images 執行順序

RabbitMQ優先級隊列註意點:

1、只有當消費者不足,不能及時進行消費的情況下,優先級隊列才會生效

2、RabbitMQ3.5以後才支持優先級隊列

代碼在博客:RabbitMQ學習筆記三:Java實現RabbitMQ之與Spring集成 最後面有下載地址,只是做了少許改變,改變的代碼如下:

消費者 spring-config.xml(還需要增加一個QueueListener監聽器,代碼就不復制到這裏了,可以參考項目中的其他監聽器)

<!-- ========================================RabbitMQ========================================= -->
    <!-- 連接工廠 -->
    <rabbit:connection-factory id="
connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" /> <!-- 監聽器 --> <rabbit:listener-container connection-factory="connectionFactory"> <!-- queues是隊列名稱,可填多個,用逗號隔開, method是ref指定的Bean調用Invoke方法執行的方法名稱 --> <rabbit:listener queues="
red" method="onMessage" ref="redQueueListener" /> <rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" /> <rabbit:listener queues="queue" method="queueList" ref="queueListener" /> </rabbit:listener-container> <!-- 隊列聲明 --> <rabbit:queue name="
red" durable="true" /> <rabbit:queue name="blue" durable="true" /> <rabbit:queue name="queue" durable="true" /> <!-- 紅色監聽處理器 --> <bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" /> <!-- 顏色監聽處理器 --> <bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" /> <!-- 優先級隊列監聽處理器 --> <bean id="queueListener" class="com.aitongyi.customer.QueueListener" />

生產者增加一個主方法:

public static void main(String[] args)
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必須要設置

        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        for (final int i : priority)
        {
            template.convertAndSend("queue", (Object) ("queue" + i), new MessagePostProcessor() {

                @Override
                public Message postProcessMessage(Message arg0) throws AmqpException
                {
                    arg0.getMessageProperties().setPriority(i);
                    return arg0;
                }
            });
        }
    }

當然,還需要在客戶端創建一個優先級隊列:

技術分享

註意,x-max-length = 9999,這個值最後不要寫太大了,否則你電腦的內存會一直處於100%的使用狀態(至於出現這種狀況怎麽解決,請點擊:RabbitMQ學習筆記四:RabbitMQ命令(附疑難問題解決)),並且這個取值範圍在0~255之間,超過了可能會出現問題,我測試了設置9999時,部分有問題,後面會把有問題的地方貼上來。

好了,所有的事情準備完畢了,我們先啟動消費者,然後再運行主方法,此時的隊列優先級設置為,private static final int[] priority = { 1, 5, 1, 2, 3, 4, 5, 5, 0, 3, 6, 10, 4, 100, 99, 98 };執行結果如下:

2017-05-16 09:51:48 399 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 417 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 435 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 493 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:51:48 514 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:48 596 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:48 950 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 975 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:49 015 [INFO] c.a.c.QueueListener - queueList Receved:queue0
2017-05-16 09:51:49 039 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:49 058 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:51:49 084 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:51:49 102 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:49 140 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:51:49 561 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:51:49 595 [INFO] c.a.c.QueueListener - queueList Receved:queue98

很奇怪,沒有按照優先級執行?文章前面已經提到,只有當消費者不足,不能及時進行消費的情況下,優先級隊列才會生效。所以,我們需要先將所有的消息隊列發送至服務器,然後再啟動消費者處理消息,這樣,就可以看到效果了。為了達到這個目的,則需要先運行主方法,然後再啟動消費者,隊列優先級設置和上方一樣,執行結果如下:

2017-05-16 09:56:23 296 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:56:23 359 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:56:23 438 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:56:23 500 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:56:23 594 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:56:23 656 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 765 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 874 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:24 077 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 202 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 262 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 311 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 389 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:56:24 422 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 500 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 547 [INFO] c.a.c.QueueListener - queueList Receved:queue0

這樣,優先級隊列就實現了,至於相同優先級的隊列,執行順序應該是隨機的吧,我也沒有測試,有興趣的同學可以自己研究。

附:隊列優先級設置為:private static final int[] priority = { 1, 5, 1, 2, 9998, 3, 4, 5, 999, 5, 0, 3, 6, 10, 4, 1000, 9999, 100, 99, 98, 899 };運行結果為:

2017-05-16 09:59:29 839 [INFO] c.a.c.QueueListener - queueList Receved:queue1000
2017-05-16 09:59:29 917 [INFO] c.a.c.QueueListener - queueList Receved:queue999
2017-05-16 09:59:29 995 [INFO] c.a.c.QueueListener - queueList Receved:queue899
2017-05-16 09:59:30 073 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:59:30 182 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:59:30 275 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:59:30 353 [INFO] c.a.c.QueueListener - queueList Receved:queue9999
2017-05-16 09:59:30 431 [INFO] c.a.c.QueueListener - queueList Receved:queue9998
2017-05-16 09:59:30 509 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:59:30 619 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:59:30 670 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 733 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 826 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:31 138 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 282 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 316 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 379 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 410 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:59:31 472 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 712 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 745 [INFO] c.a.c.QueueListener - queueList Receved:queue0

恩,排序結果是有點亂。。。

RabbitMQ學習筆記五:RabbitMQ之優先級消息隊列