1. 程式人生 > >ActiveMQ介紹和整合Spring使用

ActiveMQ介紹和整合Spring使用

ActiveMQ介紹和整合Spring使用

一、什麼是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
主要特點:

  1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
  3. 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
  4. 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
  5. 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支援通過JDBC和journal提供高速的訊息持久化
  7. 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
  8. 支援Ajax
  9. 支援與Axis的整合
  10. 可以很容易得呼叫內嵌JMS provider,進行測試

二、ActiveMQ的訊息形式

對於訊息的傳遞有兩種型別:
一種是點對點的,即一個生產者和一個消費者一一對應;
另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。
  • StreamMessage – Java原始值的資料流
  • MapMessage–一套名稱-值對
  • TextMessage–一個字串物件
  • ObjectMessage–一個序列化的 Java物件
  • BytesMessage–一個位元組的資料流

三、ActiveMQ的安裝

在這裡插入圖片描述
安裝環境:

1、需要jdk
2、安裝Linux系統。生產環境都是Linux系統。

安裝步驟

第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啟動。

使用bin目錄下的activemq命令啟動:

[[email protected] bin]# ./activemq start
關閉:
[[email protected] bin]# ./activemq stop
檢視狀態:
[[email protected] bin]# ./activemq status

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2

進入管理後臺:
http://192.168.25.168:8161/admin
使用者名稱:admin
密碼:admin

在這裡插入圖片描述
四、ActiveMQ的使用方法
在這裡插入圖片描述
五、Queue

(1)Producer

生產者:生產訊息,傳送端。
把jar包新增到工程中。使用5.11.2版本的jar包。

在這裡插入圖片描述

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
第六步:使用Session物件建立一個Producer物件。
第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。

@Test
	public void testQueueProducer() throws Exception {
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		//brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
		//引數:佇列的名稱。
		Queue queue = session.createQueue("test-queue");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(queue);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*TextMessage message = new ActiveMQTextMessage();
		message.setText("hello activeMq,this is my first test.");*/
		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}

(2)Consumer

消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
第六步:使用Session物件建立一個Consumer物件。
第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源

@Test
	public void testQueueConsumer() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
		Queue queue = session.createQueue("test-queue");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(queue);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					//取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		//等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

五、Topic

(1)Producer

使用步驟:
第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Topic物件。
第六步:使用Session物件建立一個Producer物件。
第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。

@Test
	public void testTopicProducer() throws Exception {
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		// brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		// 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		// 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
		// 引數:話題的名稱。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(topic);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*
		 * TextMessage message = new ActiveMQTextMessage(); message.setText(
		 * "hello activeMq,this is my first test.");
		 */
		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}

(2)Consumer

消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
第六步:使用Session物件建立一個Consumer物件。
第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源

@Test
	public void testTopicConsumer() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(topic);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {

			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					// 取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("topic的消費端03。。。。。");
		// 等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

六、Activemq整合spring

使用方法

第一步:引用相關的jar包。

<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
		</dependency>

第二步:配置Activemq整合spring。配置ConnectionFactory

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.25.168:61616" />
	</bean>
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>
</beans>

第三步:配置生產者。

使用JMSTemplate物件。傳送訊息。

第四步:在spring容器中配置Destination。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.25.168:61616" />
	</bean>
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>
	<!-- 配置生產者 -->
	<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<!--這個是佇列目的地,點對點的 -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>spring-queue</value>
		</constructor-arg>
	</bean>
	<!--這個是主題目的地,一對多的 -->
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="topic" />
	</bean>
</beans>

第五步:程式碼測試

@Test
	public void testSpringActiveMq() throws Exception {
		//初始化spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
		//從spring容器中獲得JmsTemplate物件
		JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
		//從spring容器中取Destination物件
		Destination destination = (Destination) applicationContext.getBean("queueDestination");
		//使用JmsTemplate物件傳送訊息。
		jmsTemplate.send(destination, new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				//建立一個訊息物件並返回
				TextMessage textMessage = session.createTextMessage("spring activemq queue message");
				return textMessage;
			}
		});
	}

第六步:程式碼測試

傳送訊息

第一步:初始化一個spring容器
第二步:從容器中獲得JMSTemplate物件。
第三步:從容器中獲得一個Destination物件
第四步:使用JMSTemplate物件傳送訊息,需要知道Destination

@Test
	public void testQueueProducer() throws Exception {
		// 第一步:初始化一個spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
		// 第二步:從容器中獲得JMSTemplate物件。
		JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
		// 第三步:從容器中獲得一個Destination物件
		Queue queue = (Queue) applicationContext.getBean("queueDestination");
		// 第四步:使用JMSTemplate物件傳送訊息,需要知道Destination
		jmsTemplate.send(queue, new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage textMessage = session.createTextMessage("spring activemq test");
				return textMessage;
			}
		});
	}

接收訊息

e3-search-Service中接收訊息。
第一步:把Activemq相關的jar包新增到工程中
第二步:建立一個MessageListener的實現類。

public class MyMessageListener implements MessageListener {

	@Override
	public void onMessage(Message message) {
		
		try {
			TextMessage textMessage = (TextMessage) message;
			//取訊息內容
			String text = textMessage.getText();
			System.out.println(text);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

第三步:配置spring和Activemq整合。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://192.168.25.168:61616" />
	</bean>
	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
	<bean id="connectionFactory"
		class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
		<property name="targetConnectionFactory" ref="targetConnectionFactory" />
	</bean>
	<!--這個是佇列目的地,點對點的 -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>spring-queue</value>
		</constructor-arg>
	</bean>
	<!--這個是主題目的地,一對多的 -->
	<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg value="topic" />
	</bean>
	<!-- 接收訊息 -->
	<!-- 配置監聽器 -->
	<bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
	<!-- 訊息監聽容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="queueDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>
</beans>

第四步:測試程式碼。

@Test
	public void testQueueConsumer() throws Exception {
		//初始化spring容器
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
		//等待
		System.in.read();
	}