04.ActiveMQ與Spring JMS整合
阿新 • • 發佈:2019-01-18
1.SpringJMS核心介面介紹
1.JmsTemplate JmsTemplate: 是Spring自身提供,只需向Spring容器內註冊這個類即可,就可以使用JmsTemplate類物件方便的操作JMS,下面介紹他常用的方法。 注意:JmsTemplate類是執行緒安全的,可以在整個應用範圍使用。但並不是說整個引用只能使用一個JmsTemplate例項,可以根據需要注入多個JmsTemplate例項。// send - 傳送一個訊息,使用訊息建立介面MessageCreator
publicvoid send(MessageCreator messageCreator
publicvoid send(finalDestination destination,finalMessageCreator messageCreator)
publicvoid send(finalString destinationName,finalMessageCreator messageCreator)
// sendAndReceive - 傳送並接收訊息
publicMessage sendAndReceive(MessageCreator messageCreator)
publicMessage sendAndReceive(finalDestination
publicMessage sendAndReceive(finalString destinationName,finalMessageCreator messageCreator)
// convertAndSend - 使用MessageConverter介面轉換訊息,先將物件轉換成訊息再發送訊息。與之對應的是receiveAndConvert方法
publicvoid convertAndSend(Object message)throwsJmsException
publicvoid
publicvoid convertAndSend(String destinationName,finalObject message)
publicvoid convertAndSend(Object message,MessagePostProcessor postProcessor)
publicvoid convertAndSend(Destination destination,finalObject message,finalMessagePostProcessor postProcessor)
publicvoid convertAndSend(String destinationName,finalObject message,finalMessagePostProcessor postProcessor)
// receive - 接收訊息
publicMessage receive()
publicMessage receive(Destination destination)
publicMessage receive(String destinationName)
// receiveSelected - 接收訊息,並過濾訊息
publicMessage receiveSelected(String messageSelector)
publicMessage receiveSelected(finalDestination destination,finalString messageSelector)
publicMessage receiveSelected(finalString destinationName,finalString messageSelector)
// receiveAndConvert - 接收訊息,並使用MessageConverter介面轉換訊息,把一個訊息轉換成一個物件
publicObject receiveAndConvert()
publicObject receiveAndConvert(Destination destination)
publicObject receiveAndConvert(String destinationName)
// receiveSelectedAndConvert - 接收訊息,並使用訊息過濾和訊息轉換
publicObject receiveSelectedAndConvert(String messageSelector)
publicObject receiveSelectedAndConvert(Destination destination,String messageSelector)
publicObject receiveSelectedAndConvert(String destinationName,String messageSelector)
// browse - 瀏覽訊息
public<T> T browse(BrowserCallback<T> action)
public<T> T browse(Queue queue,BrowserCallback<T> action)
public<T> T browse(String queueName,BrowserCallback<T> action)
// browseSelected - 瀏覽訊息,並使用過濾
public<T> T browseSelected(String messageSelector,BrowserCallback<T> action)
public<T> T browseSelected(finalQueue queue,finalString messageSelector,finalBrowserCallback<T> action)
public<T> T browseSelected(finalString queueName,finalString messageSelector,finalBrowserCallback<T> action)
// execute - 執行SessionCallback、ProducerCallback、BrowserCallback回撥介面,並得到回撥介面返回值
public<T> T execute(SessionCallback<T> action)
public<T> T execute(SessionCallback<T> action,boolean startConnection)
public<T> T execute(ProducerCallback<T> action)
public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
- PooledConnectionFactory:會快取connection,session和productor,不會快取consumer。因此只適合於生產者傳送訊息。
- SingleConnectionFactory:對於建立JMS伺服器連結的請求會一直返回同一個Connection,並且會忽略Connection的close方法呼叫(包括呼叫createConnection()得到的Connection)
- CachingConnectionFactory:繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了快取功能,它可以快取Session、MessageProducer和MessageConsumer。
- MessageCreator -- 訊息建立介面,傳送訊息時需要使用此介面建立訊息
- SessionCallback -- 使用JMS Session時的回撥介面
- ProducerCallback -- 使用JMS Session和MessageProducer時的回撥介面
- BrowserCallback -- 使用JMS Session和QueueBrowser時的回撥介面
- MessageListener -- 訊息監聽器介面,註解@JmsListener與其功能相似
- ListenerContainer -- 訊息偵聽器容器介面,實現有SimpleMessageListenerContainer、DefaultMessageListenerContainer
- MessageConverter -- 訊息轉換介面,用於JMS訊息到Java物件之間的轉換
2.訊息監聽器
在Spring整合JMS的應用中我們在定義訊息監聽器的時候一共可以定義三種類型的訊息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種型別的區別。1.MessageListener MessageListener是最原始的訊息監聽器,它是JMS規範中定義的一個介面。其中定義了一個用於處理接收到的訊息的onMessage方法,該方法只接收一個Message引數。我們前面在講配置消費者的時候用的訊息監聽器就是MessageListener,程式碼如下:
publicclassQueueMessageListenerimplementsMessageListener
{
@Override
publicvoid onMessage(Message message)
{
try
{
if(message instanceofTextMessage)
{
TextMessage tm =(TextMessage) message;
System.out.println("監聽訊息:"+ tm.getText());
}
else
{
System.out.println("訊息型別:"+ message.getClass());
}
}
catch(JMSException e)
{
e.printStackTrace();
}
}
}
publicclassQueueSessionAwareMessageListenerimplementsSessionAwareMessageListener<TextMessage>
{
/** 回覆訊息的目的地 */
privateDestination destination;
@Override
publicvoid onMessage(TextMessage message,Session session)throwsJMSException
{
System.out.println("監聽訊息內容:"+ message.getText());
MessageProducer messageProducer = session.createProducer(destination);
TextMessage replyMessage = session.createTextMessage("已收到訊息:"+ message.getJMSMessageID());
messageProducer.send(replyMessage);
}
publicDestination getDestination()
{
return destination;
}
publicvoid setDestination(Destination destination)
{
this.destination = destination;
}
}
- TextMessage轉換為String物件
- BytesMessage轉換為byte陣列
- MapMessage轉換為Map物件
- ObjectMessage轉換為對應的Serializable物件
<!--訊息監聽介面卡-->
<bean id="messageListenerAdapter"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!--通過建構函式指定訊息處理器-->
<constructor-arg>
<bean id="readerMessage"class="com.test.ReaderMessage"/>
</constructor-arg>
<!--指定訊息處理器類處理訊息的方法-->
<property name="defaultListenerMethod" value="receiveMessage"/>
<!--通過delegate屬性指定訊息處理器-->
<property name="delegate">
<bean id="readerMessage"class="com.test.ReaderMessage"/>
</property>
</bean>
publicclassMessageListenerByAdapter
{
publicvoid handleMessage(String message)
{
System.out.println("handleMessage方法處理訊息,訊息內容是:"+ message);
}
publicvoid receiveMessage(String message)
{
System.out.println("receiveMessage方法處理訊息,訊息內容是:"+ message);
}
}
- 第一,可以通過傳送的Message的setJMSReplyTo方法指定該訊息對應的回覆訊息的目的地。這需要生產者傳送訊息之前呼叫setJMSReplyTo方法。
- 第二,通過MessageListenerAdapter的defaultResponseDestination屬性來指定。
- <bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--設定連線工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--設定監聽地址-->
<property name="destination" ref="queueDestination"/>
<!--設定訊息監聽處理器,可以是以上三種-->
<property name="messageListener" ref="queueMessageListener"/>
</bean>
- SimpleMessageListenerContainer:這個訊息偵聽容器是三種中最簡單的。它在啟動時建立一個會話session和消費者Consumer,並且會使用標準的JMS MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供者呼叫監聽器的回撥函式。它不會動態的適應執行時需要和參與外部的事務管理。相容性方面,它非常接近於獨立的JMS規範,但一般不相容Java EE的JMS限制。
- DefaultMessageListenerContainer:這個訊息偵聽器使用的最多。跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應執行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和相容Java EE環境。
- ServerSessionMessageListenerContainer:這個監聽器容器利用JMS ServerSessionPool SPI動態管理JMS Session。使用者各種訊息監聽器可以獲得執行時動態調優功能,但是這也要求JMS提供者支援ServerSessionPool SPI。如果不需要執行時效能調整,請使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。
3.ActiveMQ與SpringJMS整合示例
本節提供了一個SpringJMS與ActiveMQ整合的示例,本示例可以作為參考。 1.配置spring-context-jms.xml<?xml version="1.0" encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd"
default-lazy-init="false">
<!-- 配置JMS連線工廠 -->
<beanid="connectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL"value="failover:(tcp://localhost:61616)"/>
</bean>
<!-- ActiveMQ連線池配置,ActiveMQ實現 -->
<!--
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
-->
<!-- ActiveMQ連線池配置,SpingJMS實現 -->
<beanid="cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
<propertyname="targetConnectionFactory"ref="connectionFactory"/>
<!-- Session快取數量,這裡屬性也可以直接在這裡配置 -->
<propertyname="sessionCacheSize"value="100"/>
</bean>
<!-- 訊息佇列01 -->
<beanid="queueDestination01"class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設定訊息佇列的名字 -->
<constructor-arg>
<value>spring-jms-queue01</value>
</constructor-arg>
</bean>
<!-- 訊息佇列02 -->
<beanid="queueDestination02"class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設定訊息佇列的名字 -->
<constructor-arg>
<value>spring-jms-queue02</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板,Spring提供的JMS工具類,它傳送、接收訊息。 -->
<beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<propertyname="connectionFactory"ref="cachingConnectionFactory"/>
<propertyname="defaultDestination"ref="queueDestination01"/>
<propertyname="receiveTimeout"value="10000"/>
</bean>
<!-- 訊息監聽容器(Queue),配置連線工廠,監聽的佇列是spring-jms-queue02,監聽器是上面定義的監聽器 -->
<beanid="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<propertyname="connectionFactory"ref="cachingConnectionFactory"/>
<propertyname="destination"ref="queueDestination02"/>
<propertyname="messageListener">
<beanid="queueMessageListener"class="spring.QueueMessageListener"/>
</property>
</bean>
</beans>