1. 程式人生 > >初識ActiveMQ訊息中介軟體

初識ActiveMQ訊息中介軟體

ActiveMQ官方網站:https://activemq.apache.org/

關於ActiveMQ訊息傳遞的方式詳見:https://segmentfault.com/a/1190000014958916

本篇部落格旨在解決的問題:

1.如何在普通Java環境中使用ActiveMQ

2.ActiveMQ如何與Spring的整合

3.在SpringBoot中如何使用ActiveMQ

環境:

1. windows 10 64bit

2. apache-activemq-5.14.4

3. jdk 1.8

4. maven 3.3

前置條件:

1.安裝啟動ActiveMQ:

在官方網站(https://activemq.apache.org/components/classic/download/

)上下載ActiveMQ

解壓後,進入到目錄bin中,根據自己作業系統的位數進入到win64或者win32目錄下,然後點選activemq.bat啟動ActiveMQ。

啟動後在瀏覽器輸入http://localhost:8161/,看到以下畫面表示啟動成功:

點選“Manage ActiveMQ broker”進入到ActiveMQ的後臺管理介面,若要求輸入使用者名稱密碼則初始使用者名稱密碼為admin,admin,如下:

2.本部落格使用Maven構建專案,引入以下依賴:

      <!--Activemq訊息中介軟體 start-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.4</version>
        </dependency>

問題1-如何在普通Java環境中使用ActiveMQ:

採用PTP方式傳遞訊息:

訊息生產者:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * PTP方式傳遞訊息
 */
public class ActiveMQProducer {

    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createQueue("test-queue");
        // 獲取訊息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設定不持久化,此處學習,實際根據專案決定
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 構造訊息
        for (int i = 1; i <= 4; i++) {
            Student student = new Student();
            student.setId((long) i);
            student.setName("學生" + i);
            student.setBirthday(new Date());
            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
            // 傳送訊息到目的地方
            producer.send(message);
            System.out.println(String.format("傳送訊息:%d-%s-%s", student.getId(), student.getName(),
                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
        }
        connection.close();
    }
}

訊息消費者1:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * PTP方式接收訊息
 */
public class ActiveMQConsumer1 {
    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createQueue("test-queue");
        // 消費者,訊息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-接受訊息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                }
            }
        });
        System.in.read();
        connection.close();
    }
}

訊息消費者2:

package at.flying.activemq.ptp;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * PTP方式接收訊息
 */
public class ActiveMQConsumer2 {
    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createQueue("test-queue");
        // 消費者,訊息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-接受訊息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                }
            }
        });
        System.in.read();
        connection.close();
    }
}

先啟動兩個訊息消費者,再啟動訊息生產者,控制檯輸出資訊如下:

訊息生產者:

訊息消費者1:

訊息消費者2:

這個結果使我們很容易理解PTP的訊息傳遞方式。

採用Pub/Sub方式傳遞訊息:

訊息生產者:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;
import java.util.Date;

/**
 * Pub/Sub方式傳遞訊息
 */
public class ActiveMQProducer {

    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createTopic("test-topic");
        // 獲取訊息生產者
        MessageProducer producer = session.createProducer(destination);
        // 設定不持久化,此處學習,實際根據專案決定
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 構造訊息
        for (int i = 1; i <= 4; i++) {
            Student student = new Student();
            student.setId((long) i);
            student.setName("學生" + i);
            student.setBirthday(new Date());
            TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
            // 傳送訊息到目的地方
            producer.send(message);
            System.out.println(String.format("傳送訊息:%d-%s-%s", student.getId(), student.getName(),
                    DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
        }
        connection.close();
    }
}

訊息消費者1:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Pub/Sub方式接收訊息
 */
public class ActiveMQConsumer1 {
    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createTopic("test-topic");
        // 消費者,訊息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer1-接受訊息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                }
            }
        });
        System.in.read();
        connection.close();
    }
}

訊息消費者2:

package at.flying.activemq.pubsub;

import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;

import javax.jms.*;

/**
 * Pub/Sub方式接收訊息
 */
public class ActiveMQConsumer2 {
    public static void main(String[] args) throws Exception {
        // 構造ConnectionFactory例項物件
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 從工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 啟動
        connection.start();
        // 獲取操作連線,一個傳送或接收訊息的執行緒
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 獲取訊息目的地,訊息傳送給誰
        Destination destination = session.createTopic("test-topic");
        // 消費者,訊息接收者
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (null != message) {
                        Student student = CommonUtils.deserialize(((TextMessage) message).getText());
                        System.out.println(
                                String.format("ActiveMQConsumer2-接受訊息:%d-%s-%s", student.getId(), student.getName(),
                                        DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
                    }
                } catch (JMSException e) {
                }
            }
        });
        System.in.read();
        connection.close();
    }
}

先啟動兩個訊息消費者,再啟動訊息生產者,控制檯輸出資訊如下:

訊息生產者:

訊息消費者1:

訊息消費者2:

這個結果使我們很容易理解Pub/Sub的訊息傳遞方式。

總結:

從以上程式碼可以看出PTP與Pub/Sub方式的訊息傳遞,只是在建立訊息目的地的時候不一樣:

PTP方式建立的訊息目的地是Queue(佇列),Pub/Sub方式建立的訊息目的地是Topic(主題)。

問題2-ActiveMQ如何與Spring的整合:

待寫...

問題3-在SpringBoot中如何使用ActiveMQ:

待寫...