1. 程式人生 > >04.ActiveMQ與Spring JMS整合

04.ActiveMQ與Spring JMS整合

1.SpringJMS核心介面介紹

1.JmsTemplate JmsTemplate: 是Spring自身提供,只需向Spring容器內註冊這個類即可,就可以使用JmsTemplate類物件方便的操作JMS,下面介紹他常用的方法。     注意:JmsTemplate類是執行緒安全的,可以在整個應用範圍使用。但並不是說整個引用只能使用一個JmsTemplate例項,可以根據需要注入多個JmsTemplate例項。
  1. // send - 傳送一個訊息,使用訊息建立介面MessageCreator
  2. publicvoid send(MessageCreator messageCreator
    )
  3. publicvoid send(finalDestination destination,finalMessageCreator messageCreator)
  4. publicvoid send(finalString destinationName,finalMessageCreator messageCreator)
  5. // sendAndReceive - 傳送並接收訊息
  6. publicMessage sendAndReceive(MessageCreator messageCreator)
  7. publicMessage sendAndReceive(finalDestination
    destination,finalMessageCreator messageCreator)
  8. publicMessage sendAndReceive(finalString destinationName,finalMessageCreator messageCreator)
  9. // convertAndSend - 使用MessageConverter介面轉換訊息,先將物件轉換成訊息再發送訊息。與之對應的是receiveAndConvert方法
  10. publicvoid convertAndSend(Object message)throwsJmsException
  11. publicvoid
    convertAndSend(Destination destination,finalObject message)
  12. publicvoid convertAndSend(String destinationName,finalObject message)
  13. publicvoid convertAndSend(Object message,MessagePostProcessor postProcessor)
  14. publicvoid convertAndSend(Destination destination,finalObject message,finalMessagePostProcessor postProcessor)
  15. publicvoid convertAndSend(String destinationName,finalObject message,finalMessagePostProcessor postProcessor)
  16. // receive - 接收訊息
  17. publicMessage receive()
  18. publicMessage receive(Destination destination)
  19. publicMessage receive(String destinationName)
  20. // receiveSelected - 接收訊息,並過濾訊息
  21. publicMessage receiveSelected(String messageSelector)
  22. publicMessage receiveSelected(finalDestination destination,finalString messageSelector)
  23. publicMessage receiveSelected(finalString destinationName,finalString messageSelector)
  24. // receiveAndConvert - 接收訊息,並使用MessageConverter介面轉換訊息,把一個訊息轉換成一個物件
  25. publicObject receiveAndConvert()
  26. publicObject receiveAndConvert(Destination destination)
  27. publicObject receiveAndConvert(String destinationName)
  28. // receiveSelectedAndConvert - 接收訊息,並使用訊息過濾和訊息轉換
  29. publicObject receiveSelectedAndConvert(String messageSelector)
  30. publicObject receiveSelectedAndConvert(Destination destination,String messageSelector)
  31. publicObject receiveSelectedAndConvert(String destinationName,String messageSelector)
  32. // browse - 瀏覽訊息
  33. public<T> T browse(BrowserCallback<T> action)
  34. public<T> T browse(Queue queue,BrowserCallback<T> action)
  35. public<T> T browse(String queueName,BrowserCallback<T> action)
  36. // browseSelected - 瀏覽訊息,並使用過濾
  37. public<T> T browseSelected(String messageSelector,BrowserCallback<T> action)
  38. public<T> T browseSelected(finalQueue queue,finalString messageSelector,finalBrowserCallback<T> action)
  39. public<T> T browseSelected(finalString queueName,finalString messageSelector,finalBrowserCallback<T> action)
  40. // execute - 執行SessionCallback、ProducerCallback、BrowserCallback回撥介面,並得到回撥介面返回值
  41. public<T> T execute(SessionCallback<T> action)
  42. public<T> T execute(SessionCallback<T> action,boolean startConnection)
  43. public<T> T execute(ProducerCallback<T> action)
  44. public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
  45. public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
