1. 程式人生 > >rabbitmq學習(二):rabbitmq(消息隊列)的作用以及rabbitmq之直連交換機

rabbitmq學習(二):rabbitmq(消息隊列)的作用以及rabbitmq之直連交換機

tde pub 假設 代碼 持久化 tor 安裝 live 服務

前言

  上篇介紹了AMQP的基本概念,組成及其與rabbitmq的關系。了解了這些東西後,下面我們開始學習rabbitmq(消息隊列)的作用以及用java代碼和rabbitmq通訊進行消息發布和接收。因為消息的的接收以及路由都是通過交換機實現的,所以接下來我們要學習如何利用不同的交換機進行消息的發布。最後會再學習如何利用rabbitmq進行rpc的調用。

一、rabbitmq(消息隊列)的作用

1.異步處理消息

  假設用戶在網站註冊成功後,需要向用戶發送郵件和信息提示其註冊成功。正常的做法是,後臺將註冊信息寫入數據庫,然後再給用戶發郵件發短信。

  技術分享圖片

  該流程的問題在於,我們其實只需要將註冊信息寫入數據庫之後就可以告知用戶註冊成功,並不需要等待發送郵件和發送短信成功後再去告知。這樣會延長請求處理的時間。利用消息隊列我們可以異步的解決這個問題。

  技術分享圖片

  我們可以在將註冊信息寫入數據庫之後,把要發送註冊郵件和發送短信的消息寫入消息隊列,然後就告知用戶註冊成功。發送郵件和短信將由訂閱了消息的應用異步的去執行。這樣將節省請求處理的時間。

2.系統解耦

  購物網站通常會將訂單系統和庫存系統分成兩個不同的應用。正常情況下用戶下單後訂單系統會調用庫存系統,然後返回給用戶信息。

  技術分享圖片

  這裏有兩個問題:1.如果庫存系統掛了,那麽下單就會失敗。

          2.訂單系統和庫存系統耦合度太高

  為了解決這個問題就可以引入消息隊列

  技術分享圖片

  訂單系統在處理完業務邏輯後,將訂單消息寫入消息隊列,庫存系統訂閱訂單消息,消息隊列就會將訂單消息推送給庫存系統。庫存系統再去處理。這樣就解決了上述兩個問題。即使庫存系統掛了了,消息隊列也會將訂單消息持久化,保證庫存系統正常後,可以正確的處理庫存。

3.流量削峰

  流量削峰在秒殺活動中應用廣泛

  場景:秒殺活動,一般會因為流量過大,導致應用掛掉,為了解決這個問題,一般在應用前段加入消息隊列。此時起到兩個作用:

    a.可以控制活動人數,超過一定閾值的訂單直接丟棄

    b.可以緩解短時間的高流量壓垮應用(應用程序按自己的最大處理能力獲取訂單)

  技術分享圖片

  用戶發來請求,服務器收到之後,先寫入消息隊列,加入消息超過隊列的最大長度,則直接丟掉用戶請求,或跳轉到錯誤頁面。秒殺業務再根據隊列中的消息,做後續處理。

二、準備

在寫代碼之前,還需要我們先安裝rabbitmq,因為rabbitmq是用erlang開發的,所以還需要我們下載erlang。具體下載的教程,這裏不再講解,網站有很多教程,可以學習一下。這裏說一下比較關鍵的幾個點。

  1.rabbitmq有個管理後臺,訪問地址為localhost:15672,默認的用戶名:guest,默認的密碼:guest

  2.client端通信口5672

  3.管理口15672

  4.server間內部通信口25672

  5.erlang發現口:4369

三、直連交換機代碼實現

  1.生產者

  

/**
 * 消息發送者
 */
