1. 程式人生 > >RabbitMQ教程 (二):Work Queues

RabbitMQ教程 (二):Work Queues

Work Queues 工作佇列 (using the Java Client)

Prerequisites 須知

  This tutorial assumes RabbitMQ is installed and running on localhost on standard port (5672). In case you use a different host, port or credentials, connections settings would require adjusting.
  本教程假定RabbitMQ 已在標準埠(5672)上的localhost上安裝並執行。如果您使用不同的主機,埠或憑據,則需要調整連線設定。

Where to get help

  If you’re having trouble going through this tutorial you can contact us through the mailing list.

  In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we’ll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
  在第一篇教程中,我們編寫了程式來發送和接收來自命名佇列的訊息。在這個中,我們將建立一個工作佇列,用於在多個工作人員之間分配耗時的任務。

  The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
  工作佇列(又稱:任務佇列)背後的主要思想是避免立即執行資源密集型任務

,並且必須等待它完成。相反,我們安排任務稍後完成。我們將任務封裝 為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當您執行許多工作程式時,它們之間將共享任務。

  This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window.
  這個概念在Web應用程式中特別有用,因為在短的HTTP請求視窗中無法處理複雜的任務。

Preparation 準備

  In the previous part of this tutorial we sent a message containing “Hello World!”. Now we’ll be sending strings that stand for complex tasks. We don’t have a real-world task, like images to be resized or pdf files to be rendered, so let’s fake it by just pretending we’re busy - by using the Thread.sleep() function. We’ll take the number of dots in the string as its complexity; every dot will account for one second of “work”. For example, a fake task described by Hello… will take three seconds.
  在本教程的前一部分中,我們傳送了一條包含“Hello World!”的訊息。現在我們將傳送代表複雜任務的字串。我們沒有現實世界的任務,比如要調整大小的影象或要渲染的pdf檔案,所以讓我們假裝我們很忙 - 通過使用Thread.sleep()函式來偽造它。我們將字串中的點數作為其複雜性; 每個點都會佔據“工作”的一秒鐘。例如,Hello …描述的假任務 將花費三秒鐘。

  We will slightly modify the Send.java code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let’s name it NewTask.java:
  我們將稍微修改前一個示例中的Send.java程式碼,以允許從命令列傳送任意訊息。該程式將任務安排到我們的工作佇列,所以我們將其命名為 NewTask.java:

  String message = getMessage(argv);

  channel.basicPublish("", "hello", null, message.getBytes());
  System.out.println(" [x] Sent '" + message + "'");

Some help to get the message from the command line argument: 一些幫助從命令列引數獲取訊息:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

  Our old Recv.java program also requires some changes: it needs to fake a second of work for every dot in the message body. It will handle delivered messages and perform the task, so let’s call it Worker.java:
  我們舊的Recv.java程式還需要進行一些更改:它需要為訊息體中的每個點偽造一秒鐘的工作。它將處理傳遞的訊息並執行任務,所以我們稱之為Worker.java

