1. 程式人生 > >官網英文版學習——RabbitMQ學習筆記(四)Work queues

官網英文版學習——RabbitMQ學習筆記(四)Work queues

In string bit 學會 on() true sleep 回調函數 user

工作隊列:把每個任務只發送給一個工作者。

上一篇我們是從一個指定的隊列發送接收消息,在本文中,我們將創建一個工作隊列,用於在多個工作者之間分配耗時的任務。

技術分享圖片

工作隊列(即任務隊列)背後的主要思想是避免立即執行資源密集型的任務,並且必須等待任務完成。相反,我們把任務安排在以後做。我們將任務封裝為消息並將其發送到隊列。在後臺運行的worker進程將彈出任務並最終執行任務。當您運行多個worker時,這些任務將在它們之間共享。

因為工作隊列,是有多個工人從隊列裏面取得任務,我們就需要考慮較多的問題,比如說,消息怎麽分發,消息沒有傳遞到位如何,消息傳遞過程中,連接斷開如何,消費會不會重復發給兩個工人處理等等。下面我們先學習一些跟工作隊列相關的概念:

循環調度

使用任務隊列的優點之一是能夠輕松地並行工作。如果我們正在建立一個積壓的工作,我們可以增加更多的工人,這樣,規模就很容易。

RabbitMQ將按順序將每個消息發送給下一個使用者。平均來說,每個消費者都會收到相同數量的信息。這種分發消息的方式稱為循環。

消息確認

完成一項任務可能需要幾秒鐘。你可能會想,如果其中一個消費者開始了一項很長的任務,並且只完成了部分任務,會發生什麽。使用我們當前的代碼,一旦RabbitMQ向客戶傳遞消息,它立即標記為刪除。在這種情況下,如果你殺了一個工人,我們將失去它正在處理的信息。我們還將丟失已經發送給此特定工作人員但尚未處理的所有消息。

但我們不想失去任何任務。如果一個工作者死了,我們希望把任務交給另一個工作者。

為了確保消息不會丟失,RabbitMQ支持消息確認。使用者將一個ack(nowledgement)發回給RabbitMQ,告訴它已經接收、處理了一個特定的消息,並且RabbitMQ可以自由地刪除它。

如果使用者在沒有發送ack的情況下死亡(通道關閉、連接關閉或TCP連接丟失),RabbitMQ將理解消息沒有被完整地處理,並將它重新排隊。如果同時有其他消費者在線,它將很快重新交付給另一個消費者。這樣你就可以確保沒有信息丟失,即使工人偶爾也會死亡。

消息的持久性

我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ服務器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告訴它不要這樣做。確保消息不丟失需要兩件事情:我們需要將隊列和消息標記為持久的。

首先,我們需要確保RabbitMQ永遠不會丟失隊列。為了做到這一點,我們需要聲明它是持久的.

消息的公平分發機制

為了避免當消息分發後,有的工人非常忙,而有的很閑的問題,我們可以使用basicQos方法,並將prefetchCount = 1設置為。這告訴RabbitMQ不要一次向工作人員發送多個消息。或者,換句話說,在處理並確認之前的消息之前,不要向工作人員發送新的消息。相反,它將把它分派給下一個不太忙的員工。

上代碼:

生產者:

package com.rabbitmq.HelloWorld;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class WorkQueueProduct {
	private final static String TASK_QUEUE_NAME = "task";

	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
//		設置隊列為可持久性的(生產者和消費者都需要設置)註意:RabbitMQ不允許您重新定義具有不同參數的現有隊列
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		String[] str = {"ans1","b","c","d","e","f","g","h","1","2","3","4","5"};
		String message = getMessage(str);
//		將消息設置為持久性,設置消息的其他屬性為MessageProperties.PERSISTENT_TEXT_PLAIN,即可	
		channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
		System.out.println("[x]send:‘"+message+"‘");
		channel.close();
		connection.close();
		
	}
	 private static String getMessage(String[] strings) {
		    if (strings.length < 1)
		      return "Hello World!";
		    return joinStrings(strings, " ");
		  }

	  private static String joinStrings(String[] strings, String delimiter) {
	    int length = strings.length;
	    if (length == 0) return "";
	    StringBuilder words = new StringBuilder(strings[0]);
	    for (int i = 1; i < length; i++) {
	      words.append(delimiter).append(strings[i]);
	    }
	    return words.toString();
	  }
}

消費者:兩個消費者代碼相同

package com.rabbitmq.HelloWorld;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class WorkQueueConsumer1 {
	private static final String TASK_QUEUE_NAME = "task";
	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		Connection connction = factory.newConnection();
		Channel channel = connction.createChannel();
		channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//		設置公平分發機制參數,設置為1後每次只發送一個消息,並且在沒有發送確認消息之前不會再次發送消息
		channel.basicQos(1);
//		內置回調函數,處理消息
		final Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				// TODO Auto-generated method stub
				String message = new String(body, "utf-8");
				System.out.println(" [x] Received ‘" + message + "‘");
				try {
					dowork(message);
				} catch (Exception e) {
					// TODO: handle exception
				}finally{
					System.out.println(" [x] Done");
//					任務處理完之後的消息確認
			        channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		channel.basicConsume(TASK_QUEUE_NAME, false,consumer);
	}
	private static void dowork(String task) {
		// TODO Auto-generated method stub
		for(char c :task.toCharArray()){
			if(c == ‘.‘){
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				Thread.currentThread().interrupt();
			}	
			}
		}
		
	}

}

官網英文版學習——RabbitMQ學習筆記(四)Work queues