初識ActiveMQ訊息中介軟體
ActiveMQ官方網站:https://activemq.apache.org/
關於ActiveMQ訊息傳遞的方式詳見:https://segmentfault.com/a/1190000014958916
本篇部落格旨在解決的問題:
1.如何在普通Java環境中使用ActiveMQ
2.ActiveMQ如何與Spring的整合
3.在SpringBoot中如何使用ActiveMQ
環境:
1. windows 10 64bit
2. apache-activemq-5.14.4
3. jdk 1.8
4. maven 3.3
前置條件:
1.安裝啟動ActiveMQ:
在官方網站(https://activemq.apache.org/components/classic/download/
解壓後,進入到目錄bin中,根據自己作業系統的位數進入到win64或者win32目錄下,然後點選activemq.bat啟動ActiveMQ。
啟動後在瀏覽器輸入http://localhost:8161/,看到以下畫面表示啟動成功:
點選“Manage ActiveMQ broker”進入到ActiveMQ的後臺管理介面,若要求輸入使用者名稱密碼則初始使用者名稱密碼為admin,admin,如下:
2.本部落格使用Maven構建專案,引入以下依賴:
<!--Activemq訊息中介軟體 start--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.4</version> </dependency>
問題1-如何在普通Java環境中使用ActiveMQ:
採用PTP方式傳遞訊息:
訊息生產者:
package at.flying.activemq.ptp; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; import java.util.Date; /** * PTP方式傳遞訊息 */ public class ActiveMQProducer { public static void main(String[] args) throws Exception { // 構造ConnectionFactory例項物件 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 從工廠獲取連線物件 Connection connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連線,一個傳送或接收訊息的執行緒 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取訊息目的地,訊息傳送給誰 Destination destination = session.createQueue("test-queue"); // 獲取訊息生產者 MessageProducer producer = session.createProducer(destination); // 設定不持久化,此處學習,實際根據專案決定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 構造訊息 for (int i = 1; i <= 4; i++) { Student student = new Student(); student.setId((long) i); student.setName("學生" + i); student.setBirthday(new Date()); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 傳送訊息到目的地方 producer.send(message); System.out.println(String.format("傳送訊息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } connection.close(); } }
訊息消費者1:
package at.flying.activemq.ptp;
import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;
import javax.jms.*;
/**
* PTP方式接收訊息
*/
public class ActiveMQConsumer1 {
public static void main(String[] args) throws Exception {
// 構造ConnectionFactory例項物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 從工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線,一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取訊息目的地,訊息傳送給誰
Destination destination = session.createQueue("test-queue");
// 消費者,訊息接收者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (null != message) {
Student student = CommonUtils.deserialize(((TextMessage) message).getText());
System.out.println(
String.format("ActiveMQConsumer1-接受訊息:%d-%s-%s", student.getId(), student.getName(),
DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
}
} catch (JMSException e) {
}
}
});
System.in.read();
connection.close();
}
}
訊息消費者2:
package at.flying.activemq.ptp;
import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;
import javax.jms.*;
/**
* PTP方式接收訊息
*/
public class ActiveMQConsumer2 {
public static void main(String[] args) throws Exception {
// 構造ConnectionFactory例項物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 從工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線,一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取訊息目的地,訊息傳送給誰
Destination destination = session.createQueue("test-queue");
// 消費者,訊息接收者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (null != message) {
Student student = CommonUtils.deserialize(((TextMessage) message).getText());
System.out.println(
String.format("ActiveMQConsumer2-接受訊息:%d-%s-%s", student.getId(), student.getName(),
DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
}
} catch (JMSException e) {
}
}
});
System.in.read();
connection.close();
}
}
先啟動兩個訊息消費者,再啟動訊息生產者,控制檯輸出資訊如下:
訊息生產者:
訊息消費者1:
訊息消費者2:
這個結果使我們很容易理解PTP的訊息傳遞方式。
採用Pub/Sub方式傳遞訊息:
訊息生產者:
package at.flying.activemq.pubsub;
import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;
import javax.jms.*;
import java.util.Date;
/**
* Pub/Sub方式傳遞訊息
*/
public class ActiveMQProducer {
public static void main(String[] args) throws Exception {
// 構造ConnectionFactory例項物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 從工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線,一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取訊息目的地,訊息傳送給誰
Destination destination = session.createTopic("test-topic");
// 獲取訊息生產者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,此處學習,實際根據專案決定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 構造訊息
for (int i = 1; i <= 4; i++) {
Student student = new Student();
student.setId((long) i);
student.setName("學生" + i);
student.setBirthday(new Date());
TextMessage message = session.createTextMessage(CommonUtils.serialize(student));
// 傳送訊息到目的地方
producer.send(message);
System.out.println(String.format("傳送訊息:%d-%s-%s", student.getId(), student.getName(),
DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
}
connection.close();
}
}
訊息消費者1:
package at.flying.activemq.pubsub;
import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;
import javax.jms.*;
/**
* Pub/Sub方式接收訊息
*/
public class ActiveMQConsumer1 {
public static void main(String[] args) throws Exception {
// 構造ConnectionFactory例項物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 從工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線,一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取訊息目的地,訊息傳送給誰
Destination destination = session.createTopic("test-topic");
// 消費者,訊息接收者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (null != message) {
Student student = CommonUtils.deserialize(((TextMessage) message).getText());
System.out.println(
String.format("ActiveMQConsumer1-接受訊息:%d-%s-%s", student.getId(), student.getName(),
DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
}
} catch (JMSException e) {
}
}
});
System.in.read();
connection.close();
}
}
訊息消費者2:
package at.flying.activemq.pubsub;
import at.flying.domain.Student;
import com.github.flyinghe.tools.CommonUtils;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.time.DateFormatUtils;
import javax.jms.*;
/**
* Pub/Sub方式接收訊息
*/
public class ActiveMQConsumer2 {
public static void main(String[] args) throws Exception {
// 構造ConnectionFactory例項物件
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 從工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線,一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取訊息目的地,訊息傳送給誰
Destination destination = session.createTopic("test-topic");
// 消費者,訊息接收者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (null != message) {
Student student = CommonUtils.deserialize(((TextMessage) message).getText());
System.out.println(
String.format("ActiveMQConsumer2-接受訊息:%d-%s-%s", student.getId(), student.getName(),
DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd")));
}
} catch (JMSException e) {
}
}
});
System.in.read();
connection.close();
}
}
先啟動兩個訊息消費者,再啟動訊息生產者,控制檯輸出資訊如下:
訊息生產者:
訊息消費者1:
訊息消費者2:
這個結果使我們很容易理解Pub/Sub的訊息傳遞方式。
總結:
從以上程式碼可以看出PTP與Pub/Sub方式的訊息傳遞,只是在建立訊息目的地的時候不一樣:
PTP方式建立的訊息目的地是Queue(佇列),Pub/Sub方式建立的訊息目的地是Topic(主題)。
問題2-ActiveMQ如何與Spring的整合:
待寫...
問題3-在SpringBoot中如何使用ActiveMQ:
待寫...