1. 程式人生 > >RabbitMQ在java-web中的簡單應用

RabbitMQ在java-web中的簡單應用

RabbitMQ的使用場景

MQ,是Message Queue(訊息佇列)的簡寫。簡而言之,RabbitMQ就是將訊息儲存在佇列中。 在專案的實際開發過程中,可以將一些無需即時返回結果且耗時的操作提取出來,進行非同步處理。這種處理方式能夠大大節省伺服器的請求響應時間,從而提高系統的吞吐量。

比如:以去年雙十一淘寶成交額為例

2017年雙十一成交額

當天每秒下訂單筆數超過32.5萬筆,支付筆數超過25.6萬筆。 也就是說,需要阿里的伺服器每秒進行32.5萬個“生成訂單”的操作,還要進行25.6萬個“支付訂單”的操作。 而這些與money相關的,都是一些耗時的操作。如果要求即時返回這些操作的處理結果,伺服器的壓力太大。 實際能夠進行優化後的做法是:把兩種不同的操作,放到兩個不同的佇列中延後進行處理。 如:“剁手黨”之一的我在當天進行了一個下單操作,伺服器會將這個操作的具體邏輯放到“order_queue”佇列中,就告知我下單成功,然後伺服器會在空閒時處理並生成相應的訂單。這種做法,不僅提升了使用者的使用體驗(我覺得很快就下單成功了),還能夠緩解伺服器的壓力。

常用的訊息佇列還有:ZeroMQ,ActiveMQ,Kafka等等,有興趣的可以自行了解。這篇部落格主要是學習RabbitMQ的簡單應用,這裡就不探討這幾種訊息佇列的優劣勢了。

在Windows下安裝RabbitMQ

RabbitMQ是使用Erlang語言編寫的一個開源的訊息佇列。

  1. 首先,在Erlang官網下載對應版本的Erlang平臺。 erlang依賴
  2. 然後執行可執行檔案(otp_win64_21.1)。按預設配置進行安裝,更改一下檔案的存放目錄即可。出現如下圖說明Erlang平臺安裝成功。 erlang平臺安裝成功
  3. 再然後在RabbitMQ官網下載最新版本的RabbitMQ Server。 RabbitMQ Server下載
  4. 繼續按預設配置進行安裝,出現如下圖說明RabbitMQ Server安裝成功。 RabbitMQ Server安裝成功
  5. 這種做法,是將RabbitMQ暴露為Window的一個服務。在這裡啟動RabbitMQ後,可以在15672埠用預設賬戶密碼進行登入(guest/guest)。 預設賬戶guest登入RabbitMQ Server
  6. 出現如下圖說明登入成功。 RabbitMQ Server主頁

RabbitMQ中的幾個重要概念

  • Producer

Producer,生產者。訊息的傳送方即為生產者。

  • Consumer

Consumer,消費者。訊息的接收方即為消費者。

  • Connection

Connection,獲取RabbitMQ(訊息佇列)服務的長連線。得到長連線例項的方式有兩種:

  1. 手動設定相關引數(host/port etc…)
/**
  * 不設定的話,會使用預設的引數(userName:guest,password:guest,virtualHost:/,hostName:localhost,portNumber:5672)
  */
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection conn = factory.newConnection();
  1. URI設定相關引數
/**
  * 不設定的話,會使用預設的引數(userName:guest,password:guest,virtualHost:/,hostName:localhost,portNumber:5672)
  */
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection connection = factory.newConnection();
  • Channel

Channel,通道。通過通道來決定“生產者如何往佇列中傳送訊息”、“消費者如何從佇列中接收訊息”。獲取通道例項的方式如下:

Channel channel = connection.createChannel();
  • Exchange

Exchange,交換機。生產者往佇列中傳送訊息,實際不是直接傳送給佇列,而是先發送給交換機,由交換機決定實際將訊息傳送給哪個佇列。

  • Queue

Queue,佇列。儲存訊息的地方。交換機和佇列都是通過通道例項來宣告,而且交換機與佇列直接是存在對應關係的。具體的原始碼如下:

channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName, exchangeName, routingKey);

具體的程式碼解釋如下:

通過ChannelIN.exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)來宣告交換機例項,第一個引數是交換機的名稱,第二個引數是交換機的型別(direct/fanout/topic/headers),第三個引數是該交換機是否持久化。 通過ChannelIN.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)來宣告佇列例項,第一個引數是佇列的名稱,第二個引數是該佇列是否持久化,第三個引數是是否只有自己能夠看到該對列(排他性),第四個引數是當沒有消費者佔用該佇列時是否刪除該佇列。 通過queueBind(String queue, String exchange, String routingKey)繫結該交換機和佇列。routingKey是繫結佇列和交換機之間的路由規則。

這些概念之間的關係結構圖如下:

RabbitMQ的元素結構圖

RabbitMQ在java-web中的應用

  • 必須要匯入RabbitMQ Server的POM依賴
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.0.3</version>
</dependency>
  • 傳送訊息的生產者測試程式碼如下:
/**
 * RabbitMQ中生產者測試程式碼
 * @author zhenye 2018/9/29
 */
@Slf4j
public class ProducerTest {

    private final static String EXCHANGE_NAME = "MY_EXCHANGE";
    private final static String QUEUE_NAME = "MY_QUEUE";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        String routingKey = "123";
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
        String message = "Hello RabbitMQ, I will send some message to the consumer.";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
        log.info("Producer send message, the content is :" + message);
        channel.close();
        connection.close();
    }
}
  • 接收訊息的消費者測試程式碼如下:
/**
 * RabbitMQ中消費者測試程式碼
 * @author zhenye 2018/9/29
 */
@Slf4j
public class ConsumerTest {
    private final static String QUEUE_NAME = "MY_QUEUE";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        log.info("Consumer is waiting!");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                log.info("Consumer received message, the content is:" + message);
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
  • 演示過程

連續執行5次ProducerTest.main(),即有5個消費者往佇列(MY_QUEUE)中傳送了訊息。

RabbitMQ Server主頁顯示如下:

主頁效果圖1

結果表明,在RabbitMQ的所有佇列中,還有5條待處理的訊息。

接著執行ConsumerTest.main(),效果圖如下:

主頁效果圖2

結果表明,該消費者一次性處理完了5條訊息。

  • 流程說明

生產者是通過ChannelIN.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)來發送訊息,第一個引數是交換機名稱,第二個引數是路由規則Key,第三個引數是全域性屬性,第四個引數是訊息實體。第一、二個引數是可以確定要儲存訊息的佇列的。

消費者是通過DefaultConsumer.handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)來接收並處理訊息的。由於消費者是直接從佇列中獲取訊息,只要保證生產者存入訊息的佇列(交換機名稱、路由Key確定的佇列),與消費者接收訊息的佇列(直接指定QUEUE_NAME)相同,就能正確地取出訊息。