1. 程式人生 > >設計一個簡單的訊息佇列

設計一個簡單的訊息佇列

由於分散式系統的廣泛應用,越來越多地涉及到系統間通訊。系統間通訊一般有兩種方式,一種是基於遠端過程呼叫的方式,另一種是基於訊息佇列的方式。基於訊息佇列的方式是指由應用中的某個系統負責傳送訊息,由關心這條訊息的系統負責接收訊息,並在接收到訊息後進行各自的業務處理。
目前主流的訊息中介軟體有RabbitMQ、RocketMQ、ActiveMQ、Kafka等

一、訊息佇列的作用

(1)解耦

訊息佇列的各種實現產品又叫,既然是中介軟體,就是用訊息佇列實現兩個模組的遠端呼叫,模組只關心自己的核心流程,而不依賴呼叫的執行結果。

(2)流量削峰

利用訊息佇列可以將短時間高併發請求持久化,然後逐步處理,從而削平高峰期的併發流量,改善系統性能

(3)日誌收集

利用訊息佇列產品在接收和持久化訊息方面的高效能,引入訊息佇列快速收集日誌資訊,避免為寫入日誌時的某些故障導致業務系統訪問阻塞、請求延遲等。

(4)事務最終一致性

二、訊息佇列的功能特點

訊息佇列這個屬於包含訊息和佇列兩個關鍵詞,訊息是指應用間傳遞的資料,可以使簡單的字串,也可以是複雜的結構化物件定義格式;佇列指訊息的進和出,它包含一個容器,至少需實現訊息的傳送、接收和暫存功能。在生產環境中,訊息佇列還需解決諸如訊息堆積、訊息持久化、可靠投遞、訊息重複、嚴格有序、叢集等各種問題。
訊息佇列的簡單模型:
在這裡插入圖片描述
Broker:訊息處理中心,負責訊息的接收、儲存、轉發
Producer:訊息生產者,負責產生和傳送訊息到訊息處理中心
Consumer:訊息消費者,負責從訊息中心獲取訊息,並進行相應的處理

三、用java實現一個簡單的訊息佇列

結構圖:
在這裡插入圖片描述

Broker類:

package com.youzi.MQ;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 訊息處理中心
 */
public class Broker {
    //設定儲存訊息的最大數量
    private final static int MAX_SIZE = 5;
    //儲存訊息的容器
    private static ArrayBlockingQueue<String> MassageQueue = new ArrayBlockingQueue
<String>(MAX_SIZE); //生產訊息 public static void produce(String msg){ if (MassageQueue.offer(msg)){ System.out.println("成功向訊息中心投遞訊息:"+msg+",當前暫存訊息數目為"+MassageQueue.size()); }else{ System.out.println("訊息中心已滿,不能繼續放入訊息!"); } System.out.println("=================================="); } //消費訊息 public static String consume(){ String msg = MassageQueue.poll(); if(msg!=null){ System.out.println("已經消費訊息:"+msg+",當前暫存訊息數目為"+MassageQueue.size()); }else{ System.out.println("訊息處理中心已經沒有訊息可供消費!"); } System.out.println("=================================="); return msg; } }

用BrokerServer類對外提供Broker類的服務:

package com.youzi.MQ;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 訊息佇列服務
 */
public class BrokerServer implements Runnable {
    public static int SERVICE_PORT = 9999;
    private final Socket socket ;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            ){
            while (true){
                String str = in.readLine();
                if (str==null){
                    continue;
                }
                System.out.println("接收到的原始資料為:"+str);
                if (str.equals("CONSUME")){//CONSUME表示要消費一條訊息
                    String msg = Broker.consume();
                    out.println(msg);
                    out.flush();
                }else{//其他情況都表示要生產訊息到訊息佇列中
                    Broker.produce(str);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(BrokerServer.SERVICE_PORT);
        while(true){
            BrokerServer bs = new BrokerServer(server.accept());
            new Thread(bs).start();
        }
    }
}

客戶端訪問:

package com.youzi.MQ;

import org.omg.CORBA.portable.UnknownException;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

public class MQClient {

    //生產訊息
    public static void produce(String msg) throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
        try (
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            ){
            out.println(msg);
            out.flush();
        }
    }

    //消費訊息
    public static String consume() throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
        try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream())){
            //先向訊息佇列傳送CONSUME表示消費訊息
            out.println("CONSUME");
            out.flush();
            //再從佇列獲取一條訊息
            String message = in.readLine();
            return message;
        }
    }
}

生產者客戶端測試類:

package com.youzi.MQ;

public class ProduceClient {
    public static void main(String[] args) throws Exception {
        MQClient client = new MQClient();
        client.produce("Hello World4!!");
    }
}

消費者客戶端測試類:

package com.youzi.MQ;

public class ConsumeClient {
    public static void main(String[] args) throws Exception {
        MQClient client = new MQClient();
        String message = client.consume();
        System.out.println("獲取的訊息為:"+message);
    }
}

先啟動服務端BrokerServer,因為訊息處理中心最大容量設定了5,這裡生產者啟動6次,服務端輸出:
在這裡插入圖片描述
然後啟動6次訊息消費端,服務端輸出:

在這裡插入圖片描述