final Consumer consumer = new DefaultConsumer(channel) {

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
  boolean autoAck = true; // acknowledgment is covered below
  channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

Our fake task to simulate execution time:
我們的假任務是模擬執行時間:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

Compile them as in tutorial one (with the jar files in the working directory and the environment variable CP):
像在教程一中那樣編譯它們(使用工作目錄中的jar檔案和環境變數CP):

javac -cp $CP NewTask.java Worker.java

Round-robin dispatching 迴圈排程

  One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
  使用任務佇列的一個優點是能夠輕鬆地並行工作。如果我們正在積壓工作積壓,我們可以新增更多工人,這樣就可以輕鬆擴充套件。

  First, let’s try to run two worker instances at the same time. They will both get messages from the queue, but how exactly? Let’s see.
  首先,讓我們嘗試同時執行兩個worker例項。他們都會從佇列中獲取訊息,但究竟如何呢?讓我們來看看。
  You need three consoles open. Two will run the worker program. These consoles will be our two consumers - C1 and C2.
  你需要開啟三個控制檯。兩個將執行工作程式。這些遊戲機將成為我們的兩個消費者 - C1和C2

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

In the third one we’ll publish new tasks. Once you’ve started the consumers you can publish a few messages:
在第三個中,我們將釋出新任務。啟動消費者後,您可以釋出一些訊息:

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....

Let’s see what is delivered to our workers: 讓我們看看交給我們工人的是什麼:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

  By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
  預設情況下,RabbitMQ將按順序將每條訊息傳送給下一個消費者。平均而言,每個消費者將獲得相同數量的訊息。這種分發訊息的方式稱為迴圈法。與三個或更多工人一起嘗試。

Message acknowledgment 訊息確認

  Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We’ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
  執行任務可能需要幾秒鐘。您可能想知道如果其中一個消費者開始執行長任務並且僅在部分完成時死亡會發生什麼。使用我們當前的程式碼,一旦RabbitMQ向客戶傳送訊息,它立即將其標記為刪除。在這種情況下,如果你殺死一個工人,我們將丟失它剛剛處理的訊息。我們還將丟失分發給這個特定工作者但尚未處理的所有訊息。

  But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
  但我們不想失去任何任務。如果工人死亡,我們希望將任務交付給另一名工人。

  In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
  為了確保訊息永不丟失,RabbitMQ支援 訊息確認。消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定訊息,RabbitMQ可以自由刪除它。

  If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
  如果消費者死亡(其通道關閉,連線關閉或TCP連線丟失)而不傳送確認,RabbitMQ將理解訊息未完全處理並將重新排隊。如果其他消費者同時線上,則會迅速將其重新發送給其他消費者。這樣你就可以確保沒有訊息丟失,即使工人偶爾會死亡。

  There aren’t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It’s fine even if processing a message takes a very, very long time.
  沒有任何訊息超時; 當消費者死亡時,RabbitMQ將重新發送訊息。即使處理訊息需要非常長的時間,也沒關係。

  Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It’s time to set this flag to false and send a proper acknowledgment from the worker, once we’re done with a task.
  預設情況下,手動訊息確認已開啟。在前面的示例中,我們通過autoAck = true 標誌明確地將它們關閉。一旦我們完成任務,就應該將此標誌設定為false並從工作人員傳送適當的確認。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)一次只接受一條未包含的訊息(見下文)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicPropertiesproperties, byte[] body) throws IOException {
  
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

  Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
  使用此程式碼,我們可以確定即使您在處理訊息時使用CTRL + C殺死一名工作人員,也不會丟失任何內容。工人死後不久,所有未經確認的訊息將被重新傳遞。
  Acknowledgement must be sent on the same channel the delivery it is for was received on. Attempts to acknowledge using a different channel will result in a channel-level protocol exception. See the doc guide on confirmations to learn more.
  確認必須在收到的交付的同一通道上傳送。嘗試使用不同的通道進行確認將導致通道級協議異常。有關確認文件指南,請參閱瞭解更多資訊。

Forgotten acknowledgment 被遺忘的確認

  It’s a common mistake to miss the basicAck. It’s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages.
  錯過basicAck是一個常見的錯誤。這是一個簡單的錯誤,但後果是嚴重的。當您的客戶端退出時,訊息將被重新傳遞(這可能看起來像隨機重新傳遞),但RabbitMQ將會佔用越來越多的記憶體,因為它無法釋放任何未經處理的訊息

  In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:
  為了除錯這種錯誤,您可以使用rabbitmqctl 來列印messages_unacknowledged欄位:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo: 在Windows上,刪除sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message durability 訊息永續性

  We have learned how to make sure that even if the consumer dies, the task isn’t lost. But our tasks will still be lost if RabbitMQ server stops.
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ伺服器停止,我們的任務仍然會丟失。

  When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable.
當RabbitMQ退出或崩潰時,它將忘記佇列和訊息,除非你告訴它不要。確保訊息不會丟失需要做兩件事:我們需要將佇列和訊息都標記為持久。

  First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
首先,我們需要確保RabbitMQ永遠不會丟失我們的佇列。為此,我們需要宣告它是持久的:

  boolean durable = true;
  channel.queueDeclare("hello", durable, false, false, null);

  Although this command is correct by itself, it won’t work in our present setup. That’s because we’ve already defined a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let’s declare a queue with different name, for example task_queue:
  雖然此命令本身是正確的,但它在我們當前的設定中不起作用。那是因為我們已經定義了一個名為hello的佇列 ,這個佇列不可持久化。RabbitMQ不允許您使用不同的引數重新定義現有佇列,並將向嘗試執行此操作的任何程式返回錯誤。但是有一個快速的解決方法 - 讓我們宣告一個具有不同名稱的佇列,例如task_queue:

  boolean durable = true;
  channel.queueDeclare("task_queue", durable, false, false, null);

  This queueDeclare change needs to be applied to both the producer and consumer code.
  此queueDeclare更改需要應用於生產者和消費者程式碼。

  At this point we’re sure that the task_queue queue won’t be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting MessageProperties (which implements BasicProperties) to the value PERSISTENT_TEXT_PLAIN.
  此時我們確信即使RabbitMQ重新啟動,task_queue佇列也不會丟失。現在我們需要將訊息標記為永續性 - 通過將MessageProperties(實現BasicProperties)設定為值PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

Note on message persistence 有關訊息永續性的註釋

  Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message – it may be just saved to cache and not really written to the disk. The persistence guarantees aren’t strong, but it’s more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
  將訊息標記為永續性並不能完全保證訊息不會丟失。雖然它告訴RabbitMQ將訊息儲存到磁碟,但是當RabbitMQ接受訊息並且尚未儲存訊息時,仍然有一個短時間視窗。此外,RabbitMQ不會為每條訊息執行fsync(2) - 它可能只是儲存到快取而不是真正寫入磁碟。永續性保證不強,但對於我們簡單的任務佇列來說已經足夠了。如果您需要更強的保證,那麼您可以使用 釋出者確認

Fair dispatch 公平派遣

  You might have noticed that the dispatching still doesn’t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn’t know anything about that and will still dispatch messages evenly.
  您可能已經注意到排程仍然無法完全按照我們的意願執行。例如,在有兩個工人的情況下,當所有奇怪的訊息都很重,甚至訊息很輕時,一個工人將經常忙碌而另一個工作人員幾乎不會做任何工作。好吧,RabbitMQ對此一無所知,仍然會均勻地傳送訊息。

  This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
  發生這種情況是因為RabbitMQ只是在訊息進入佇列時排程訊息。它不會檢視消費者未確認訊息的數量。它只是盲目地向第n個消費者傳送每個第n個訊息。
在這裡插入圖片描述

  In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
  為了克服這個缺點,我們可以使用prefetchCount = 1的basicQos方法。這告訴RabbitMQ不要一次向一個worker傳送一條訊息。或者,換句話說,在處理並確認前一個訊息之前,不要向工作人員傳送新訊息。相反,它會將它傳送給下一個仍然不忙的工人。

  int prefetchCount = 1;
  channel.basicQos(prefetchCount);

Note about queue size 關於佇列大小的說明

  If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
  如果所有工作人員都很忙,您的佇列就會填滿。您將需要密切關注這一點,並可能新增更多工作人員,或者採取其他策略。

Putting it all together

Final code of our NewTask.java class:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
  //...
}

