ActiveMQ整合spring實現持久化訊息接收
在我們生產過程中往往存在兩個專案介面呼叫場景,但是這中場景下我們很難保證百分百的網路問題和服務問題,所在就會導致我們在呼叫介面的時候連線超時或者訪問不到的情況發生,以致我們的資料丟失。
出現以上問題不用擔心,本文就是重點介紹如何通過ActiveMQ的持久化操作解決請求丟失資料。
首先我們需要了解一下ActiveMQ的持久化方式,多的不說我們這裡介紹兩種持久化方式,一個是持久化到檔案;一個是持久化到資料庫,下面就逐一介紹:
1.持久化到檔案
持久化到檔案,預設儲存方式為kahaDB。activemq.xml中持久化配置:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
這是ActiveMQ的預設配置
假如設定傳送訊息時設定訊息為持久化,這時候啟動了broken伺服器,訊息傳送者,但是訊息接收者沒有啟動,那麼持久化檔案中會把訊息儲存到檔案中;直到訊息接受者接收訊息之後會把訊息記錄刪除。以下kahadb目錄為儲存的訊息記錄。
2.持久化到資料庫中,如mysql和oracle;
首先需要下載資料庫驅動包,這裡使用的是mysql資料庫,所以下載了mysql-connector-java-5.0.4-bin.jar,並把jar包放入到apache-activemq-5.8.0\lib目錄下。
修改activemq.xml配置檔案:
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
同時在activemq.xml檔案的broken節點外面新增如下配置:
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
在資料庫中新建activemq資料庫,這是啟動activemq.bat後,會在資料庫下新建三個表:
這時先啟動訊息傳送者,會在activemq_msgs下看到多了一條記錄。
再啟動訊息接受者,會看到表中記錄已被刪除。
active的配置方式已說完,現在來說一下怎麼整合spring實現功能:
1.生產者的配置
web.xml加入我們activeMQ的配置檔案
applicationContext-activeMQ.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">
<!-- ActiveMQ 真正產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"></property>
<property name="password" value="admin"></property>
</bean>
<!-- 配置spring 管理 connectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="activeMqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- queue(佇列模式),點對點 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="pubSubDomain" value="false" />
<property name="explicitQosEnabled" value="true" /> <!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,預設false-->
<property name="deliveryMode" value="2" /> <!-- 傳送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
</bean>
<!-- topic(主題,釋出/訂閱),一對多 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="pubSubDomain" value="true" />
<!-- 設定接收超時時間 60秒
<property name="receiveTimeout" value="60000"/>
-->
<!-- 訊息不持久化 -->
<property name="explicitQosEnabled" value="false"></property>
</bean>
</beans>
生產者控制層(呼叫傳送訊息):
@Controller
@RequestMapping("test")
public class TestController {
@Autowired
TopicSender topicSender;
@RequestMapping("send")
@ResponseBody
public String sendMessage(){
System.out.println("開始傳送訊息");
topicSender.send("goodsAddTopic", "商品新增成功,開始更新商品索引庫");
return "test success";
}
@RequestMapping("send2")
@ResponseBody
public String sendMessage2(){
System.out.println("開始傳送訊息-------------2");
topicSender.send("goods2AddTopic", "商品新增成功,開始更新商品索引庫---------2");
return "test success";
}
}
TopicSender.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class TopicSender {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
/**
* 傳送一條訊息到指定的佇列(目標)
* @param queueName 佇列名稱
* @param message 訊息內容
*/
public void send(String topicName,final String message){
jmsTemplate.send(topicName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
注:TopicSender一定要被spring掃描不然會報空指標
2.消費者配置
web.xml與生產者一樣
applicationContext-activeMQ.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">
<!-- ActiveMQ 真正產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"></property>
<property name="password" value="admin"></property>
</bean>
<!-- 配置spring 管理 connectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="activeMqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!--這個是PTP目的地,一對多的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="goodsAddTopic,goods2AddTopic" />
</bean>
<!--這個是主題(topic)目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="goodsAddTopic,goods2AddTopic" />
</bean>
<!-- 訊息監聽類 -->
<bean id="goodsAddMessageListener" class="com.hoomsun.message.GoodsESMessageLister"/>
<!-- 訊息監聽器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="queueDestination"></property>
<property name="messageListener" ref="goodsAddMessageListener"></property>
</bean>
</beans>
GoodsESMessageLister.java
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 訊息監聽器,訊息的消費者
* @author back
*/
public class GoodsESMessageLister implements MessageListener{
@Override
public void onMessage(Message message) {
try {
Destination destination=message.getJMSDestination();
String topci=destination.toString();
System.out.println("topc---44444------"+topci);
TextMessage textMessage = (TextMessage) message;
System.out.println("=======商品新增成功,從訊息佇列中讀取同步索引請求================"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
配置完成之後就可以測試,
①、先啟動生產者;
②、訪問生產者方法傳送訊息;
③、啟動消費者,看消費者是否能接受到之前生產者傳送的訊息。
如果接收到證明配置成功。
最後貼上pom.xml中需用到的jar包
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.3.RELEASE</version>
</dependency>
對你有幫助就記得關注一下,我會不定期的更新一些好的技術。