1. 程式人生 > >Spring整合ActiveMQ教程

Spring整合ActiveMQ教程

寫在前面:這篇文章會涉及二者的整合思路,為了能夠更好地讀懂本文,建議最好你能夠在不整合其他環境的情況下使用ActiveMQ編寫簡單的控制檯Demo,並且能夠了解JMS(Java Message Service)。當然,你也可以直接略過整合思路看整合的過程。

簡介

ActiveMQ是Apache出品的一個訊息佇列(Message Queue)軟體,它可以與諸如C#、C++、PHP、Java等語言進行整合。本文重點敘述的是與Java Web中Spring框架的整合,ActiveMQ很好地實現了JMS介面,為編寫高併發的應用程式提供了高效的解決方案。

整合思路

在整合之前,我想先說一下思路,古人云:知其然知其所以然嘛~

Spring最厲害的地方就是它的Bean了,還有它特有的IOC(控制反轉)和AOP(面向切面程式設計)技術。有了這些,我們就可以不用new關鍵字構造物件,同時,可以方便地使用注入往類中的屬性進行初始化。如果你編寫過ActiveMQ之類的JMS應用程式,無論對於訊息的生產者還是消費者,最重要的介面有以下兩個:
1.ConnectionFactory
2.Destination

ConnectionFactory是一切的基礎,有了它才有了Connection,然後才有Session,只有通過Session物件,我們才能建立訊息佇列、構建生產者/消費者,繼而傳送/接收訊息。
Destination

是一切的歸宿,它就像匯流排一樣,生產者發出訊息要發到它上面,消費者取訊息也要從這上面取。

試想,如果這一切都能借助Spring強大的Bean管理的話,我們在編寫程式的時候會更加的方便簡潔。幸運的是,ActiveMQ官方提供了完美的Spring框架支援,一切只需要在xml檔案中配置即可~

Spring官方提供了一個叫JmsTemplate的類,這個類就專門用來處理JMS的,在該類的Bean配置標籤中有兩個屬性connectionFactory-refdefaultDestination-ref正好對應JMS中的ConnectionFactoryDestination,如果你有興趣檢視原始碼的話,就可以發現JmsTemplate

幫我們做了大量建立的工作,我們只需要用它來進行收發資訊就ok了,而ActiveMQ官方也提供了對應的實現包。

OK,上面是一個基本的思路,還有一些細節會在整合的過程中說明。

整合過程

首先,先說一下我的配置環境:

  • Spring:4.1.3.RELEASE
  • Activemq:5.15.3
  • IDE:IntelliJ IDEA 15.0.5
  • Maven:3.3.9
  • 此次整合是針對訊息佇列中的P2P(點對點)模式

專案結構:

Step1:配置Maven的pom.xml

這裡我只貼出關鍵的依賴選項:
1、JMS依賴

 <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
 </dependency>

2、 ActiveMQ核心依賴

<dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.5.0</version>
</dependency>
<dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.7.0</version>
</dependency>

3、 Spring依賴(${spring.version}的值為4.1.3.RELEASE)

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-tx</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>

4、日誌依賴

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.1</version>
    </dependency>

其中,slf4j一定要有,不然的話執行會報錯,如下圖,提示找不到包。

Step2:配置Spring的spring-activemq.xml

1、首先是各種約束

<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"
></beans>

2、如果你在專案中使用了註解,請開啟註解的自動掃描(這裡略)

3、連線到ActiveMQ,建立一個ConnectionFactory

<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
          p:brokerURL="tcp://localhost:61616"></bean>

4、對上步建立的ConnectionFactory進行快取包裝,這樣做的目的是提升效能,對sessions, connections 和 producers進行快取複用,減少開銷。

<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
          p:targetConnectionFactory-ref="amqConnectionFactory"
          p:sessionCacheSize="10"></bean>

5、建立訊息目的地,constructor-arg是目的地名稱

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!--訊息佇列名稱-->
        <constructor-arg value="FOO.TEST"/>
    </bean>

6、構建JmsTemplate

<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="cachedConnectionFactory"
          p:defaultDestination-ref="destination"></bean>

7、對於訊息的消費者,Spring官方提供了一個叫 DMLS(DefaultMessageListenerContainer)的容器,它能有效剋制MDB(Message Driven Beans)的缺點。
要使用這個容器,我們需要建立自己的監聽器(下面會提及),並且註冊進容器中,這樣一旦目的地有訊息,就會自動觸發監聽事件。

<jms:listener-container
        container-type="default"
        connection-factory="amqConnectionFactory"
        acknowledge="auto">
        <jms:listener destination="FOO.TEST" ref="simpleMsgListener" method="onMessage"></jms:listener>
    </jms:listener-container>

其中,connection-factory與前面用org.apache.activemq.ActiveMQConnectionFactory包建立的bean的id對應,listener標籤中destination填前面建立的目的地名稱,ref填listener的bean id。

Step3:建立監聽器

我這裡建立了一個叫SimpleMsgListener的監聽器

package com.test.listener;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Martin Huang on 2018/4/20.
 */
//bean id
@Component(value = "simpleMsgListener")
public class SimpleMsgListener implements MessageListener {

    //收到資訊時的動作
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的資訊:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Step4:建立資訊生成器

package com.test.creator;

import org.springframework.jms.core.MessageCreator;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Created by Martin Huang on 2018/4/20.
 */
public class MyMessageCreator implements MessageCreator {

    private int id;

    public MyMessageCreator(int id)
    {
        this.id = id;
    }

    @Override
    public Message createMessage(Session session) throws JMSException {
        TextMessage message = session.createTextMessage("Spring-ActiveMQ傳送的第【"+id+"】條訊息");
        System.out.println("Spring-ActiveMQ傳送的第【"+id+"】條訊息");
        return message;
    }
}

Step5:建立生產者

package com.test.producer;

import com.test.creator.MyMessageCreator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * Created by Martin Huang on 2018/4/20.
 */
@Component(value = "producer")
public class SimpleProducer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage() throws Exception
    {
        //每次傳送10條資訊
        for(int i = 0 ; i < 10 ; i++)
        {
            //這裡填入建立好的資訊生成器
            jmsTemplate.send(new MyMessageCreator(i));
        }
    }
}

Step6:測試

首先開啟ActiveMQ服務,然後編寫我們的測試類

    @Test
    public void testAmqProducer()
    {
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-activemq.xml");
        SimpleProducer simpleProducer = (SimpleProducer) context.getBean("producer");
        try {
            simpleProducer.sendMessage();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

如果順利的話,你可能會看到以下結果:

ActiveMQ控制檯:

可以發現這裡傳送了10條訊息,但是隻處理了8條。這個測試便體現了DMLC(DefaultMessageListenerContainer)的特點:它是非同步容器,它不一定要等有訊息處理之後,再發送新的訊息。

提示

listener-container可以同時支援多個監聽器,如果你設定了多個監聽器,那麼這些監聽器會輪流去佇列中獲取資訊處理。比如我定義了兩個監聽器SimpleMsgListenerSimpleMsgListener1,那麼執行的效果是這樣的:

示例程式碼

Spring-ActiveMQ

參考資料