public class LogProducer {
    //交換機名字
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由關鍵字
    private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};

    public static void main(String[] args) {
        //創建連接工廠並設置連接信息,這些連接信息都是默認的,所以這裏註釋掉了,打開也是可以的。
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        connectionFactory.setHost("localhost");
//        connectionFactory.setUsername("guest");
//        connectionFactory.setPassword("guest");
//        connectionFactory.setPort(5672);
        Connection connection = null;
        Channel channel = null;
        try {
            //獲取連接
            connection = connectionFactory.newConnection();
            //連接中打開通道
            channel = connection.createChannel();
            //聲明交換機
            //參數1:交換機名字,參數2:交換機類型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            for (String routingKey : routingKeys){
                //將消息發送給交換機,我們這裏發送的消息就是routingKey
                //參數1:交換機名字,參數2:消息路由鍵,參數3:消息屬性,參數4:消息體
                channel.basicPublish(EXCHANGE_NAME, routingKey,null, routingKey.getBytes());
                System.out.println("RoutingSendDirect -> routingkey: " + routingKey + ", send message " + routingKey);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
}

  2.消費者

  消費者1

  

/**
 * 消息消費者1
 */
public class Consumer1 {
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由關鍵字
    private static final String[] routingKeys = new String[]{"info", "warning"};

    public static void main(String[] args) {
        //創建連接工廠並設置連接信息,這些連接信息都是默認的,所以這裏註釋掉了,打開也是可以的。
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        connectionFactory.setHost("localhost");
//        connectionFactory.setUsername("guest");
//        connectionFactory.setPassword("guest");
//        connectionFactory.setPort(5672);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //聲明隊列,這裏隊列的名字由代理自動生成
            String queueName = channel.queueDeclare().getQueue();
            //聲明交換機
            //參數1:交換機名字,參數2:交換機類型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            for (String routingKey : routingKeys) {
                //將交換機和隊列用routing key綁定起來
                //參數1:隊列名,參數2:交換機名,參數3:隊列和交換機之間綁定的路由鍵
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("ReceveLogDirect1 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            }

            //聲明消費者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定從哪個消費者從哪個通道獲取消息,並指明自動確認的機制
            //參數1:隊列名,參數2:確認機制,true表示自動確認,false代表手動確認,參數3:消費者
            channel.basicConsume(queueName, true, consumer);
            System.out.println("ReceveLogDirect1 waitting for message");
            
            while (true){
                //獲取消息,這一步會一直阻塞,直到收到消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                //獲取消息
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceveLogDirect1 receive message " + message);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  消費者2

  

/**
 * 消息消費者2
 */
public class Consumer2 {
    private static final String EXCHANGE_NAME = "direct_logs";
    // 路由關鍵字
    private static final String[] routingKeys = new String[]{"error"};

    public static void main(String[] args) {
        //創建連接工廠並設置連接信息,這些連接信息都是默認的,所以這裏註釋掉了,打開也是可以的。
        ConnectionFactory connectionFactory = new ConnectionFactory();
//        connectionFactory.setHost("localhost");
//        connectionFactory.setUsername("guest");
//        connectionFactory.setPassword("guest");
//        connectionFactory.setPort(5672);
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //聲明隊列,這裏隊列的名字由代理自動生成
            String queueName = channel.queueDeclare().getQueue();
            //聲明交換機
            //參數1:交換機名字,參數2:交換機類型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            for (String routingKey : routingKeys) {
                //將交換機和隊列用routing key綁定起來
                //參數1:隊列名,參數2:交換機名,參數3:隊列和交換機之間綁定的路由鍵
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
                System.out.println("ReceveLogDirect2 -> queue: " + queueName + ", exchange_name: " + EXCHANGE_NAME + ", routingKey: " + routingKey);
            }

            //聲明消費者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定從哪個消費者從哪個通道獲取消息,並指明自動確認的機制
            //參數1:隊列名,參數2:確認機制,true表示自動確認,false代表手動確認,參數3:消費者
            channel.basicConsume(queueName, true, consumer);
            System.out.println("ReceveLogDirect2 waitting for message");

            while (true){
                //獲取消息,這一步會一直阻塞,直到收到消息
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                //獲取消息
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("ReceveLogDirect2 receive message " + message);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  先運行Consumer1,Consumer2。再運行LogProducer。運行結果如下:

  LogProducer:

  技術分享圖片

  Consumer1:

  技術分享圖片

  Consumer2:

  技術分享圖片

  從運行結果中可以看出,消息是根據路由鍵被交換機路由到對應的隊列上的。

  

rabbitmq學習(二):rabbitmq(消息隊列)的作用以及rabbitmq之直連交換機