1. 程式人生 > >Java 訊息中介軟體

Java 訊息中介軟體

Java 訊息中介軟體

文章目錄


訊息中介軟體:關注於資料的傳送與接收,利用高效可靠的非同步訊息傳遞機制整合分散式系統。

訊息中介軟體

常見訊息中介軟體

1. ActiveMQ

ActiveMQ 是 Apache 出品,最流行,能力強勁的開源訊息匯流排。

  • 完全支援 JMS 1.1 和 J2EE 1.4規範(持久化,XA訊息,事務)
  • 支援多種語言和協議編寫客戶端
  • 虛擬主題、組合目的、映象佇列

下載解壓後,執行 /bin/win64/ 路徑下 的 activemq.bat 批處理檔案,並開啟 http://localhost:8161 檢視是否安裝成功。

activeMQ

2. RabbitMQ

RabbitMQ 是一個開源的 AMQP 實現,伺服器端用 Erlang 語言編寫。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

  • 支援多種客戶端
  • AMQP 的完整實現(vhost、Exchange、Binding、Routing Key 等)
  • 事務支援/釋出確認
  • 訊息持久化

3. Kafka

Kafka 是一種高吞吐量的分散式釋出訂閱訊息系統,是一個分散式的、分割槽的、可靠的分散式日誌儲存服務。

  • 通過O(1)的磁碟資料結構提供訊息的持久化,對以TB的訊息儲存也能夠保持長時間的穩定效能
  • 高吞吐量:即使是非常普通的硬體,Kafka 也可以支援每秒數百萬的訊息
  • Partition、Consumer Group

4. 綜合比較

Propertity ActiveMQ RabbitMQ Kafka
跨語言 支援(Java 優先) 語言無關 支援(Java 優先)
支援協議 OpenWire,Stomp,XMPP,AMQP AMQP
優點 遵循 JMS 規範;安裝部署方便 繼承 Erlang 天生的併發性,穩定性,安全性有保障
缺點 訊息丟失;社群不活躍 Erlang 語言難度較大;不支援動態擴充套件 嚴格的順序機制,不支援訊息優先順序;不支援標準的訊息協議,不利於平臺遷移
綜合評價 適合中小企業級訊息應用場景,不適合上千個佇列的應用場景 適合對穩定性要求高的企業級應用 一般應用在大資料日誌處理或對實時性、可靠性要求稍低的場景

規範與協議

1. JMS 規範

Java 訊息服務(Java Message Service)即 JMS,是一個 Java 平臺關於面向訊息中介軟體的 API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。

  • 提供者:實現 JMS規範的訊息中介軟體伺服器
  • 客戶端:傳送或接收訊息的應用程式
  • 生產者/釋出者:建立併發送訊息的客戶端
  • 消費者/訂閱者:接收並處理訊息的客戶端
  • 訊息:應用層序之間傳遞的資料內容
  • 訊息模式:傳遞訊息的方式,JMS 中定義了佇列和主題兩種模式

1.1 訊息模式

1.1.1 佇列模型
  • 客戶端包括生產者和消費者
  • 佇列中訊息只能被一個消費者消費
  • 消費者可以隨時消費佇列中的訊息

佇列模式
程式碼

  • 生產者
package com.chen.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 佇列模式:生產者
 *
 * @Author LeifChen
 * @Date 2018-11-16
 */
public class QueueProducer {
    private static final String URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-test";
    private static final int COUNT = 100;

    public static void main(String[] args) throws JMSException {
        // 1.建立 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立 Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連線
        connection.start();

        // 4.建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個釋出者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < COUNT; i++) {
            // 7.建立訊息
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.釋出訊息
            producer.send(textMessage);

            System.out.println("傳送訊息:" + textMessage.getText());
        }

        // 9.關閉連線
        connection.close();
    }
}
  • 消費者
package com.chen.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 佇列模式:消費者
 *
 * @Author LeifChen
 * @Date 2018-11-16
 */
public class QueueConsumer {
    private static final String URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立 Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連線
        connection.start();

        // 4.建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收訊息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}
1.1.2 主題模型
  • 客戶端包括髮布者和訂閱者
  • 主題中的訊息被所有訂閱者消費
  • 消費者不能消費訂閱之前就傳送到主題中的訊息

主題模式
程式碼

  • 釋出者
package com.chen.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 主題模式:釋出者
 *
 * @Author LeifChen
 * @Date 2018-11-16
 */
public class TopicProducer {
    private static final String URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "topic-test";
    private static final int COUNT = 100;

    public static void main(String[] args) throws JMSException {
        // 1.建立 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立 Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連線
        connection.start();

        // 4.建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.建立一個釋出者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < COUNT; i++) {
            // 7.建立訊息
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.釋出訊息
            producer.send(textMessage);

            System.out.println("傳送訊息:" + textMessage.getText());
        }

        // 9.關閉連線
        connection.close();
    }
}
  • 訂閱者
package com.chen.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 主題模式:訂閱者
 *
 * @Author LeifChen
 * @Date 2018-11-16
 */
public class TopicConsumer {
    private static final String URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立 Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連線
        connection.start();

        // 4.建立會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收訊息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

1.2 JMS 編碼介面

  • ConnectionFactory:用於建立連線到訊息中介軟體的連線工廠
  • Connection:代表了應用程式和訊息伺服器之間的通訊鏈路
  • MessageConsumer:由會話建立,用於接收發送到目標的訊息
  • MessageProducer:由會話建立,用於傳送訊息到目標
  • Meeage:是在消費者和生產者之間傳送的物件,訊息頭,一組訊息屬性,一個訊息體
  • Destination:指訊息釋出和接收的地點,包括佇列或主題

JMS編碼介面

2. AMQP 協議

AMQP(advanced message queuing protocol)是一個提供訊息服務的應用層標準協議,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品、不同開發語言等條件的限制。

3. JMS 與 AMQP 比較

Propertity JMS 規範 AMQP 協議
定義 Java API Wire-protocol
跨語言
訊息模型 提供兩種訊息模型:
p2p
pub/sub
提供五種訊息模型:
direct
fanout
topic
headers
system
訊息型別 TextMessage
MapMessage
BytesMessage
StreamMessage
ObjectMessage
Message
byte[]
綜合評價 JMS 定義了 Java API 層面的標準 AMQP 是面向詳細、佇列、路由(包括點對點的釋出/訂閱)、可靠性、安全

參考

  1. GitHub
  2. Java訊息中介軟體