1. 程式人生 > >SPRING 整合 activemq 的 topic 模式

SPRING 整合 activemq 的 topic 模式

概要

activemq 支援兩種模式:

1.佇列模式

2. 釋出訂閱者模式,topic有一個主題可以有多個訂閱者。這種情況可以將一個訊息,分發到多個消費者。

實現程式碼

1.生產者

import java.util.Map;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsTemplate;

public
class TopicProducer { @Resource(name = "topicJmsTemplate") private JmsTemplate jmsTemplate; private Map<String,Destination> topicMap=new java.util.concurrent.ConcurrentHashMap<>(); /** * 說明:傳送的時候如果這裡沒有顯示的指定destination.將用spring xml中配置的destination * @param destination *
@param message */ public void sendMqMessage(String topicName, Object model){ Destination dest=null; if(!topicMap.containsKey(topicName)){ dest=new ActiveMQTopic(topicName); topicMap.put(topicName, dest); } else{ dest
=topicMap.get(topicName); } jmsTemplate.convertAndSend(dest, model); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }

生產者XML配置

<!-- topic 連線工廠 -->
    <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
        <property name="useAsyncSend" value="true" />
        <property name="clientID" value="providerClienctConnect" />
    </bean>
    
    <bean id="topicDestination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="userOrgTopic"/>
    </bean>

    <!-- 訊息傳送者客戶端 -->
    <bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="providerConnectionFactory" />
        <property name="defaultDestination" ref="topicDestination" />
        <!-- 開啟訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="receiveTimeout" value="10000" />
        <!-- deliveryMode, priority, timeToLive 的開關要生效,必須配置為true,預設false-->
        <property name="explicitQosEnabled" value="true"/>
        <!-- 傳送模式
             DeliveryMode.NON_PERSISTENT=1:非持久 ;
             DeliveryMode.PERSISTENT=2:持久
        -->
        <property name="deliveryMode" value="1"/>
    </bean>
    
    <bean id="topicProducer" class="com.aps.core.jms.TopicProducer"></bean>

deliveryMode :

1:非持久 就是如果訊息傳送後,沒有消費者啟動,重啟服務後,那麼訊息將會消失。

2.持久 訊息傳送後,訊息沒有被消費,重啟服務後,訊息依然存在。

 

2.配置消費者

java 程式碼

import javax.jms.Message;
import javax.jms.MessageListener;

public class ConsumerMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
//        ObjectMessage obj=(ObjectMessage)message;
//        try {
//            OsUser user=(OsUser) obj.getObject();
//            System.out.println(user.getFullname());
//        } catch (JMSException e) {
//            // TODO Auto-generated catch block
//            e.printStackTrace();
//        }
//        System.out.println("ok");
    }

}

xml 配置

<!-- 配置JMS連線工廠 -->
    <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
        <property name="useAsyncSend" value="true" />
        <property name="clientID" value="consumerClienctConnect" />
    </bean>

    <!-- 定義訊息Destination -->
    <bean id="topic1Destination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="mytopic"/>
    </bean>
    
    <bean id="topic2Destination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="mytopic"/>
    </bean>

    <!-- 配置訊息消費監聽者 -->
    <bean id="consumerMessageListener" class="com.aps.jms.ConsumerMessageListener" />

    <!-- 訊息訂閱客戶端1 -->
    <bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerConnectionFactory" />
        <!-- 開啟訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="topic1Destination" />
        <property name="subscriptionDurable" value="true"/>
        <!---這裡是設定接收客戶端的ID,在持久化時,但這個客戶端不線上時,訊息就存在資料庫裡,直到被這個ID的客戶端消費掉-->
        <property name="clientId" value="consumerClient1"/>
        <property name="messageListener" ref="consumerMessageListener" />
        <!-- 訊息應答方式
             Session.AUTO_ACKNOWLEDGE  訊息自動簽收
             Session.CLIENT_ACKNOWLEDGE  客戶端呼叫acknowledge方法手動簽收
             Session.DUPS_OK_ACKNOWLEDGE 不必必須簽收,訊息可能會重複傳送
        -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    <!-- 訊息訂閱客戶端2 -->
    <bean id="consumerListenerClient2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerConnectionFactory" />
        <!-- 開啟訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="topic2Destination" />
        <property name="subscriptionDurable" value="true"/>
        <!---這裡是設定接收客戶端的ID,在持久化時,但這個客戶端不線上時,訊息就存在資料庫裡,直到被這個ID的客戶端消費掉-->
        <property name="clientId" value="consumerClient2"/>
        <property name="messageListener" ref="consumerMessageListener" />
        <!-- 訊息應答方式
             Session.AUTO_ACKNOWLEDGE  訊息自動簽收
             Session.CLIENT_ACKNOWLEDGE  客戶端呼叫acknowledge方法手動簽收
             Session.DUPS_OK_ACKNOWLEDGE 不必必須簽收,訊息可能會重複傳送
        -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

這裡可以看到,我們配置了兩個消費者。

我們測試的時候,發一個訊息,可以看到兩個收到兩次訊息。