RabbitMQ入門案例

Rabbit 模式

https://www.rabbitmq.com/getstarted.html

實現步驟

  • 構建一個 maven工程
  • 匯入 rabbitmq的依賴
  • 啟動 rabbitmq-server服務
  • 定義生產者
  • 定義消費者
  • 觀察訊息的在 rabbitmq-server服務中的程序

初步實現

前期準備

1.構建專案

2.匯入依賴

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>

簡單模型

在上圖的模型中,有以下概念:

  1. 生產者,也就是要傳送訊息的程式
  2. 消費者:訊息的接受者,會一直等待訊息到來。
  3. 訊息佇列:圖中紅色部分。類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息。

所有的中介軟體技術都是基於tcp/ip協議基礎之上構建新型的協議規範,只不過rabbitmq遵循的是amqp

實現步驟:

  1. 建立連線工程
  2. 建立連線 connection
  3. 通過連接獲取通道 Channel
  4. 通過通道建立交換機,宣告佇列,繫結關係,路由key,傳送訊息,和接收訊息
  5. 準備訊息內容
  6. 傳送訊息給佇列 queue
  7. 關閉連線
  8. 關閉通道

生產者

public class Producer {
public static void main(String[] args) {
//1. 建立連線工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//這裡要使用自己的IP地址
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2. 建立連線 connection
connection = connectionFactory.newConnection("生產者");
//3. 通過連接獲取通道 Channel
channel = connection.createChannel();
//4. 通過通道建立交換機,宣告佇列,繫結關係,路由key,傳送訊息
String quequeName = "queuel";
/**
* @params1 佇列的名稱
* @params2 是否要持久化 durable-false
* @params3 排他性,是否是獨佔獨立
* @params4 是否自動刪除,隨著最後一個消費者訊息完畢以後是否把佇列自動刪除
* @params5 攜帶的附屬引數
*/
channel.queueDeclare(quequeName,false,false,false,null);
//5. 準備訊息內容
String message = "Hello,Consumer";
//6. 傳送訊息給佇列 queue
channel.basicPublish("",quequeName,null,message.getBytes());
System.out.println("訊息傳送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7. 關閉連線
if (channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8. 關閉通道
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

消費者

public class Consumer {

    public static void main(String[] args) {
//1. 建立連線工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//這裡要使用自己的IP地址
connectionFactory.setHost("192.168.57.129");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/"); Connection connection = null;
Channel channel = null;
try {
//2. 建立連線 connection
connection = connectionFactory.newConnection("消費者");
//3. 通過連接獲取通道 Channel
channel = connection.createChannel();
//4. 通過通道建立交換機,宣告佇列,繫結關係,路由key,傳送訊息,和接收訊息
String quequeName = "queue1";
channel.queueDeclare(quequeName,false,false,false,null);
//5.監聽訊息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
consumerTag:訊息者標籤,channel.basicConsume可以指定
envelope:訊息包內容,可從中獲取訊息id,訊息routing key,交換機,訊息和重灌標記(收到訊息失敗後是否需要重新發送)
properties:訊息屬性
body;訊息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:"+ envelope.getRoutingKey());
//交換機
System.out.println("交換機為:"+ envelope.getExchange());
//訊息id
System.out.println("訊息id為:"+ envelope.getDeliveryTag());
//收到的訊息
System.out.println("接收到的訊息:"+ new String(body,"UTF-8"));
System.out.println("");
System.out.println("======================================================");
System.out.println("");
}
};
channel.basicConsume("queue1", true, consumer);
} catch (Exception e) {
e.printStackTrace();
}finally {
//6. 不關閉資源,一直監聽
}
}
}

AMQP

概念介紹

AMQP 一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。

AMQP是一個二進位制協議,擁有一些現代化特點:多通道協商式非同步安全擴平臺中立高效

RabbitMQ 是 AMQP協議 的 Erlang的實現。

概念 說明
連線 Connection 一個網路連線,例如:TCP/IP套接字連線。
會話 Session 端點之間的命名對話。在一個會話上下文中,保證“恰好傳遞一次”。
通道 Channel 多路複用連線中的一條獨立的雙向資料流通道。為會話提供物理傳輸介質。
客戶端 Client AMQP連線或者會話的發起者。AMQP是非對稱的,客戶端生產和消費訊息,伺服器儲存和路由這些訊息。
服務節點Broker 訊息中介軟體的服務節點。一般情況下可以將一個RabbitMQ Broker看作一臺RabbitMQ 伺服器。
端點 AMQP對話的任意一方。一個AMQP連線包括兩個端點(一個是客戶端,一個是伺服器)。
消費者 Consumer 一個從訊息佇列裡請求訊息的客戶端程式。
生產者 Producer 一個向交換機發布訊息的客戶端應用程式。

RabbitMQ運轉流程

入門案例 為例

生產者傳送訊息

  1. 生產者建立連線(Connection),開啟一個通道(Channel),連線到RabbitMQ Broker;
  2. 宣告佇列、設定屬性;如是否排它,是否持久化,是否自動刪除;
  3. 將路由鍵(空字串)與佇列繫結起來;
  4. 傳送訊息至RabbitMQ Broker;
  5. 關閉通道;
  6. 關閉連線;

消費者接收訊息

  1. 消費者建立連線(Connection),開啟一個通道(Channel),連線到RabbitMQ Broker
  2. 向Broker 請求消費相應佇列中的訊息,設定相應的回撥函式;
  3. 等待Broker迴應閉關投遞響應佇列中的訊息,消費者接收訊息;
  4. 確認(ack,自動確認)接收到的訊息;
  5. RabbitMQ從佇列中刪除相應已經被確認的訊息;
  6. 關閉通道;
  7. 關閉連線;

生產者流轉過程解析

  1. 客戶端與代理伺服器Broker建立連線。呼叫 newConnection() 方法 , 會進一步封裝 Protocol Header 0-9-1 的報文頭髮送給 Broker ,以此通知Broker 本次互動採用的是 AMQP 0-9-1 協議,緊接著 Broker 返回 Connection.Start 來建立連線,在連線的過程中涉及 Connection.Start/.Start-OKConnection.Tune/.Tune-OkConnection.Open/ .Open-Ok 這6 個命令的互動。
  2. 客戶端呼叫 connection.createChannel 方法。此方法開啟通道,其包裝的 channel.open 命令傳送給 Broker , 等待 channel.basicPublish 方法,對應的AMQP命令為 Basic.Publish , 這個命令包含了content Headercontent Body() 。content Header 包含了訊息體的屬性,例如:投遞模式,優先順序等,content Body 包含了訊息體本身。
  3. 客戶端傳送完訊息需要關閉資源時,涉及到Channel.Close和Channl.Close-OkConnetion.Close和Connection.Close-Ok的命令互動。

消費者流轉過程解析

  1. 消費者客戶端與代理伺服器Broker建立連線。會呼叫 newConnection() 方法,這個方法會進一步封裝 Protocol Header 0-9-1 的報文頭髮送給Broker ,以此通知Broker 本次互動採用的是 AMQP 0-9-1 協議,緊接著Broker 返回Connection.Start 來建立連線,在連線的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的互動。
  2. 消費者客戶端呼叫connection.createChannel方法。和生產者客戶端一樣,協議涉及Channel . Open/Open-Ok命令。
  3. 在真正消費之前,消費者客戶端需要向Broker 傳送Basic.Consume 命令(即呼叫channel.basicConsume 方法〉將Channel 置為接收模式,之後Broker 回執 Basic . Consume - Ok 以告訴消費者客戶端準備好消費訊息。
  4. Broker 向消費者客戶端推送(Push) 訊息,即 Basic.Deliver 命令,這個命令和 Basic.Publish 命令一樣會攜帶 Content Header 和Content Body。
  5. 消費者接收到訊息並正確消費之後,向Broker 傳送確認,即 Basic.Ack 命令。
  6. 客戶端傳送完訊息需要關閉資源時,涉及到 Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok 的命令互動。

個人部落格為:

MoYu's HomePage