RabbitMQ學習之旅(一)
阿新 • • 發佈:2018-11-06
RabbitMQ學習總結(一)
RabbitMQ簡介
RabbitMQ是一個訊息代理,其接收並轉發訊息。類似於現實生活中的郵局:你把信件投入郵箱的過程,相當於往佇列中新增資訊,因為所有郵箱中的信件最終都會彙集到郵局中;當郵遞員把你的新建傳送給收件人的時候,相當於訊息的轉發。
RabbitMQ中的常見術語
- 生產者(Provider):生產者負責生產訊息,並將其傳送到訊息佇列中
- 佇列(Queue):訊息代理(Proxy)角色,從生產者那裡接收訊息,並將其轉發到消費者進行消費。佇列主要受限於主機的記憶體和磁碟的大小,其本質是一個訊息緩衝區
- 消費者(Consumer):消費者從佇列中接收訊息,並對訊息進行處理
- 交換機(Exchange):交換機位於生產者(Provider)和佇列(Queue)之間,相當於路由(Routing),伺服器會根據路由鍵(RoutingKey)將訊息經由交換機路由到對應的佇列(Queue)上去
- 通道(Channel):通道是生產者與消費者和佇列進行通訊的渠道,其是建立在TCP連線上的虛擬連線;RabbitMQ在一條TCP連線上建立成千上百條通道來達到多執行緒處理,這個TCP被多個執行緒共享,每個執行緒對應一個通道,每個通道對應唯一的ID,保障通道私有性
RabbitMQ簡單使用
STEP1:準備`Maven`依賴
<dependency >
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>RELEASE</version>
</dependency>
STEP2:生產者
public class Provider {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線
ConnectionFactory factory = new ConnectionFactory();
// 設定 RabbitMQ 的主機名
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("username");
factory.setPassword("password");
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個通道
Channel channel = connection.createChannel();
// 指定一個佇列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// 引數1 queue :佇列名
// 引數2 durable :是否持久化
// 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除
// 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列
// 引數5 arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 傳送訊息
String message = "Hello World!";
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
// 引數1 exchange :交換器
// 引數2 routingKey : 路由鍵
// 引數3 props : 訊息的其他引數
// 引數4 body : 訊息體
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 關閉頻道和連線
channel.close();
connection.close();
}
}
注意:宣告佇列是冪等的,代表著只有在佇列不存在的時候才會被建立,多次宣告並不會重複建立。
STEP3:消費者
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線
ConnectionFactory factory = new ConnectionFactory();
// 設定 RabbitMQ 的主機名
factory.setHost("ip");
factory.setPort(5672);
factory.setUsername("username");
factory.setPassword("password");
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個通道
Channel channel = connection.createChannel();
// 指定一個佇列
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
// 引數1 queue :佇列名
// 引數2 durable :是否持久化
// 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除
// 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列
// 引數5 arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 建立佇列消費者,此處是一個回撥函式,當消費者接收到來自訊息佇列的訊息時,會呼叫該介面中被重寫的方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
// basicConsume(String queue, boolean autoAck, Consumer callback)
// 引數1 queue :佇列名
// 引數2 autoAck : 是否自動ACK;為true的時候表示自動回覆,為false的時候表示手動回覆;自動回覆的含義是,當消費
// 者接收到訊息的時候,就自動認定該訊息已經被消費;手動回覆的含義是,不進行自動回覆,只有接收到方法傳送到的確認信
// 息,才確認該訊息已經被消費
// 引數3 callback : 消費者物件的一個介面,用來配置回撥
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}