官網英文版學習——RabbitMQ學習筆記(四)Work queues
工作隊列:把每個任務只發送給一個工作者。
上一篇我們是從一個指定的隊列發送接收消息,在本文中,我們將創建一個工作隊列,用於在多個工作者之間分配耗時的任務。
工作隊列(即任務隊列)背後的主要思想是避免立即執行資源密集型的任務,並且必須等待任務完成。相反,我們把任務安排在以後做。我們將任務封裝為消息並將其發送到隊列。在後臺運行的worker進程將彈出任務並最終執行任務。當您運行多個worker時,這些任務將在它們之間共享。
因為工作隊列,是有多個工人從隊列裏面取得任務,我們就需要考慮較多的問題,比如說,消息怎麽分發,消息沒有傳遞到位如何,消息傳遞過程中,連接斷開如何,消費會不會重復發給兩個工人處理等等。下面我們先學習一些跟工作隊列相關的概念:
循環調度
使用任務隊列的優點之一是能夠輕松地並行工作。如果我們正在建立一個積壓的工作,我們可以增加更多的工人,這樣,規模就很容易。
RabbitMQ將按順序將每個消息發送給下一個使用者。平均來說,每個消費者都會收到相同數量的信息。這種分發消息的方式稱為循環。
消息確認
完成一項任務可能需要幾秒鐘。你可能會想,如果其中一個消費者開始了一項很長的任務,並且只完成了部分任務,會發生什麽。使用我們當前的代碼,一旦RabbitMQ向客戶傳遞消息,它立即標記為刪除。在這種情況下,如果你殺了一個工人,我們將失去它正在處理的信息。我們還將丟失已經發送給此特定工作人員但尚未處理的所有消息。
但我們不想失去任何任務。如果一個工作者死了,我們希望把任務交給另一個工作者。
為了確保消息不會丟失,RabbitMQ支持消息確認。使用者將一個ack(nowledgement)發回給RabbitMQ,告訴它已經接收、處理了一個特定的消息,並且RabbitMQ可以自由地刪除它。
如果使用者在沒有發送ack的情況下死亡(通道關閉、連接關閉或TCP連接丟失),RabbitMQ將理解消息沒有被完整地處理,並將它重新排隊。如果同時有其他消費者在線,它將很快重新交付給另一個消費者。這樣你就可以確保沒有信息丟失,即使工人偶爾也會死亡。
消息的持久性
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣做。確保消息不丟失需要兩件事情:我們需要將隊列和消息標記為持久的。
首先,我們需要確保RabbitMQ永遠不會丟失隊列。為了做到這一點,我們需要聲明它是持久的.
消息的公平分發機制
為了避免當消息分發後,有的工人非常忙,而有的很閑的問題,我們可以使用basicQos方法,並將prefetchCount = 1設置為。這告訴RabbitMQ不要一次向工作人員發送多個消息。或者,換句話說,在處理並確認之前的消息之前,不要向工作人員發送新的消息。相反,它將把它分派給下一個不太忙的員工。
上代碼:
生產者:
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class WorkQueueProduct { private final static String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 設置隊列為可持久性的(生產者和消費者都需要設置)註意:RabbitMQ不允許您重新定義具有不同參數的現有隊列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String[] str = {"ans1","b","c","d","e","f","g","h","1","2","3","4","5"}; String message = getMessage(str); // 將消息設置為持久性,設置消息的其他屬性為MessageProperties.PERSISTENT_TEXT_PLAIN,即可 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8")); System.out.println("[x]send:‘"+message+"‘"); channel.close(); connection.close(); } private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消費者:兩個消費者代碼相同
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class WorkQueueConsumer1 { private static final String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connction = factory.newConnection(); Channel channel = connction.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 設置公平分發機制參數,設置為1後每次只發送一個消息,並且在沒有發送確認消息之前不會再次發送消息 channel.basicQos(1); // 內置回調函數,處理消息 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body, "utf-8"); System.out.println(" [x] Received ‘" + message + "‘"); try { dowork(message); } catch (Exception e) { // TODO: handle exception }finally{ System.out.println(" [x] Done"); // 任務處理完之後的消息確認 channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false,consumer); } private static void dowork(String task) { // TODO Auto-generated method stub for(char c :task.toCharArray()){ if(c == ‘.‘){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block Thread.currentThread().interrupt(); } } } } }
官網英文版學習——RabbitMQ學習筆記(四)Work queues