2.連線工廠(連線池) JmsTemplate需要引用一個ConnectionFactory,JmsTemlate每次傳送訊息時都會重新建立連線,建立connection,session,建立productor。這是一個非常耗效能的地方,特別是大資料量的情況下。所以出現了PooledConnectionFactory,PooledConnectionFactory是ActiveMQ中的類,它實現了ConnectionFactory。SpringJMS的實現有SingleConnectionFactory、CachingConnectionFactory。
  1. PooledConnectionFactory:會快取connection,session和productor,不會快取consumer。因此只適合於生產者傳送訊息。
  2. SingleConnectionFactory:對於建立JMS伺服器連結的請求會一直返回同一個Connection,並且會忽略Connection的close方法呼叫(包括呼叫createConnection()得到的Connection)
  3. CachingConnectionFactory:繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了快取功能,它可以快取Session、MessageProducer和MessageConsumer。
SpringJMS提供了CachingConnectionFactory,這才是首選的方案。然而CachingConnectionFactory有一個問題必須指出,預設情況下,CachingConnectionFactory只快取一個session,在它的JavaDoc中,它宣告對於低併發情況下這是足夠的。與之相反,PooledConnectionFactory的預設值是500。這些設定,在很多情況下,需要親自去測試並驗證。我將其設定為100,對我來說還是很不錯。 3.介面介紹 以下是SpringJMS的常用介面或類:
  1. MessageCreator -- 訊息建立介面,傳送訊息時需要使用此介面建立訊息
  2. SessionCallback -- 使用JMS Session時的回撥介面
  3. ProducerCallback -- 使用JMS Session和MessageProducer時的回撥介面
  4. BrowserCallback -- 使用JMS Session和QueueBrowser時的回撥介面
  5. MessageListener -- 訊息監聽器介面,註解@JmsListener與其功能相似
  6. ListenerContainer -- 訊息偵聽器容器介面,實現有SimpleMessageListenerContainer、DefaultMessageListenerContainer
  7. MessageConverter -- 訊息轉換介面,用於JMS訊息到Java物件之間的轉換

2.訊息監聽器

