Java語言快速實現簡單MQ訊息佇列服務
阿新 • • 發佈:2018-12-12
目錄
MQ基礎回顧
在上一篇訊息通訊之關於訊息佇列MQ必須瞭解的相關概念中 , 我們儘可能地詳細的瞭解了一些關於MQ (訊息佇列)
的相關概念,並且我們上一篇中提到一個最基本的MQ通訊模型
如下所示,所以本章節使用JAVA
語言自己動手來寫一個MQ (類似ActiveMQ,RabbitMQ)
主要角色
首先我們必須需要搞明白MQ (訊息佇列)
中的三個基本角色
Producer
: 訊息生產者,負責生產訊息併發送到BrokerBroker
: 訊息處理中心,負責接受訊息,儲存訊息,轉發訊息Consumer
:訊息消費者,負責消費訊息
整體架構如下所示
自定義協議
首先從上一篇中介紹了協議的相關資訊,具體廠商的MQ(訊息佇列)
需要遵循某種協議或者自定義協議 , 訊息的生產者和消費者需要遵循其協議(約定)才能後成功地生產訊息和生產訊息
,所以在這裡我們自定義一個協議如下.
訊息處理中心 : 如果接收到的資訊包含"SEND"字串,即視為生產者傳送的訊息,訊息處理中心需要將此資訊儲存等待消費者消費
訊息處理中心 : 如果接受到的資訊為CONSUME,既視為消費者傳送消費請求,需要將儲存的訊息佇列頭部的資訊轉發給消費者,然後將此訊息從佇列中移除
訊息處理中心 : 如果訊息處理中心儲存的訊息滿3條仍然沒有消費者進行消費,則不再接受生產者的生產請求
訊息生產者:需要遵循協議將生產的訊息頭部增加 "SEND:" 表示生產訊息
訊息消費者:需要遵循協議向訊息處理中心傳送 "CONSUME"字串表示消費訊息
流程順序
專案構建流程
下面將整個MQ的構建流程過一遍
- 新建一個
Broker
類,內部維護一個ArrayBlockingQueue
佇列,提供生產訊息和消費訊息的方法,僅僅具備儲存服務功能
- 新建一個
BrokerServer
類,將Broker
釋出為服務到本地9999埠,監聽本地9999埠的Socket
連結,在接受的資訊中進行我們的協議校驗, 這裡僅僅具備接受訊息,校驗協議,轉發訊息功能;
- 新建一個
MqClient
類,此類提供與本地埠9999的Socket連結 ,僅僅具備生產訊息和消費訊息的方法
- 測試:新建兩個
MyClient
類物件,分別執行其生產方法和消費方法
具體使用流程
- 生產訊息:客戶端執行生產訊息方法,傳入需要生產的資訊,該資訊需要遵循我們自定義的協議,訊息處理中心服務在接受到訊息會根據自定義的協議校驗該訊息是否合法,如果合法如果合法就會將該訊息儲存到Broker內部維護的
ArrayBlockingQueue
佇列中.如果ArrayBlockingQueue
佇列沒有達到我們協議中的最大長度將將訊息新增到佇列中,否則輸出生產訊息失敗. - 訊息訊息:客戶端執行消費訊息方法,
Broker服務
會校驗請求的資訊的資訊是否等於CONSUME
,如果驗證成功則從Broker內部維護的ArrayBlockingQueue
佇列的Poll
出一個訊息返回給客戶端
程式碼演示
訊息處理中心 Broker
/**
* 訊息處理中心
*/
public class Broker {
// 佇列儲存訊息的最大數量
private final static int MAX_SIZE = 3;
// 儲存訊息資料的容器
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(MAX_SIZE);
// 生產訊息
public static void produce(String msg) {
if (messageQueue.offer(msg)) {
System.out.println("成功向訊息處理中心投遞訊息:" + msg + ",當前暫存的訊息數量是:" + messageQueue.size());
} else {
System.out.println("訊息處理中心內暫存的訊息達到最大負荷,不能繼續放入訊息!");
}
System.out.println("=======================");
}
// 消費訊息
public static String consume() {
String msg = messageQueue.poll();
if (msg != null) {
// 消費條件滿足情況,從訊息容器中取出一條訊息
System.out.println("已經消費訊息:" + msg + ",當前暫存的訊息數量是:" + messageQueue.size());
} else {
System.out.println("訊息處理中心內沒有訊息可供消費!");
}
System.out.println("=======================");
return msg;
}
}
訊息處理中心服務 BrokerServer
/**
* 用於啟動訊息處理中心
*/
public class BrokerServer implements Runnable {
public static int SERVICE_PORT = 9999;
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
)
{
while (true) {
String str = in.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始資料:" + str);
if (str.equals("CONSUME")) { //CONSUME 表示要消費一條訊息
//從訊息佇列中消費一條訊息
String message = Broker.consume();
out.println(message);
out.flush();
} else if (str.contains("SEND:")){
//接受到的請求包含SEND:字串 表示生產訊息放到訊息佇列中
Broker.produce(str);
}else {
System.out.println("原始資料:"+str+"沒有遵循協議,不提供相關服務");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true) {
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
客戶端 MqClient
/**
* 訪問訊息佇列的客戶端
*/
public class MqClient {
//生產訊息
public static void produce(String message) throws Exception {
//本地的的BrokerServer.SERVICE_PORT 建立SOCKET
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
out.println(message);
out.flush();
}
}
//消費訊息
public static String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
try (
BufferedReader in = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
) {
//先向訊息佇列傳送命令
out.println("CONSUME");
out.flush();
//再從訊息佇列獲取一條訊息
String message = in.readLine();
return message;
}
}
}
測試MQ
public class ProduceClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
client.produce("SEND:Hello World");
}
}
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
String message = client.consume();
System.out.println("獲取的訊息為:" + message);
}
}
我們多執行幾次客戶端的生產方法和消費方法就可以看到一個完整的MQ的通訊過程,下面是我執行了幾次的一些日誌
接收到原始資料:SEND:Hello World
成功向訊息處理中心投遞訊息:SEND:Hello World,當前暫存的訊息數量是:1
=======================
接收到原始資料:SEND:Hello World
成功向訊息處理中心投遞訊息:SEND:Hello World,當前暫存的訊息數量是:2
=======================
接收到原始資料:SEND:Hello World
成功向訊息處理中心投遞訊息:SEND:Hello World,當前暫存的訊息數量是:3
=======================
接收到原始資料:SEND:Hello World
訊息處理中心內暫存的訊息達到最大負荷,不能繼續放入訊息!
=======================
接收到原始資料:Hello World
原始資料:Hello World沒有遵循協議,不提供相關服務
接收到原始資料:CONSUME
已經消費訊息:SEND:Hello World,當前暫存的訊息數量是:2
=======================
接收到原始資料:CONSUME
已經消費訊息:SEND:Hello World,當前暫存的訊息數量是:1
=======================
接收到原始資料:CONSUME
已經消費訊息:SEND:Hello World,當前暫存的訊息數量是:0
=======================
接收到原始資料:CONSUME
訊息處理中心內沒有訊息可供消費!
=======================
小結
本章示例程式碼主要源自分散式訊息中介軟體實踐一書 , 這裡我們自己使用Java語言寫了一個MQ訊息佇列 , 通過這個訊息佇列我們對MQ中的幾個角色 "生產者,消費者,消費處理中心,協議"
有了更深的理解 ; 那麼下一章節我們就來一塊學習具體廠商的MQ RabbitMQ