1. 程式人生 > >ActiveMQ·基礎篇(一)

ActiveMQ·基礎篇(一)

        JMS 叫做 Java 訊息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在通過提供標準的產生、傳送、接收和處理訊息的 API 簡化企業應用的開發,類似於 JDBC 和關係型資料庫通訊方式的抽象。

        ActiveMQ 是一個 MOM,具體來說是一個實現了 JMS 規範的系統間遠端通訊的訊息代理。

        MOM 就是面向訊息中介軟體(Message-oriented middleware),是用於以分散式應用或系統中的非同步、鬆耦合、可靠、可擴充套件和安全通訊的一類軟體。MOM 的總體思想是它作為訊息傳送器和訊息接收器之間的訊息中介,這種中介提供了一個全新水平的鬆耦合。

        即相當於人與人通訊之間的郵局,人們信件通訊的時候不需要以兩人直接進行通訊 ,而是通過一個第三方中介寄存信件,一個取,一個存。

        以上圖片顯示的是整個JMS的元件

  • Provider:純 Java 語言編寫的 JMS 介面實現(比如 ActiveMQ 就是)
  • Domains:訊息傳遞方式,包括點對點(P2P)、釋出/訂閱(Pub/Sub)兩種
  • Connection factory:客戶端使用連線工廠來建立與 JMS provider 的連線
  • Destination:訊息被定址、傳送以及接收的物件

ActiveMq的兩種主要通訊方式

    1.P2P(點對點的方式)

        點對點:訊息域使用 queue 作為 Destination,訊息可以被同步或非同步的傳送和接收,每個訊息只會給一個 Consumer 傳送一次。

         Consumer 可以使用 MessageConsumer.receive() 同步地接收訊息,也可以通過註冊一個 MessageListener 實現非同步接收。多個 Consumer 可以註冊到同一個 queue 上,但一個訊息只能被一個 Consumer 所接收,然後由該 Consumer 來確認訊息。並且在這種情況下,Provider 對所有註冊的 Consumer 以輪詢的方式傳送訊息。

以下即為點對點的方式流程圖

    1.sub/pub(釋出/訂閱,Publish/Subscribe)

    訊息域使用 topic 作為 Destination,釋出者向 topic 傳送訊息,訂閱者註冊接收來自 topic 的訊息。傳送到 topic 的任何訊息都將自動傳遞給所有訂閱者。接收方式(同步和非同步)與 P2P 域相同。除非顯式指定,否則 topic 不會為訂閱者保留訊息。當然,這可以通過持久化(Durable)訂閱來實現訊息的儲存。這種情況下,當訂閱者與 Provider 斷開時,Provider 會為它儲存訊息。當持久化訂閱者重新連線時,將會受到所有的斷連期間未消費的訊息。如圖,每個釋出者釋出的資訊,每個訂閱者都會收到,而不是和點對點一樣一條資訊只能由一個接收者接受。

ActiveMq建立應用程式的流程

  • 獲取連線工廠
  • 使用連線工廠建立連線
  • 啟動連線
  • 從連線建立會話
  • 獲取 Destination
  • 建立 Producer,或
    • 建立 Producer
    • 建立 message
  • 建立 Consumer,或傳送或接收message傳送或接收 message
    • 建立 Consumer
    • 註冊訊息監聽器(可選)
  • 傳送或接收 message
  • 關閉資源(connection, session, producer, consumer 等

以下是一個簡單的activeMq傳送P2P的簡單流程樣式。


import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.*;

/**
 * @author pontus
 * @date create in 11:25 2018/11/2
 * @description
 */
@RestController
@Configuration
public class JmsTemplateConfiguration {

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private  String password;

    @Value("${spring.activemq.broker-url}")
    private  String brokerUrl;

    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    Destination destination;
    MessageProducer producer;
    MessageConsumer consumer;
    Message message;
    boolean useTransaction = false;

    @RequestMapping("ds")
    public void ss(){
        try {
            //使用ActiveMQ時:
            connectionFactory = new ActiveMQConnectionFactory(usrName, password, brokerUrl);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE");
            //生產者傳送訊息
            producer = session.createProducer(destination);


            message = session.createTextMessage("this is a test");
            producer.send(message);

            //消費者同步接收
            consumer = session.createConsumer(destination);
            message = (TextMessage) consumer.receive(1000);
            System.out.println("Received message: " + message);
            //消費者非同步接收
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message != null) {
                        System.out.println(message);
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                producer.close();
                session.close();
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}