在Spring整合JMS的應用中我們在定義訊息監聽器的時候一共可以定義三種類型的訊息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種型別的區別。
1.MessageListener     MessageListener是最原始的訊息監聽器,它是JMS規範中定義的一個介面。其中定義了一個用於處理接收到的訊息的onMessage方法,該方法只接收一個Message引數。我們前面在講配置消費者的時候用的訊息監聽器就是MessageListener,程式碼如下:
  1. publicclassQueueMessageListenerimplementsMessageListener
  2. {
  3. @Override
  4. publicvoid onMessage(Message message)
  5. {
  6. try
  7. {
  8. if(message instanceofTextMessage)
  9. {
  10. TextMessage tm =(TextMessage) message;
  11. System.out.println("監聽訊息:"+ tm.getText());
  12. }
  13. else
  14. {
  15. System.out.println("訊息型別:"+ message.getClass());
  16. }
  17. }
  18. catch(JMSException e)
  19. {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
在上面程式碼中我們只是簡單的列印了一些訊息相關的資訊。 2.SessionAwareMessageListener SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收訊息的,假如我們在使用MessageListener處理接收到的訊息時我們需要傳送一個訊息通知對方我們已經收到這個訊息了,那麼這個時候我們就需要在程式碼裡面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到訊息後傳送一個回覆的訊息,它同樣為我們提供了一個處理接收到的訊息的onMessage方法,但是這個方法可以同時接收兩個引數,一個是表示當前接收到的訊息Message,另一個就是可以用來發送訊息的Session物件。先來看一段程式碼:
  1. publicclassQueueSessionAwareMessageListenerimplementsSessionAwareMessageListener<TextMessage>
  2. {
  3. /** 回覆訊息的目的地 */
  4. privateDestination destination;
  5. @Override
  6. publicvoid onMessage(TextMessage message,Session session)throwsJMSException
  7. {
  8. System.out.println("監聽訊息內容:"+ message.getText());
  9. MessageProducer messageProducer = session.createProducer(destination);
  10. TextMessage replyMessage = session.createTextMessage("已收到訊息:"+ message.getJMSMessageID());
  11. messageProducer.send(replyMessage);
  12. }
  13. publicDestination getDestination()
  14. {
  15. return destination;
  16. }
  17. publicvoid setDestination(Destination destination)
  18. {
  19. this.destination = destination;
  20. }
  21. }
在上面程式碼中我們定義了一個SessionAwareMessageListener,在這個Listener中我們在接收到了一個訊息之後,利用對應的Session建立了一個到destination的生產者和對應的訊息,然後利用建立好的生產者傳送對應的訊息。 3.MessageListenerAdapter MessageListenerAdapter類實現了MessageListener介面和SessionAwareMessageListener介面,它的主要作用是將接收到的訊息進行型別轉換,然後通過反射的形式把它交給一個普通的Java類進行處理。MessageListenerAdapter會把接收到的訊息做如下轉換:
  1. TextMessage轉換為String物件
  2. BytesMessage轉換為byte陣列
  3. MapMessage轉換為Map物件
  4. ObjectMessage轉換為對應的Serializable物件
既然前面說了MessageListenerAdapter會把接收到的訊息做一個型別轉換,然後利用反射把它交給真正的目標處理器——一個普通的Java類進行處理,如果真正的目標處理器是一個MessageListener或者是一個SessionAwareMessageListener,那麼Spring將直接使用接收到的Message物件作為引數呼叫它們的onMessage方法,而不會再利用反射去進行呼叫。那麼我們在定義一個MessageListenerAdapter的時候就需要為它指定這樣一個目標類。這個目標類我們可以通過MessageListenerAdapter的構造方法引數指定,也可以通過屬性delegate指定,如:
  1. <!--訊息監聽介面卡-->
  2. <bean id="messageListenerAdapter"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  3. <!--通過建構函式指定訊息處理器-->
  4. <constructor-arg>
  5. <bean id="readerMessage"class="com.test.ReaderMessage"/>
  6. </constructor-arg>
  7. <!--指定訊息處理器類處理訊息的方法-->
  8. <property name="defaultListenerMethod" value="receiveMessage"/>
  9. <!--通過delegate屬性指定訊息處理器-->
  10. <property name="delegate">
  11. <bean id="readerMessage"class="com.test.ReaderMessage"/>
  12. </property>
  13. </bean>
前面說了如果我們指定的這個目標處理器是一個MessageListener或者是一個SessionAwareMessageListener的時候Spring將直接利用接收到的Message物件作為方法引數呼叫它們的onMessage方法。但是如果指定的目標處理器是一個普通的Java類時Spring將利用Message進行了型別轉換之後的物件作為引數通過反射去呼叫真正的目標處理器的處理方法,那麼Spring是如何知道該呼叫哪個方法呢?這是通過MessageListenerAdapter的defaultListenerMethod屬性來決定的,當我們沒有指定該屬性時,Spring會預設呼叫目標處理器的handleMessage方法。若使用以下MessageListenerByAdapter類處理訊息,可以配置MessageListenerAdapterdelegate屬性來設定處理訊息的方法。
  1. publicclassMessageListenerByAdapter
  2. {
  3. publicvoid handleMessage(String message)
  4. {
  5. System.out.println("handleMessage方法處理訊息,訊息內容是:"+ message);
  6. }
  7. publicvoid receiveMessage(String message)
  8. {
  9. System.out.println("receiveMessage方法處理訊息,訊息內容是:"+ message);
  10. }
  11. }
MessageListenerAdapter除了會自動的把一個普通Java類當做MessageListener來處理接收到的訊息之外,其另外一個主要的功能是可以自動的傳送返回訊息當我們設定的訊息處理方法的返回值不為空的時候,SpringJMS會自動將它封裝為一個JMS Message,然後自動進行回覆。那麼這個時候這個回覆訊息將傳送到哪裡呢?這主要有兩種方式可以指定。
  1. 第一,可以通過傳送的Message的setJMSReplyTo方法指定該訊息對應的回覆訊息的目的地。這需要生產者傳送訊息之前呼叫setJMSReplyTo方法。
  2. 第二,通過MessageListenerAdapter的defaultResponseDestination屬性來指定。
注意:當兩種方式都指定了訊息的回覆目的地的時候使用傳送訊息的setJMSReplyTo方法指定的目的地將具有較高的優先順序。 4.配置訊息監聽器     配置以上三種訊息監聽器,需要使用訊息偵聽容器(MessageListenerContainer),在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪裡監聽的ConnectionFactory;一個是表示監聽什麼的Destination;一個是接收到訊息以後進行訊息處理的MessageListener。示例如下:
  1. <bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <!--設定連線工廠-->
  3. <property name="connectionFactory" ref="connectionFactory"/>
  4. <!--設定監聽地址-->
  5. <property name="destination" ref="queueDestination"/>
  6. <!--設定訊息監聽處理器,可以是以上三種-->
  7. <property name="messageListener" ref="queueMessageListener"/>
  8. </bean>
Spring提供了三種AbstractMessageListenerContainer的子類,每種各有其特點。
  1. SimpleMessageListenerContainer:這個訊息偵聽容器是三種中最簡單的。它在啟動時建立一個會話session和消費者Consumer,並且會使用標準的JMS MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供者呼叫監聽器的回撥函式。它不會動態的適應執行時需要和參與外部的事務管理。相容性方面,它非常接近於獨立的JMS規範,但一般不相容Java EE的JMS限制。
  2. DefaultMessageListenerContainer:這個訊息偵聽器使用的最多。跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應執行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和相容Java EE環境。
  3. ServerSessionMessageListenerContainer:這個監聽器容器利用JMS ServerSessionPool SPI動態管理JMS Session。使用者各種訊息監聽器可以獲得執行時動態調優功能,但是這也要求JMS提供者支援ServerSessionPool SPI。如果不需要執行時效能調整,請使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。

3.ActiveMQ與SpringJMS整合示例

    本節提供了一個SpringJMS與ActiveMQ整合的示例,本示例可以作為參考。 1.配置spring-context-jms.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beansxmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:jms="http://www.springframework.org/schema/jms"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/jms
  8. http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
  9. http://www.springframework.org/schema/beans
  10. http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  11. http://www.springframework.org/schema/context
  12. http://www.springframework.org/schema/context/spring-context-4.0.xsd"
  13. default-lazy-init="false">
  14. <!-- 配置JMS連線工廠 -->
  15. <beanid="connectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
  16. <propertyname="brokerURL"value="failover:(tcp://localhost:61616)"/>
  17. </bean>
  18. <!-- ActiveMQ連線池配置,ActiveMQ實現 -->
  19. <!--
  20. <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
  21. <property name="connectionFactory" ref="connectionFactory" />
  22. </bean>
  23. -->
  24. <!-- ActiveMQ連線池配置,SpingJMS實現 -->
  25. <beanid="cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
  26. <propertyname="targetConnectionFactory"ref="connectionFactory"/>
  27. <!-- Session快取數量,這裡屬性也可以直接在這裡配置 -->
  28. <propertyname="sessionCacheSize"value="100"/>
  29. </bean>
  30. <!-- 訊息佇列01 -->
  31. <beanid="queueDestination01"class="org.apache.activemq.command.ActiveMQQueue">
  32. <!-- 設定訊息佇列的名字 -->
  33. <constructor-arg>
  34. <value>spring-jms-queue01</value>
  35. </constructor-arg>
  36. </bean>
  37. <!-- 訊息佇列02 -->
  38. <beanid="queueDestination02"class="org.apache.activemq.command.ActiveMQQueue">
  39. <!-- 設定訊息佇列的名字 -->
  40. <constructor-arg>
  41. <value>spring-jms-queue02</value>
  42. </constructor-arg>
  43. </bean>
  44. <!-- 配置JMS模板,Spring提供的JMS工具類,它傳送、接收訊息。 -->
  45. <beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
  46. <propertyname="connectionFactory"ref="cachingConnectionFactory"/>
  47. <propertyname="defaultDestination"ref="queueDestination01"/>
  48. <propertyname="receiveTimeout"value="10000"/>
  49. </bean>
  50. <!-- 訊息監聽容器(Queue),配置連線工廠,監聽的佇列是spring-jms-queue02,監聽器是上面定義的監聽器 -->
  51. <beanid="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  52. <propertyname="connectionFactory"ref="cachingConnectionFactory"/>
  53. <propertyname="destination"ref="queueDestination02"/>
  54. <propertyname="messageListener">
  55. <beanid="queueMessageListener"class="spring.QueueMessageListener"/>
  56. </property>
  57. </bean>
  58. </beans>
注意: