1. 程式人生 > >訊息中介軟體入門「二」:在spring中使用activemq

訊息中介軟體入門「二」:在spring中使用activemq

訊息中介軟體入門「二」:在spring中使用activemq

在普通的java專案使用activemq一般需要經過:1.獲得連線工廠;2.建立連線;2.啟動連線;3.獲取會話(session);4.繫結連線地址(detination)5.獲得消費者/生產者;6.生產/消費訊息。
而在spring中使用activemq則可以將上述一系列的步驟寫入spring的xml檔案中,拓展性和複用性更好

專案結構

專案結構
【非常簡單有木有】

依賴jar包

<dependency>
        <groupId>org.springframework</groupId
>
<artifactId>spring-context</artifactId> <version>4.2.6.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>
5.15.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.18.RELEASE</version> </dependency>

spring配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--注入自定的消費者監聽器bean和生產者,便於在app中測試--> <context:component-scan base-package="queue"/> <!--activeMq的連線工廠配置--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> <property name="trustAllPackages" value="true"/> </bean> <!--整合到spring的連線工廠中--> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean> <!--生產者傳送訊息的bean,相當於之前生產者繫結destination和send方法都整合在這個bean裡面--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> </bean> <!--佇列--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue"/> </bean> <!--主題--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean> <!--現在監聽器由spring託管,這部分相當於之前消費者的監聽器部分程式碼--> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="messageListener"/> </bean> </beans>

說明:
1.trustAllPackages設定為true表示信任所有,可以為所欲為的使用objectMessage傳輸訊息。
2.CachingConnectionFactory不一定非要這個連線工廠,在org.springframework.jms.connection下有很多連線工廠,具體看官方api。
3.jmsTemplate相當於整合了生產者建立連線的過程,在程式碼中傳送Message,只需從IOC中取出這個bean,將傳送的destinationMessage傳入send方法中即可。
4.注入了兩個destination的bean,一個是佇列模式,一個是主題模式方便生產者和消費者繫結
5.由於消費者只需要確定接收訊息的細節,即訊息處理器[messageListener],因此需要在程式碼中書寫接收到訊息後的細節,並將此bean注入到IOC中,而整個監聽器是由spring託管的,因此放在了org.springframework.jms.listener.DefaultMessageListenerContainer中,這了類裡面實現了建立一個消費者具體處理邏輯,只需要將連線工廠[connectionFactory],監聽地址[queueDestination/topicDestination],和訊息處理器[messageListener]傳入即可。其中監聽地址根據佇列/主題模式傳入對應的Destination。

消費者處理器程式碼:

package queue;

import org.springframework.stereotype.Component;

import javax.jms.*;

@Component("messageListener")
public class QueueConstumer implements MessageListener {
    public void onMessage(Message message) {
        ObjectMessage Message= (ObjectMessage) message;
        try {
            System.out.println("收到訊息:"+Message.getObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

這裡的"messageListener"與xml配置檔案中的jmsContainerbean的messageListener屬性value是對應的,並且生產者傳入的是什麼Message型別消費取出時也必須要用與之對應的型別

生產者程式碼:

package queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import pojo.User;

import javax.annotation.Resource;
import javax.jms.*;

@Component
public class QueueProducer {
    @Autowired
    JmsTemplate jmsTemplate;
    @Resource(name = "queueDestination")
    Destination destination;
    public  void  sendMessage(final User msg){
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                final ObjectMessage objectMessage = session.createObjectMessage(msg);
                return objectMessage;
            }
        });
        System.out.println("傳送訊息:"+msg);
    }
}

解釋:JmsTemplate是前面spring配置檔案中注入進去的,用於生產者傳送Message;而註解 @Resource(name = "queueDestination")後面帶引數是因為在配置檔案中定義的destination都屬於jms.Destination的實現。完全可以寫成ActiveMQQueue/ActiveMQTopic即可不用帶@resource中的引數;傳送訊息指定呼叫JmsTemplate的send方法將Destination和Message傳入即可。若是切換模式[topic/queue],則只需要對應傳入不同的destination即可。

主程式APP程式碼:

import org.springframework.context.support.ClassPathXmlApplicationContext;
import pojo.User;
import queue.QueueProducer;

import java.util.UUID;

public class App {
    public static void main(String[] args) throws InterruptedException {
        ClassPathXmlApplicationContext applicationContext= new ClassPathXmlApplicationContext("activeMq-config.xml");
        QueueProducer producer=applicationContext.getBean(QueueProducer.class);
        User user=new User();
        for (int i = 0; i < 100; i++) {
            user.setAge(i);
            user.setName((i%2==0? "Mr":"Mrs")+String.valueOf((char)(int)(Math.random()*26+'A'+1))+String.valueOf((char)(int)(Math.random()*26+'a')));
            user.setSex(i%2==0? "男":"女");
            user.setSalary((int)(Math.random()*10000));
            user.setNumber(UUID.randomUUID().toString());
            producer.sendMessage(user);
            Thread.sleep(50);
        }
        applicationContext.close();
    }
}

輸出結果:

收到訊息:User{name='MrNk', age=0, sex='男', number='b8fede00-2385-4b36-b70e-0b98dad260e2', salary=655}
傳送訊息:User{name='MrNk', age=0, sex='男', number='b8fede00-2385-4b36-b70e-0b98dad260e2', salary=655}
收到訊息:User{name='MrsJm', age=1, sex='女', number='b3036d44-c7b0-4dbb-9ec2-de9b78e043d4', salary=5531}
傳送訊息:User{name='MrsJm', age=1, sex='女', number='b3036d44-c7b0-4dbb-9ec2-de9b78e043d4', salary=5531}
收到訊息:User{name='MrXs', age=2, sex='男', number='68a2444a-8992-49b0-a9bd-cf00fcfb8763', salary=4556}
傳送訊息:User{name='MrXs', age=2, sex='男', number='68a2444a-8992-49b0-a9bd-cf00fcfb8763', salary=4556}
收到訊息:User{name='MrsGf', age=3, sex='女', number='be4f0533-6721-49df-8174-e5cff5b1e24e', salary=2123}
傳送訊息:User{name='MrsGf', age=3, sex='女', number='be4f0533-6721-49df-8174-e5cff5b1e24e', salary=2123}
收到訊息:User{name='MrBj', age=4, sex='男', number='958cdfa3-b97f-439a-b51c-30855b26d506', salary=5288}

可以看出由於都在一個專案中,消費者和生產者同時執行,因此一旦生產一條訊息,立即被消費者取出。

主題/佇列模式的切換:

由於spring簡化了程式碼,因此切換模式也只需要修改兩個地方即可:


  1. 佇列模式

先修改配置中消費者的監聽器配置:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>

只需要將destination的ref指向佇列下的地址:queueDestination

再修改生產者的@resource註解的name值為佇列下的:queueDestination
@Resource(name = "queueDestination")


  1. 主題模式

主題模式也一樣,將生產者和消費者的detionation對應到:topicDestination即可:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="messageListener"/>
    </bean>

修改@resource註解的引數為:topicDestination
@Resource(name = "topicDestination")