1. 程式人生 > >【SSM分散式架構電商專案-27】RabbitMQ的5種佇列

【SSM分散式架構電商專案-27】RabbitMQ的5種佇列

5種佇列

這裡寫圖片描述

匯入itcast-rabbitmq

這裡寫圖片描述

簡單佇列

這裡寫圖片描述
P:訊息的生產者
C:訊息的消費者
紅色:佇列

生產者將訊息傳送到佇列,消費者從佇列中獲取訊息。

匯入RabbitMQ的客戶端依賴

這裡寫圖片描述

獲取MQ的連線

這裡寫圖片描述

package cn.itcast.rabbitmq.util;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {

    public static Connection getConnection
() throws Exception { //定義連線工廠 ConnectionFactory factory = new ConnectionFactory(); //設定服務地址 factory.setHost("localhost"); //埠 factory.setPort(5672); //設定賬號資訊,使用者名稱、密碼、vhost factory.setVirtualHost("/taotao"); factory.setUsername("taotao"
); factory.setPassword("taotao"); // 通過工程獲取連線 Connection connection = factory.newConnection(); return connection; } }

生產者傳送訊息到佇列

這裡寫圖片描述

package cn.itcast.rabbitmq.simple;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public
class Send { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從連線中建立通道 Channel channel = connection.createChannel(); // 宣告(建立)佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 訊息內容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和連線 channel.close(); connection.close(); } }

管理工具中檢視訊息

這裡寫圖片描述
點選上面的佇列名稱,查詢具體的佇列中的資訊:
這裡寫圖片描述

消費者從佇列中獲取訊息

這裡寫圖片描述

package cn.itcast.rabbitmq.simple;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

Work模式

這裡寫圖片描述

這裡寫圖片描述
一個生產者、2個消費者。

一個訊息只能被一個消費者獲取。

消費者1

package cn.itcast.rabbitmq.work;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻伺服器只會發一條訊息給消費者
        //channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回確認狀態
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2

package cn.itcast.rabbitmq.work;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一時刻伺服器只會發一條訊息給消費者
        //channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成狀態
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

生產者

向佇列中傳送50條訊息。
這裡寫圖片描述

package cn.itcast.rabbitmq.work;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 訊息內容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

測試

測試結果:
1、 消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
2、 消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。

其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的訊息多才對。

Work模式的“能者多勞”

這裡寫圖片描述
測試:
消費者1比消費者2獲取的訊息更多。

訊息的確認模式

消費者從佇列中獲取訊息,服務端如何知道訊息已經被消費呢?

模式1:自動確認
只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功訊息,都認為是訊息已經成功消費。
模式2:手動確認
消費者從佇列中獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。

自動模式:
這裡寫圖片描述
手動模式:
這裡寫圖片描述

訂閱模式

這裡寫圖片描述
這裡寫圖片描述
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個佇列
3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機
4、每個佇列都要繫結到交換機
5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的

這裡寫圖片描述

訊息的生產者(看作是後臺系統)

向交換機中傳送訊息。
這裡寫圖片描述
注意:訊息傳送到沒有佇列繫結的交換機時,訊息將丟失,因為,交換機沒有儲存訊息的能力,訊息只能存在在佇列中。

消費者1(看作是前臺系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.ps;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2(看作是搜尋系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.ps;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

測試

測試結果:
同一個訊息被多個消費者獲取。

在管理工具中檢視佇列和交換機的繫結關係:

這裡寫圖片描述

使用訂閱模式能否實現商品資料的同步?

答案:可以的。

後臺系統就是訊息的生產者。
前臺系統和搜尋系統是訊息的消費者。
後臺系統將訊息傳送到交換機中,前臺系統和搜尋系統都建立自己的佇列,然後將佇列繫結到交換機,即可實現。

訊息,新增商品、修改商品、刪除商品。

前臺系統:修改商品、刪除商品。
搜尋系統:新增商品、修改商品、刪除商品。

所以使用訂閱模式實現商品資料的同步並不合理。

路由模式

這裡寫圖片描述

這裡寫圖片描述

生產者

這裡寫圖片描述

package cn.itcast.rabbitmq.routing;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 訊息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1(前臺系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.routing;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費2(搜尋系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.routing;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }![這裡寫圖片描述](https://img-blog.csdn.net/20180514155441717?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Nja2V2aW5jeWg=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
}

萬用字元模式

這裡寫圖片描述

這裡寫圖片描述

生產者

這裡寫圖片描述

package cn.itcast.rabbitmq.topic;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 訊息內容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1(前臺系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.topic;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消費者2(搜尋系統)

這裡寫圖片描述

package cn.itcast.rabbitmq.topic;

import cn.itcast.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}