1. 程式人生 > >rabbitmq系列(二)幾種常見模式的應用場景及實現

rabbitmq系列(二)幾種常見模式的應用場景及實現

一、簡單模式

原理:生產者將訊息交給預設的交換機,交換機獲取訊息後交給繫結這個生產者的佇列(投遞規則為佇列名稱和routing key 相同的佇列),監聽當前佇列的消費者獲取資訊並執行消費邏輯。

場景:有一個oa系統,使用者通過接收手機驗證碼進行註冊,頁面上點選獲取驗證碼後,將驗證碼放到訊息佇列,然後簡訊服務從佇列中獲取到驗證碼,併發送給使用者。

實現:

生產者:

public class Producter {

    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();
        // 4. 通過channel釋出訊息
        /**
         * 四個引數:
         * 第一個引數是交換機的名稱
         * 第二個引數是路由鍵
         * 第三個引數標識訊息的一些額外的屬性
         * 第四個是訊息的具體的內容
         */
        String message = "位元組";
        for(int i = 0;i < 5;i ++){

            channel.basicPublish("","byte001",null,message.getBytes());
        }
        // 5. 釋放資源,釋放channel 和 連結物件

        channel.close();
        connection.close();
    }
}

消費者:

public class Consumer {

    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        // 4. 創建出訊息佇列
        /**
         * 第一個引數是訊息佇列的名稱
         * 第二個引數表示訊息是否持久化
         * 第三個引數標識訊息佇列是否被channel獨佔
         * 第四個引數標識是否自動刪除訊息佇列,當訊息佇列沒有繫結交換機後是否自動刪除
         * 第五個是訊息佇列擴充套件引數
         */
        String queueName = "byte001";
        channel.queueDeclare(queueName, true, false, false, null);

        // 5. 建立消費者,對訊息進行處理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("獲取到的訊息:"+s);
            }
        };
        // 6. 通過channel消費者和訊息佇列關聯
        /**
         * 第一個引數是訊息佇列的名字
         * 第二個引數是否自動簽收(即消費訊息後告知伺服器已被消費)
         * 第三個引數是消費者
         */
        channel.basicConsume(queueName, true, consumer);
    }
}

二、工作模式

原理:生產者將訊息交給交換機,交換機交給繫結的佇列,佇列有多個消費者監聽,一條訊息只能由一個消費者消費,這樣就形成了資源競爭,誰的資源空閒大,爭搶到的可能性就大。

場景:有一個電商平臺,有兩個訂單服務,使用者下單的時候,任意一個訂單服務消費使用者的下單請求生成訂單即可。不用兩個訂單服務同時消費使用者的下單請求。

實現:

生產者:

public class Producter {

    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        // 申明佇列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 4. 通過channel釋出訊息
        /**
         * 四個引數:
         * 第一個引數是交換機的名稱
         * 第二個引數是路由鍵
         * 第三個引數標識訊息的一些額外的屬性
         * 第四個是訊息的具體的內容
         */
        String message = "位元組";
        for(int i = 0;i < 100;i ++){

            channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes());
        }
        // 5. 釋放資源,釋放channel 和 連結物件

        channel.close();
        connection.close();
    }
}

消費者1:

public class Consumer {

    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        // 4. 創建出訊息佇列
        /**
         * 第一個引數是訊息佇列的名稱
         * 第二個引數表示訊息是否持久化
         * 第三個引數標識訊息佇列是否被channel獨佔
         * 第四個引數標識是否自動刪除訊息佇列,當訊息佇列沒有繫結交換機後是否自動刪除
         * 第五個是訊息佇列擴充套件引數
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1); // 告訴伺服器,在我們沒有確認當前訊息時不要給我們傳送新的訊息
        // 5. 建立消費者,對訊息進行處理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者1收到的內容:"+s);
                try {
                    Thread.sleep(10); // 模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false); // 引數2false為確認收到訊息,true為拒絕收到訊息
            }
        };
        // 6. 通過channel消費者和訊息佇列關聯
        /**
         * 第一個引數是訊息佇列的名字
         * 第二個引數是否自動簽收(即消費訊息後告知伺服器已被消費)
         * 第三個引數是消費者
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

消費者2:

public class Consumer2 {

    public static final String QUEUE_NAME = "byte002";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        // 4. 創建出訊息佇列
        /**
         * 第一個引數是訊息佇列的名稱
         * 第二個引數表示訊息是否持久化
         * 第三個引數標識訊息佇列是否被channel獨佔
         * 第四個引數標識是否自動刪除訊息佇列,當訊息佇列沒有繫結交換機後是否自動刪除
         * 第五個是訊息佇列擴充套件引數
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1); // 告訴伺服器,在我們沒有確認當前訊息時不要給我們傳送新的訊息
        // 5. 建立消費者,對訊息進行處理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者2收到的內容:"+s);
                try {
                    Thread.sleep(500); // 模擬消費耗時
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false); // 引數2false為確認收到訊息,true為拒絕收到訊息
            }
        };
        // 6. 通過channel消費者和訊息佇列關聯
        /**
         * 第一個引數是訊息佇列的名字
         * 第二個引數是否自動簽收(即消費訊息後告知伺服器已被消費)
         * 第三個引數是消費者
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

保證資源競爭的程式碼就是這一行channel.basicQos(1);如果不加這一行,我們會發現兩個消費者是輪詢消費訊息的。

三、釋出訂閱模式

原理:生產者將訊息扔給交換機,交換機型別是fanout,不同的佇列註冊到交換機上,不同的消費註冊在不同的佇列上。所有消費者都會收到訊息。

場景:有一個商城,我們新新增一個商品後,可能同時需要去更新快取和資料庫。

實現:

生產者:

public class Producter {

    // 定義交換機的名字
    public static final String EXCHANGE_NAME="byte003";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();
        // 定義一個交換機,型別是fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        // 因為訊息先發到交換機,交換機沒有儲存功能,所以如果沒有消費者,訊息會丟失
        channel.basicPublish(EXCHANGE_NAME,"",null,"釋出訂閱模式的訊息".getBytes());

        channel.close();
        connection.close();
    }
}

消費者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte003";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue003";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消費者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte003";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue004";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

需要注意的一點就是交換機沒有儲存功能,如果沒有消費者,則訊息會丟失。

四、路由模式

原理:生產者將訊息傳送給交換機,訊息攜帶具體的routingkey。交換機型別是direct,接收到訊息中的routingkey,比對與之繫結的佇列的routingkey,分發到不同的佇列上。

場景:還是一樣,有一個商城,新添加了一個商品,實時性不是很高,只需要新增到資料庫即可,不用重新整理快取。

實現:

生產者:

public class Producter {

    // 定義交換機的名字
    public static final String EXCHANGE_NAME="byte004";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();
        // 定義一個交換機,型別是direct
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        // 因為訊息先發到交換機,交換機沒有儲存功能,所以如果沒有消費者,訊息會丟失
        channel.basicPublish(EXCHANGE_NAME,"key1",null,"釋出路由模式的訊息".getBytes());

        channel.close();
        connection.close();
    }
}

消費者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte004";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue005";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        /**
         * 引數3是routingkey,只有和它一樣的routingkey的訊息才會被當前消費者收到
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"key1");
        // 如果要接收多個routingkey的訊息,在執行一次上面的程式碼即可,如下
        channel.queueBind(queueName,EXCHANGE_NAME,"key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消費者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte004";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue006";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        channel.queueBind(queueName,EXCHANGE_NAME,"key2");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

五、主題模式

原理:路由模式的一種,路由功能添加了模糊匹配。星號(*)代表1個單詞,#號(#)代表一個或多個單詞。具體可參考路由模式。

場景:還是一樣,有一個商城,新添加了一個商品,實時性不是很高,只需要新增到資料庫即可,資料庫包含了主資料庫mysql1和從資料庫mysql2的內容,不用重新整理快取。

實現:

生產者:

public class Producter {

    // 定義交換機的名字
    public static final String EXCHANGE_NAME="byte004";
    public static void main(String[] args) throws Exception {

        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();
        // 定義一個交換機,型別是topic
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 因為訊息先發到交換機,交換機沒有儲存功能,所以如果沒有消費者,訊息會丟失
        channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"釋出路由模式的訊息".getBytes());

        channel.close();
        connection.close();
    }
}

消費者1:

public class Consumer1 {
    public static final String EXCHANGE_NAME="byte004";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue005";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        /**
         * 引數3是routingkey,只有和它一樣的routingkey的訊息才會被當前消費者收到
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"key.*");
        // 如果要接收多個routingkey的訊息,在執行一次上面的程式碼即可,如下
        channel.queueBind(queueName,EXCHANGE_NAME,"abc.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者1:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

消費者2:

public class Consumer2 {
    public static final String EXCHANGE_NAME="byte004";

    public static void main(String[] args) throws Exception {


        // 1. 創建出連結工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 通過連結工廠建立連結物件
        Connection connection = factory.newConnection();

        // 3. 通過連結物件創建出channel
        Channel channel = connection.createChannel();

        String queueName = "queue006";
        channel.queueDeclare(queueName,false,false,false,null);

        // 繫結佇列到交換機
        channel.queueBind(queueName,EXCHANGE_NAME,"key.#");
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * consumerTag 用來標識.可以再監聽佇列時候設定
             * envelope 信封,通過envelope可以通過這個獲取到很多東西
             * properties 額外的訊息屬性
             * body:訊息體
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String s = new String(body, "UTF-8");
                System.out.println("消費者2:"+s);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(queueName,false,consumer);
    }
}

程式碼已上傳:

github地址: https://github.com/binzh303/zhixie-code-example

gitee地址: https://gitee.com/javaXiaoCaiJi/zhixie-code-example

如果文章對您有幫助,請記得點贊關注喲~
歡迎大家關注我的公眾號:位元組傳說,每日推送技術文章供大家學習參考。