1. 程式人生 > >RabbitMQ workQueues for Java【入門教程 2】

RabbitMQ workQueues for Java【入門教程 2】

 入門教程1 我們學到了 P——>佇列——>C 這種單一的模式。一個生產者對應一個消費者。那麼在實際中可能存在一個生產者對應多個消費者,如在車間裡面的生產線,一個流水線生產的部件可能供應對應多個工人小費。那麼就引入了今天所討論的知識。

工作佇列

工作佇列

我們通過Hello World的例子,從生產者傳送一條訊息到RabbitMQ,然後消費者接收到這條訊息並打印出來。這次我們模擬一個工廠流水線的場景,由工廠任務安排者(生產者P)向流水線(RabbitMQ的佇列hello)放入半成品,然後由多個工人(消費者C1和C2)從流水線獲取半成品進行處理。 

原始碼

目錄結構: 

 ConnectionUtil是連線rabbitMq的工具類,當前是一個生產者  兩個消費者甚至多個消費者。

生產者程式碼如下:

package wxtest.rabbitMq.workQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生產者
 */
public class produce {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 建立一個通道
        Channel channel = ConnectionUtil.getConnection().createChannel();
        // 指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 傳送訊息
        String message = null;
//我們來發送五個訊息到訊息佇列中
        for (int i = 0; i < 5; i++) {
            message = "Hello World! " + i;
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        // 關閉頻道和連線
        channel.close();
        ConnectionUtil.getConnection().close();
    }

}

消費者1:

package wxtest.rabbitMq.workQueue;

import com.rabbitmq.client.*;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消費者
 */

public class consumer1 {
    //與生產者的佇列名相同
    public static String queue_name = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue_name, false, false, false, null);
        System.out.println("consumer1 [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "Utf-8");
                System.out.println(" consumer1 [x] Received '" + message + "'");

                try {
                    doWork(message);
                } finally {
                    System.out.println("consumer1 [x] Done");
                    // 訊息處理完成確認
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(queue_name, false, consumer);
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000); // 暫停1秒鐘
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }

}

消費者2:

package wxtest.rabbitMq.workQueue;

import com.rabbitmq.client.*;
import wxtest.rabbitMq.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消費者
 */
public class consumer2 {
    //與生產者的佇列名相同
    public static String queue_name = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue_name, false, false, false, null);
        System.out.println("consumer2 [*] Waiting for messages. To exit press CTRL+C");
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "Utf-8");
                System.out.println(" [consumer2] Received '" + message + "'");

                try {
                    doWork(message);
                } finally {
                    System.out.println("consumer2 [x] Done");
                    // 訊息處理完成確認
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(queue_name, false, consumer);
    }

    private static void doWork(String message) {
        try {
            Thread.sleep(1000); // 暫停1秒鐘
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }

}

輸出結果 消費者1為

consumer1 [*] Waiting for messages. To exit press CTRL+C  consumer1 [x] Received 'Hello World! 1' consumer1 [x] Done  consumer1 [x] Received 'Hello World! 3' consumer1 [x] Done 輸出結果 消費者2為

consumer2 [*] Waiting for messages. To exit press CTRL+C  [consumer2] Received 'Hello World! 0' consumer2 [x] Done  [consumer2] Received 'Hello World! 2' consumer2 [x] Done  [consumer2] Received 'Hello World! 4' consumer2 [x] Done  

忘記確認 忘記通過basicAck返回確認資訊是常見的錯誤。這個錯誤非常嚴重,將導致消費者客戶端退出或者關閉後,訊息會被退回RabbitMQ伺服器,這會使RabbitMQ伺服器記憶體爆滿,而且RabbitMQ也不會主動刪除這些被退回的訊息。  如果要監控這種錯誤,可以使用rabbitmqctl messages_unacknowledged命令打印出出相關的資訊。