1. 程式人生 > >activemq實戰之整合Spring

activemq實戰之整合Spring

前面的理論準備已經很充分,這一節我們來實戰:將activemq整合到Spring框架才行中,因為Spring已經集成了JMS,這也為我們配置activermq帶來了方便。

1. Spring對jms的支援

因為Spring已經將JMS整合到框架裡面了,對jms做了自己的封裝,我們使用起來更加方便,在Spring中使用jms比較麻煩的就是配置,在Spring中配置JMS大體需要8個部分:

  1. ConnectionFactory: 和jms伺服器的連線, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker;
  2. Destination: 有topic和queue兩種方式;
  3. JmsTemplate: spring提供的jms模板;
  4. MessageConverter: 訊息轉換器;
  5. MessageProducer: 訊息生產者;
  6. MessageConsumer: 訊息消費者;
  7. MessageListener: 訊息監聽器;
  8. MessageListenerContainer: 訊息監聽容器。

下面我把完整的配置檔案按照上面的步驟拆開分別講解:

1.1首先我們配置ConnectionFactory:

<
amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
  • 1
  • 2
  • 3
  • 4

brokerURL是指要連線的activeMQ server的地址,該配置即使用activemq獨立的訊息儲存環境,即使伺服器重啟訊息也不會丟失。

<!-- 配置JMS連線工廠 -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
    <property name="sessionCacheSize" value="100" />
</bean>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我們從Spring給我們提供的connectionFactory中獲取Connection,並且把該connectionFactory註冊到上面定義的activemq server中。

1.2 Destination:

由前面我們知道Destination有兩種形式:P2P和Pub/Sub。那麼在配置中表示就是:

 <!-- 定義訊息佇列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- 設定訊息佇列的名字 -->
    <constructor-arg>
        <value>first-queue</value>
    </constructor-arg>
</bean>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

或:

<!-- 定義訊息佇列(topic) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- 設定訊息佇列的名字 -->
    <constructor-arg>
        <value>first-queue</value>
    </constructor-arg>
</bean>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.3 JmsTemplate:

將connectionFactory和defaultDestination注入JmsTemplate中:

<!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它傳送、接收訊息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="demoQueueDestination" />
    <property name="receiveTimeout" value="10000" />
    <!-- true是topic,false是queue,預設是false,此處顯示寫出false -->
    <property name="pubSubDomain" value="false" />
</bean>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在Java相關處理檔案中新增(這裡用的是@Inject註解,當然也可以用@Autowired):

@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;

TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
    
  • 1
  • 2
  • 3
  • 4

1.4 MessageConverter

MessageConverter實現的是org.springframework.jms.support.converter.MessageConverter介面, 提供訊息的轉換功能。

<bean id="defaultMessageConverter" class="cn.edu.hust.activemq.filter.DefaultMessageConverter" />
    
  • 1

1.5 MessageProducer和MessageConsumer

此處靈活使用,可以以服務的形式提供也可以以工具類的形式提供,詳情見下面的示例程式碼。

1.6 MessageListener

訊息的消費者應有的有對應的Listener。

 <!-- 配置訊息佇列監聽者(Queue) -->
 <bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" /> 
    
  • 1
  • 2

1.7 MessageListenerContainer

MessageListenerContainer即Listener的容器,用來對Listener坐一些配置,每一個listener都對應著一個Container:

<!-- 顯示注入訊息監聽容器(Queue),配置連線工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
<bean id="queueListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="demoQueueDestination" />
    <property name="messageListener" ref="queueMessageListener" />
</bean>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Spring為我們聽過了兩種型別的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。

  • SimpleMessageListenerContainer會在一開始的時候就建立一個會話Session和消費者Consumer,並且會適用標準的JMS的MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供呼叫監聽器的回撥函式。它不會動態的適應執行時需要和參與外部的事務管理。相容性方面,它非常接近於獨立的JMS規範,但一般不相容J2EE的JMS限制。大多數情況下,我們還是使用DefaultMessageListenerContainer。
  • DefaultMessageListenerContainer,與SimpleMessageListenerContainer相比,它會動態的適應執行時的需求,並且能夠參與外部的事務管理。

上面就是mq的配置檔案部分,如果從上到下的配置部分都清楚地話使用起來肯定沒有問題,我們再做一個簡要的總結:

  1. 可以有一個或者多個訊息生產者向同一個destination傳送訊息;
  2. queue型別的只能有一個訊息消費者;
  3. topic型別的可以有多個訊息消費者;
  4. 每個消費者對應一個MessageListener和一個MessageListenerContainer。

下面我們看一下整合的全部程式碼:

首先上pom.xml看一下依賴的jar包:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>q</groupId>
  <artifactId>q</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>q Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <properties>
    <springframework>4.3.0.RELEASE</springframework>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <!-- spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${springframework}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${springframework}</version>
    </dependency>
    <!-- xbean 如<amq:connectionFactory /> -->
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>

    <!-- activemq -->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.12.1</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>q</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

然後是我們的Spring配置檔案applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查詢最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

    <!-- 指定Sping元件掃描的基本包路徑 -->
    <context:component-scan base-package="cn.edu.hust.activemq" >
        <!-- 這裡只掃描Controller,不可重複載入Service -->
        <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>
    <!-- 啟用MVC註解 -->
    <mvc:annotation-driven />

    <!-- JSP檢視解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/" />
        <property name="suffix" value=".jsp" />
        <!--  定義其解析檢視的order順序為1 -->
        <property name="order" value="1" />
    </bean>
</beans>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

activemq的配置檔案applicationContext-ActiveMQ.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">

    <context:component-scan base-package="cn.edu.hust.activemq" />
    <mvc:annotation-driven />

    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616"
                           userName="admin"
                           password="admin" />

    <!-- 配置JMS連線工廠 -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="