(NewTask.java source)

And our Worker.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java source)

  Using message acknowledgments and prefetchCount you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
  使用訊息確認和prefetchCount,您可以設定工作佇列。即使RabbitMQ重新啟動,永續性選項也可以使任務生效。

  For more information on Channel methods and MessageProperties, you can browse the JavaDocs online.
  有關Channel方法和MessageProperties的更多資訊,您可以線上瀏覽 JavaDocs

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.

Production [Non-]Suitability Disclaimer

  Please keep in mind that this and other tutorials are, well, tutorials. They demonstrate one new concept at a time and may intentionally oversimplify some things and leave out others. For example topics such as connection management, error handling, connection recovery, concurrency and metric collection are largely omitted for the sake of brevity. Such simplified code should not be considered production ready.

  Please take a look at the rest of the documentation before going live with your app. We particularly recommend the following guides: Publisher Confirms and Consumer Acknowledgements, Production Checklist and Monitoring.

Getting Help and Providing Feedback

  If you have questions about the contents of this tutorial or any other topic related to RabbitMQ, don’t hesitate to ask them on the RabbitMQ mailing list.

Help Us Improve the Docs ❤️

  If you’d like to contribute an improvement to the site, its source is available on GitHub. Simply fork the repository and submit a pull request. Thank you!