1. 程式人生 > >訊息佇列中介軟體(二)使用 ActiveMQ

訊息佇列中介軟體(二)使用 ActiveMQ

ActiveMQ 介紹

Active MQ 是由 Apache 出品的一款流行的功能強大的開源訊息中介軟體,它速度快,支援跨語言的客戶端,具有易於使用的企業整合模式和許多的高階功能,同時完全支援 JSM1.1 和 J2EE1.4 。

ActiveMQ 特點

  • 支援Java,C,C ++,C#,Ruby,Perl,Python,PHP等各種跨語言客戶端和協議,如 OpenWire , Stomp , AMQP , MQTT.
  • 完全支援JMS 1.1和 J2EE 1.4,支援瞬態,持久,事務和XA訊息傳遞。
  • 對 Spring 框架的支援以便ActiveMQ可以輕鬆嵌入到Spring應用程式中。
  • 通過了常見的 J2EE 伺服器測試,如 TomEE,Geronimo,JBoss,GlassFish 和 WebLogic 。
  • 連線方式的多樣化,ActiveMQ 提供了多種連線模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。
  • 可以通過使用 JDBC 和 journal 實現訊息的快速持久化。
  • 專為高效能群集,客戶端 - 伺服器,點對點通訊而設計。
  • 提供與語言無關的 REST API。
  • 支援 Ajax 方式呼叫 ActiveMQ。
  • ActiveMQ 可以輕鬆地與 CXF、Axis 等 Web Service 技術整合,以提供可靠的訊息傳遞。
  • 可用作為記憶體中的 JMS 提供者,非常適合 JMS 單元測試。

ActiveMQ 訊息

  1. 點對點佇列模式
    訊息到達訊息系統,被保留在訊息佇列中,然後由一個或者多個消費者消費佇列中的訊息,一個訊息只能被一個消費者消費,然後就會被移除。例如訂單處理系統。
  2. 釋出-訂閱模式
    訊息傳送時指定主題(或者說通道),訊息被保留在指定的主題中,消費者可以訂閱多個主題,並使用主題中的所有的訊息,例如現實中的電視與電視訊道。所有客戶端包括髮布者和訂閱者,主題中的訊息可以被所有的訂閱者消費,消費者只能消費訂閱之後傳送到主題中的訊息。

ActiveMQ 概念

  • Broker,訊息代理,表示訊息佇列伺服器實體,接受客戶端連線,提供訊息通訊的核心服務。
  • Producer,訊息生產者,業務的發起方,負責生產訊息並傳輸給 Broker 。
  • Consumer,訊息消費者,業務的處理方,負責從 Broker 獲取訊息並進行業務邏輯處理。
  • Topic,主題,釋出訂閱模式下的訊息統一彙集地,不同生產者向 Topic 傳送訊息,由 Broker 分發到不同的訂閱者,實現訊息的廣播。
  • Queue,佇列,點對點模式下特定生產者向特定佇列傳送訊息,消費者訂閱特定佇列接收訊息並進行業務邏輯處理。
  • Message,訊息體,根據不同通訊協議定義的固定格式進行編碼的資料包,來封裝業務 資料,實現訊息的傳輸。

ActiveMQ 工程例項

下面是使用 ActiveMQ 的佇列模式和釋出-訂閱模式的 Java 程式碼示例。

POM 依賴

        <!-- Active-MQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>

佇列模式消費者

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * <p>
 * 訊息消費者,用於消費訊息
 *
 * @Author niujinpeng
 * @Date 2018/9/4 23:45
 */
public class AppConsumer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動連線
        connection.start();

        // 4.建立會話,false,不使用事務,自動應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.建立一個目標
        Destination destination = session.createQueue(queueName);
        // 6.建立消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收訊息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8.關閉連線
        //connection.close();
    }
}

佇列模式生產者

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * <p>
 * 訊息提供者,用於向訊息中介軟體傳送訊息
 *
 * @Author niujinpeng
 * @Date 2018/9/4 23:28
 */
public class AppProducer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動連線
        connection.start();

        // 4.建立會話,false,不使用事務,自動應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.建立一個目標
        Destination destination = session.createQueue(queueName);
        // 6.建立生產者
        MessageProducer producer = session.createProducer(destination);

        // 7.建立訊息併發送
        for (int i = 0; i < 10; i++) {
            // 建立訊息
            TextMessage textMessage = session.createTextMessage("textMessage" + i);
            // 釋出訊息
            producer.send(textMessage);
            System.out.println("傳送訊息:" + textMessage.getText());
        }

        // 8.關閉連線
        connection.close();

    }
}

釋出訂閱模式生產者

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * <p>
 * 主題模式
 * 訊息消費者,用於消費訊息
 *
 * @Author niujinpeng
 * @Date 2018/9/4 23:45
 */
public class AppConsumer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動連線
        connection.start();

        // 4.建立會話,false,不使用事務,自動應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.建立一個目標
        Destination destination = session.createTopic(topicName);
        // 6.建立消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收訊息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 8.關閉連線
        //connection.close();
    }
}

釋出訂閱模式生產者

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

/**
 * <p>
 * 主題模式
 * 訊息提供者,用於向訊息中介軟體傳送訊息
 *
 * @Author niujinpeng
 * @Date 2018/9/4 23:28
 */
