ActiveMQ安裝啟動
ActiveMQ是面向訊息中介軟體(Message-oriented middleware),是用於以分散式應用或系統中的非同步、鬆耦合、可靠、可擴充套件和安全通訊的一類軟體。總體思想是它作為訊息傳送器和訊息接收器之間的訊息中介,這種中介提供了一個全新水平的鬆耦合。
JMS 叫做 Java 訊息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規範,旨在通過提供標準的產生、傳送、接收和處理訊息的 API 簡化企業應用的開發,類似於 JDBC 和關係型資料庫通訊方式的抽象。
首先到網上下載activemq,網站:http://activemq.apache.org/download-archives.html
下載完後解壓後文件結構:
開啟doc命令,進入到activemq的bin目錄,輸入activemq.bat,回車:
開啟瀏覽器訪問網站http://localhost:8161/admin,使用者名稱密碼預設是admin,activemq便啟動好了
接下來我們用idea進行測試:
首先搭建好專案,在pom.xml中匯入所需jar包:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.szxs</groupId> <artifactId>AcviteMQQueueDemo1</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.2.9.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.9.RELEASE</version> </dependency> </dependencies> </project>
activemq中分為兩種模式:
第一種:queue,佇列模式,一對一訊息傳送和接受(先發送,後接受)
第二張:topic,訂閱模式,多對多訊息傳送和接受(先接受,後傳送)
下面先來測試queue模式,分為三類(監聽器,傳送訊息,接受訊息):
queue監聽器listener程式碼:
package com.szxs.queue.listener; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 點對點訊息接收者 使用Listener 監聽方式 在實際專案開發中使用比較多 */ public class QueueReceiver_Listener { // tcp 地址 伺服器器端地址 public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616"; // 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp中可以查詢到傳送的mq訊息 public static final String DESTINATION = "xs.mq.queue"; //測試連線使用預設的使用者名稱 public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null //測試連線使用預設的密碼 public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null public static void run() throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 1、建立連結工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueReceiver_Listener.DEFAULT_USER, QueueReceiver_Listener.DEFAULT_PASSWORD,QueueReceiver_Listener.BROKER_URL); // 2、通過工廠建立一個連線 connection = factory.createQueueConnection(); // 3、啟動連線 connection.start(); // 4、建立一個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 5、建立一個訊息佇列 Queue queue = session.createQueue(DESTINATION); // 建立訊息接收者 javax.jms.QueueReceiver receiver = session.createReceiver(queue); //使用內部類為訊息接收者載入相應的Listener監聽 receiver.setMessageListener(new MessageListener() { //重寫onMessage方法 public void onMessage(Message msg) { if (msg != null) { TextMessage textMessage = (TextMessage) msg; try { System.out.println("接收#" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠10s再關閉 接收生產者傳送的全部的10條訊息 // 需要注意的是這裡使用sleep會使當前正在執行的執行緒進入休眠狀態 // 也就是QueueReceiver_Listener這個類進入休眠狀態了,而接收者的監聽器仍然會繼續執行的哦。 Thread.sleep(1000 * 10); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueReceiver_Listener.run(); } }
queue傳送訊息sender程式碼:
package com.szxs.queue.send;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 點對點訊息傳送者
*/
public class QueueSender {
// 傳送次數
public static final int SEND_NUM = 10;
// tcp 地址 伺服器器端地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616";
// 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp中可以查詢到傳送的mq訊息
public static final String DESTINATION = "xs.mq.queue";
//測試連線使用預設的使用者名稱
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
//測試連線使用預設的密碼
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null
/**
* 傳送訊息
* @param session
* @param sender
* @throws Exception
*/
public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "傳送第" + (i + 1) + "條訊息";
TextMessage textMessage = session.createTextMessage(message);
System.out.println(textMessage.getText());
sender.send(textMessage);
}
}
/**
* 建立連線併發送訊息
* @throws Exception
*/
public static void run() throws Exception {
//點對點佇列連線
QueueConnection connection = null;
//點對點會話Session
QueueSession session = null;
try {
// 1、建立連結工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueSender.DEFAULT_USER, QueueSender.DEFAULT_PASSWORD, QueueSender.BROKER_URL);
// 2、通過工廠建立一個連線
connection = factory.createQueueConnection();
// 3、啟動連線
connection.start();
// 4、建立一個session會話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、建立一個訊息佇列
Queue queue = session.createQueue(DESTINATION);
// 6、建立訊息傳送者
javax.jms.QueueSender sender = session.createSender(queue);
// 設定持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueSender.run();
}
}
queue接受訊息receive程式碼:
package com.szxs.queue.receive;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 點對點訊息接收者 直接Receive 方式
*/
public class QueueReceiver_Receive {
// 接收訊息的個數
public static final int Receive_NUM = 10;
// tcp 地址 伺服器器端地址
public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616";
// 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp中可以查詢到傳送的mq訊息
public static final String DESTINATION = "xs.mq.queue";
//測試連線使用預設的使用者名稱
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
//測試連線使用預設的密碼
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 1、建立連結工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueReceiver_Receive.DEFAULT_USER, QueueReceiver_Receive.DEFAULT_PASSWORD,QueueReceiver_Receive.BROKER_URL);
// 2、通過工廠建立一個連線
connection = factory.createQueueConnection();
// 3、啟動連線
connection.start();
// 4、建立一個session會話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、建立一個訊息佇列
Queue queue = session.createQueue(DESTINATION);
// 建立訊息接收者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
// 直接Receive 方式 接收訊息
for(int i=0;i<QueueReceiver_Receive.Receive_NUM;i++){
TextMessage textMessage=(TextMessage) receiver.receive();
if(textMessage!=null)
System.out.println("接收#" + textMessage.getText());
}
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
// 關閉會話
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueReceiver_Receive.run();
}
}
然後分別執行queue程式碼(傳送訊息—>接受訊息—>監聽器),瀏覽器按下圖點選,這就是queue佇列模式
下面測試topic模式,分為三類(監聽器,傳送訊息,接受訊息):
topic監聽器listener程式碼:
package com.szxs.topic.listener;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 釋出訂閱式訊息接收者
*/
public class TopicReceiver_Listener {
// tcp 地址 伺服器器端地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616";
// 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/topics.jsp中可以查詢到傳送的mq訊息
public static final String DESTINATION = "xs.mq.topic";
//測試連線使用預設的使用者名稱
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
//測試連線使用預設的密碼
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、建立連結工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver_Listener.DEFAULT_USER, TopicReceiver_Listener.DEFAULT_PASSWORD, TopicReceiver_Listener.BROKER_URL);
// 2、通過工廠建立一個連線
connection = factory.createTopicConnection();
// 3、啟動連線
connection.start();
// 4、建立一個session會話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、建立一個訊息佇列
Topic topic = session.createTopic(DESTINATION);
// 6、建立訊息製作者
TopicSubscriber subscriber = session.createSubscriber(topic);
//使用監聽器的方式訂閱訊息
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
TextMessage textMessage = (TextMessage) msg;
try {
System.out.println("接收#" + textMessage.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再關閉 接收生產者傳送的全部的10條訊息
// 需要注意的是這裡使用sleep會使當前正在執行的執行緒進入休眠狀態
// 也就是TopicReceiver_Listener這個類進入休眠狀態了,而接收者的監聽器仍然會繼續執行的哦。
Thread.sleep(1000 *100);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver_Listener.run();
}
}
topic傳送訊息sender程式碼:
package com.szxs.topic.send;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 釋出訂閱式訊息傳送者
*/
public class TopicProducer {
// 傳送次數
public static final int SEND_NUM = 10;
// tcp 地址 伺服器器端地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616";
// 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/topics.jsp中可以查詢到傳送的mq訊息
public static final String DESTINATION = "xs.mq.topic";
//測試連線使用預設的使用者名稱
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
//測試連線使用預設的密碼
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null
/**
* 訊息傳送端
* @param session
* @param publisher
* @throws Exception
*/
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "傳送訊息第" + (i + 1) + "條";
TextMessage textMessage = session.createTextMessage(message);
System.out.println(textMessage.getText());
//傳送 Topic訊息
publisher.send(textMessage);
}
}
public void run() throws Exception {
//Topic連線
TopicConnection connection = null;
//Topic會話
TopicSession session = null;
try {
// 1、建立連結工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducer.DEFAULT_USER, TopicProducer.DEFAULT_PASSWORD, TopicProducer.BROKER_URL);
// 2、通過工廠建立一個連線
connection = factory.createTopicConnection();
// 3、啟動連線
connection.start();
// 4、建立一個session會話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、建立一個訊息佇列
Topic topic = session.createTopic(DESTINATION);
// 6、建立訊息傳送者
TopicPublisher publisher = session.createPublisher(topic);
// 設定持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
new TopicProducer().run();
}
}
topic接受訊息receive程式碼:
package com.szxs.topic.receive;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 釋出訂閱式訊息接收者
*/
public class TopicReceiver_Receive {
// tcp 地址 伺服器器端地址
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 其值為 "tcp://localhost:61616";
// 目標地址,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/topics.jsp中可以查詢到傳送的mq訊息
public static final String DESTINATION = "xs.mq.topic";
//測試連線使用預設的使用者名稱
public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
//測試連線使用預設的密碼
public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 1、建立連結工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver_Receive.DEFAULT_USER, TopicReceiver_Receive.DEFAULT_PASSWORD, TopicReceiver_Receive.BROKER_URL);
// 2、通過工廠建立一個連線
connection = factory.createTopicConnection();
// 3、啟動連線
connection.start();
// 4、建立一個session會話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 5、建立一個訊息佇列
Topic topic = session.createTopic(DESTINATION);
// 6、建立訊息製作者
final TopicSubscriber subscriber = session.createSubscriber(topic);
//接收Topic生產者傳送過來的訊息
//需要注意的是此處需要啟動一個新的執行緒來處理問題
new Thread(){
public void run(){
TextMessage textMessage = null;
try {
while(true){//持續接收訊息
textMessage = (TextMessage) subscriber.receive();
if(textMessage==null)
break;
System.out.println("接收#" + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}.start();
// 休眠100s再關閉 接收生產者傳送的全部的10條訊息
// 需要注意的是這裡使用sleep會使當前正在執行的執行緒進入休眠狀態
// 也就是TopicReceiver_Receive這個類進入休眠狀態了,而接收者.start方法剛剛啟動的新執行緒會繼續執行的哦。
Thread.sleep(1000 *100);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver_Receive.run();
}
}
然後分別執行topic程式碼(接受訊息—>傳送訊息—>監聽器),瀏覽器按下圖點選,這就是topic訂閱模式
結合spring使用:
首先建立applicationContext.xml檔案,程式碼如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!--連線池-->
<!--<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
</property>
</bean>-->
<!-- 連線工廠 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- 配置訊息目標 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp -->
<constructor-arg index="0" value="xs.mq.queue"/>
</bean>
<!-- 訊息模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="activeMQConnectionFactory"/>
<property name="defaultDestination" ref="destination"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
然後建立傳送訊息,接受訊息類:
傳送訊息Sender程式碼:
package com.spring.send;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
* Spring JMSTemplate 訊息傳送者<br>
* 將JMS整合到spring上面進行開發
*/
public class Sender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date=formatter.format(new Date());
String text="current system time: "+date;
message.setText(text);
System.out.println(text);
return message;
}
});
}
}
接受訊息Receiver程式碼:
package com.spring.receive;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
* Spring JMSTemplate 訊息接收者<br>
* 將JMS整合到spring上面進行開發
*/
public class Receiver {
public static void main(String[] args) throws JMSException {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
while(true) {
//設定10s超時時間
jmsTemplate.setReceiveTimeout(1000*10);
TextMessage text = (TextMessage) jmsTemplate.receive();
if(text==null)
break;
//接收到相應的訊息
System.out.println("收到訊息:" + text.getText());
}
}
}
然後執行,瀏覽器點選,這就是結合spring使用:
謝謝大家!