RabbitMQ 實戰教程(一)
阿新 • • 發佈:2019-01-27
MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。RabbitMQ是資訊傳輸的中間者。本質上,他從生產者接收訊息,轉發這些訊息給消費者。換句話說,RabbitMQ能夠按根據你指定的規則進行訊息轉發、緩衝、和持久化。
在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。
常見術語
producer(生產者),consumer(消費者),broker(RabbitMQ服務)並不需要部署在同一臺機器上,實際上在大多數實際的應用中,也不會部署在同一臺機器上。
生產者(Producer)
一個傳送訊息的程式是一個producer(生產者)。
一般用下圖表示生產者:
佇列(Queue)
佇列類似郵箱。依存於RabbitMQ內部。雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在佇列中。佇列不受任何限制,可以儲存任何數量的訊息—本質上是一個無限制的快取。不同的生產者可以通過同一個佇列傳送訊息,此外,不同的消費者也可以從同一個佇列上接收訊息。
一般用下圖表示佇列:
消費者(Consumer)
消費者屬於等待接收訊息的程式。
一般使用下圖表示消費者:
案例實戰
準備工作
使用Maven進行管理
- <dependency>
- <groupId>
- <artifactId>amqp-client</artifactId>
- <version>3.6.3</version>
- </dependency>
傳送端
傳送端,連線到RabbitMQ(此時服務需要啟動),傳送一條資料,然後退出。
- package com.lianggzone.rabbitmq.demo.helloworld;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- publicclassSend{
- privatefinalstaticString QUEUE_NAME ="hello";
- publicstaticvoid main(String[] args)throwsIOException,TimeoutException{
- // 建立連線
- ConnectionFactory factory =newConnectionFactory();
- // 設定MabbitMQ, 主機ip或者主機名
- factory.setHost("localhost");
- // 建立一個連線
- Connection connection = factory.newConnection();
- // 建立一個通道
- Channel channel = connection.createChannel();
- // 指定一個佇列
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- // 傳送訊息
- String message ="Hello World!";
- channel.basicPublish("", QUEUE_NAME,null, message.getBytes());
- System.out.println(" [x] Sent '"+ message +"'");
- // 關閉頻道和連線
- channel.close();
- connection.close();
- }
- }
接受端
接受端,不斷等待伺服器推送訊息,然後在控制檯輸出。
- package com.lianggzone.rabbitmq.demo.helloworld;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- publicclassRecv{
- privatefinalstaticString QUEUE_NAME ="hello";
- publicstaticvoid main(String[] args)throwsIOException,TimeoutException{
- // 建立連線
- ConnectionFactory factory =newConnectionFactory();
- // 設定MabbitMQ, 主機ip或者主機名
- factory.setHost("localhost");
- // 建立一個連線
- Connection connection = factory.newConnection();
- // 建立一個通道
- Channel channel = connection.createChannel();
- // 指定一個佇列
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- // 建立佇列消費者
- Consumer consumer =newDefaultConsumer(channel){
- @Override
- publicvoid handleDelivery(String consumerTag,Envelope envelope, AMQP.BasicProperties properties,
- byte[] body)throwsIOException{
- String message =newString(body,"UTF-8");
- System.out.println(" [x] Received '"+ message +"'");
- }
- };
- channel.basicConsume(QUEUE_NAME,true, consumer);
- }
- }