public class AppProducer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.建立ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動連線
        connection.start();

        // 4.建立會話,false,不使用事務,自動應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.建立一個目標
        Destination destination = session.createTopic(topicName);
        // 6.建立生產者
        MessageProducer producer = session.createProducer(destination);

        // 7.建立訊息併發送
        for (int i = 0; i < 10; i++) {
            // 建立訊息
            TextMessage textMessage = session.createTextMessage("textMessage" + i);
            // 釋出訊息
            producer.send(textMessage);
            System.out.println("傳送訊息:" + textMessage.getText());
        }

        // 8.關閉連線
        connection.close();

    }
}

GitHub原始碼:https://github.com/niumoo/message-queue

Spring 整合 ActiveMQ

在 Spring 中配置 Active MQ 就像Spring 整合其他功能一樣,我們需要在 XML 配置中配置幾個關鍵的例項即可。在 Active MQ 中有幾個物件的例項是至關重要的,如 Active MQ jms 連線工廠,為了減少連線斷開效能時間消耗的 jms 連線池以及生產者消費者等。

下面是一些詳細說明。

  • ConnectionFactory 用於管理連線的連線工廠(Spring提供)。
    • 一個 Spring 為我們提供的連線池。
    • JmsTemplate 每次傳送都會重新建立連線,會話和 Productor。
    • Spring 中提供了SingleConnectionFactory 和CachingConnectionFactory(增加了快取功能)。
  • JmsTemplate 是用於傳送和接收訊息的模板類。
    • 是spring提供的,只需要向Spring 容器內註冊這個類就可以使用 JmsTemplate 方便的操作jms。
    • JmsTemplate 類是執行緒安全的,可以在整個應用範圍使用。
  • MessageListerner 訊息監聽器
    • 使用一個onMessage方法,該方法只接收一個Message引數。

POM 依賴

<properties>
        <spring.version>5.0.4.RELEASE</spring.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.1.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- 這個版本的Spring需要使用JMS 2.0版本,但spring-jms的依賴沒有自動匯入JMS 2.0,而activemq-core會匯入JMS 1.1的依賴 -->
        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

XML 配置

XML 公共配置

為了份檔案配置方便管理,下面是提取出來的公共配置,為了在獨立配置生產者和消費者 XML檔案時引入,當然也可以直接把生產者和消費者以及所有的 XML bean 配置在一個檔案裡。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:content="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <content:annotation-config/>

    <!-- ActiveMQ為我們提供的connection factory -->
    <bean id="targerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>

    <!-- spring jms為我們提供的連線池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targerConnectionFactory"/>
    </bean>

    <!-- 一個佇列模式目的地(佇列名稱),點對點的訊息模式 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue-spring"/>
    </bean>

    <!-- 一個主題模式目的地(主題名稱),釋出訂閱訊息模式 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic-spring"/>
    </bean>

</beans>

XML 消費者

消費者主要是一個訊息監聽器,監聽指定的佇列或者主題的訊息資訊,來有訊息時呼叫回撥監聽處理方法。這裡我註釋掉了監聽的佇列模式,指定了主題模式。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:content="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 匯入公共配置 -->
    <import resource="common.xml"/>

    <!-- 配置訊息監聽器 -->
    <bean id="consumerMessageListener" class="net.codingme.jms.consumer.ConsumerMessageListener"/>

    <!-- 配置訊息監聽容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 佇列模式 -->
        <!--<property name="destination" ref="queueDestination"/>-->
        <!-- 主題模式 -->
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

</beans>

XML 生產者

生成者的配置主要是使用 spring jms 模版物件,建立生產者例項用於生產訊息。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:content="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 匯入公共配置 -->
    <import resource="common.xml"/>

    <!-- jms模版-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <bean class="net.codingme.jms.producer.ProducerServiceImpl"></bean>

</beans>

生產者編寫

1. 定義介面

package net.codingme.jms.producer;

/**
 * <p>
 *
 * @Author niujinpeng
 * @Date 2018/11/2518:19
 */
public interface ProducerService {
    public void sendMessage(String message);
}

2. 主題模式生產者

package net.codingme.jms.producer;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.annotation.Resource;
import javax.jms.*;


/**
 * <p>
 *
 * @Author niujinpeng
 * @Date 2018/11/25 19:24
 */
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    JmsTemplate jmsTemplate;
    /**
     * 主題模式
     */
    @Resource(name = "topicDestination")
    Destination destination;

    @Override
    public void sendMessage(String message) {
        // 使用jmsTemplate傳送訊息
        jmsTemplate.send(destination, new MessageCreator() {
            // 建立訊息
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("傳送訊息:" + message);

    }
}

3. Spring 啟動 生產者

package net.codingme.jms.producer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * <p>
 * 啟動器
 *
 * @Author niujinpeng
 * @Date 2018/11/25 21:48
 */
public class AppProducer {

    public static void main(String[] args) {
        // 裝載配置檔案
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:producer.xml");
        ProducerService service = context.getBean(ProducerService.class);

        for (int i = 0; i < 10; i++) {
            service.sendMessage("test" + i);
        }
        context.close();
    }

}

消費者編寫

Spring啟動和生產者類似。下面是消費者監聽器的實現。

package net.codingme.jms.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * <p>
 * 訊息監聽器
 *
 * @Author niujinpeng
 * @Date 2018/11/25 22:28
 */
public class ConsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接收訊息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

執行測試

首先主題模式下啟動兩個消費者,使用生產者推送10條訊息。

測試

在每個消費者下面都可以看到推送的完整訊息。

測試

文中程式碼已經上傳到GitHub:https://github.com/niumoo/message-queue

<完>
本文原發於個人部落格:https://www.codingme.net 轉載請註明出處