RabbitMQ workQueues for Java【入門教程 2】
入門教程1 我們學到了 P——>佇列——>C 這種單一的模式。一個生產者對應一個消費者。那麼在實際中可能存在一個生產者對應多個消費者,如在車間裡面的生產線,一個流水線生產的部件可能供應對應多個工人小費。那麼就引入了今天所討論的知識。
工作佇列
我們通過Hello World的例子,從生產者傳送一條訊息到RabbitMQ,然後消費者接收到這條訊息並打印出來。這次我們模擬一個工廠流水線的場景,由工廠任務安排者(生產者P)向流水線(RabbitMQ的佇列hello)放入半成品,然後由多個工人(消費者C1和C2)從流水線獲取半成品進行處理。
原始碼
目錄結構:
ConnectionUtil是連線rabbitMq的工具類,當前是一個生產者 兩個消費者甚至多個消費者。
生產者程式碼如下:
package wxtest.rabbitMq.workQueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import wxtest.rabbitMq.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生產者 */ public class produce { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { // 建立一個通道 Channel channel = ConnectionUtil.getConnection().createChannel(); // 指定一個佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 傳送訊息 String message = null; //我們來發送五個訊息到訊息佇列中 for (int i = 0; i < 5; i++) { message = "Hello World! " + i; channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 關閉頻道和連線 channel.close(); ConnectionUtil.getConnection().close(); } }
消費者1:
package wxtest.rabbitMq.workQueue; import com.rabbitmq.client.*; import wxtest.rabbitMq.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消費者 */ public class consumer1 { //與生產者的佇列名相同 public static String queue_name = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queue_name, false, false, false, null); System.out.println("consumer1 [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "Utf-8"); System.out.println(" consumer1 [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println("consumer1 [x] Done"); // 訊息處理完成確認 channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(queue_name, false, consumer); } private static void doWork(String message) { try { Thread.sleep(1000); // 暫停1秒鐘 } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
消費者2:
package wxtest.rabbitMq.workQueue;
import com.rabbitmq.client.*;
import wxtest.rabbitMq.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消費者
*/
public class consumer2 {
//與生產者的佇列名相同
public static String queue_name = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queue_name, false, false, false, null);
System.out.println("consumer2 [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "Utf-8");
System.out.println(" [consumer2] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println("consumer2 [x] Done");
// 訊息處理完成確認
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(queue_name, false, consumer);
}
private static void doWork(String message) {
try {
Thread.sleep(1000); // 暫停1秒鐘
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
輸出結果 消費者1為
consumer1 [*] Waiting for messages. To exit press CTRL+C consumer1 [x] Received 'Hello World! 1' consumer1 [x] Done consumer1 [x] Received 'Hello World! 3' consumer1 [x] Done 輸出結果 消費者2為
consumer2 [*] Waiting for messages. To exit press CTRL+C [consumer2] Received 'Hello World! 0' consumer2 [x] Done [consumer2] Received 'Hello World! 2' consumer2 [x] Done [consumer2] Received 'Hello World! 4' consumer2 [x] Done
忘記確認 忘記通過basicAck返回確認資訊是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉後,訊息會被退回RabbitMQ伺服器,這會使RabbitMQ伺服器記憶體爆滿,而且RabbitMQ也不會主動刪除這些被退回的訊息。 如果要監控這種錯誤,可以使用rabbitmqctl messages_unacknowledged命令打印出出相關的資訊。