訊息中介軟體入門「二」:在spring中使用activemq
訊息中介軟體入門「二」:在spring中使用activemq
在普通的java專案使用activemq一般需要經過:1.獲得連線工廠;2.建立連線;2.啟動連線;3.獲取會話(session);4.繫結連線地址(detination)5.獲得消費者/生產者;6.生產/消費訊息。
而在spring中使用activemq則可以將上述一系列的步驟寫入spring的xml檔案中,拓展性和複用性更好
專案結構
【非常簡單有木有】
依賴jar包
<dependency>
<groupId>org.springframework</groupId >
<artifactId>spring-context</artifactId>
<version>4.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version> 5.15.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.18.RELEASE</version>
</dependency>
spring配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--注入自定的消費者監聽器bean和生產者,便於在app中測試-->
<context:component-scan base-package="queue"/>
<!--activeMq的連線工廠配置-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="trustAllPackages" value="true"/>
</bean>
<!--整合到spring的連線工廠中-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!--生產者傳送訊息的bean,相當於之前生產者繫結destination和send方法都整合在這個bean裡面-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!--佇列-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue"/>
</bean>
<!--主題-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
<!--現在監聽器由spring託管,這部分相當於之前消費者的監聽器部分程式碼-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
</beans>
說明:
1.trustAllPackages
設定為true表示信任所有,可以為所欲為的使用objectMessage傳輸訊息。
2.CachingConnectionFactory
不一定非要這個連線工廠,在org.springframework.jms.connection
下有很多連線工廠,具體看官方api。
3.jmsTemplate
相當於整合了生產者建立連線的過程,在程式碼中傳送Message,只需從IOC中取出這個bean,將傳送的destination
和Message
傳入send方法中即可。
4.注入了兩個destination
的bean,一個是佇列模式,一個是主題模式方便生產者和消費者繫結
5.由於消費者只需要確定接收訊息的細節,即訊息處理器[messageListener
],因此需要在程式碼中書寫接收到訊息後的細節,並將此bean注入到IOC中,而整個監聽器是由spring託管的,因此放在了org.springframework.jms.listener.DefaultMessageListenerContainer
中,這了類裡面實現了建立一個消費者具體處理邏輯,只需要將連線工廠[connectionFactory
],監聽地址[queueDestination/topicDestination
],和訊息處理器[messageListener
]傳入即可。其中監聽地址根據佇列/主題模式傳入對應的Destination。
消費者處理器程式碼:
package queue;
import org.springframework.stereotype.Component;
import javax.jms.*;
@Component("messageListener")
public class QueueConstumer implements MessageListener {
public void onMessage(Message message) {
ObjectMessage Message= (ObjectMessage) message;
try {
System.out.println("收到訊息:"+Message.getObject());
} catch (Exception e) {
e.printStackTrace();
}
}
}
這裡的
"messageListener"
與xml配置檔案中的jmsContainer
bean的messageListener
屬性value是對應的,並且生產者傳入的是什麼Message型別消費取出時也必須要用與之對應的型別
生產者程式碼:
package queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import pojo.User;
import javax.annotation.Resource;
import javax.jms.*;
@Component
public class QueueProducer {
@Autowired
JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
Destination destination;
public void sendMessage(final User msg){
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
final ObjectMessage objectMessage = session.createObjectMessage(msg);
return objectMessage;
}
});
System.out.println("傳送訊息:"+msg);
}
}
解釋:
JmsTemplate
是前面spring配置檔案中注入進去的,用於生產者傳送Message;而註解@Resource(name = "queueDestination")
後面帶引數是因為在配置檔案中定義的destination都屬於jms.Destination的實現。完全可以寫成ActiveMQQueue/ActiveMQTopic即可不用帶@resource
中的引數;傳送訊息指定呼叫JmsTemplate
的send方法將Destination和Message傳入即可。若是切換模式[topic/queue],則只需要對應傳入不同的destination即可。
主程式APP程式碼:
import org.springframework.context.support.ClassPathXmlApplicationContext;
import pojo.User;
import queue.QueueProducer;
import java.util.UUID;
public class App {
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext applicationContext= new ClassPathXmlApplicationContext("activeMq-config.xml");
QueueProducer producer=applicationContext.getBean(QueueProducer.class);
User user=new User();
for (int i = 0; i < 100; i++) {
user.setAge(i);
user.setName((i%2==0? "Mr":"Mrs")+String.valueOf((char)(int)(Math.random()*26+'A'+1))+String.valueOf((char)(int)(Math.random()*26+'a')));
user.setSex(i%2==0? "男":"女");
user.setSalary((int)(Math.random()*10000));
user.setNumber(UUID.randomUUID().toString());
producer.sendMessage(user);
Thread.sleep(50);
}
applicationContext.close();
}
}
輸出結果:
收到訊息:User{name='MrNk', age=0, sex='男', number='b8fede00-2385-4b36-b70e-0b98dad260e2', salary=655}
傳送訊息:User{name='MrNk', age=0, sex='男', number='b8fede00-2385-4b36-b70e-0b98dad260e2', salary=655}
收到訊息:User{name='MrsJm', age=1, sex='女', number='b3036d44-c7b0-4dbb-9ec2-de9b78e043d4', salary=5531}
傳送訊息:User{name='MrsJm', age=1, sex='女', number='b3036d44-c7b0-4dbb-9ec2-de9b78e043d4', salary=5531}
收到訊息:User{name='MrXs', age=2, sex='男', number='68a2444a-8992-49b0-a9bd-cf00fcfb8763', salary=4556}
傳送訊息:User{name='MrXs', age=2, sex='男', number='68a2444a-8992-49b0-a9bd-cf00fcfb8763', salary=4556}
收到訊息:User{name='MrsGf', age=3, sex='女', number='be4f0533-6721-49df-8174-e5cff5b1e24e', salary=2123}
傳送訊息:User{name='MrsGf', age=3, sex='女', number='be4f0533-6721-49df-8174-e5cff5b1e24e', salary=2123}
收到訊息:User{name='MrBj', age=4, sex='男', number='958cdfa3-b97f-439a-b51c-30855b26d506', salary=5288}
可以看出由於都在一個專案中,消費者和生產者同時執行,因此一旦生產一條訊息,立即被消費者取出。
主題/佇列模式的切換:
由於spring簡化了程式碼,因此切換模式也只需要修改兩個地方即可:
- 佇列模式
先修改配置中消費者的監聽器配置:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
只需要將destination
的ref指向佇列下的地址:queueDestination
。
再修改生產者的
@resource
註解的name
值為佇列下的:queueDestination
@Resource(name = "queueDestination")
- 主題模式
主題模式也一樣,將生產者和消費者的detionation對應到:
topicDestination
即可:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
修改@resource註解的引數為:topicDestination
@Resource(name = "topicDestination")