1. 程式人生 > >springboot activemq 2 持久化訊息 與 持久化訂閱

springboot activemq 2 持久化訊息 與 持久化訂閱

改動1.減少springboot重複建立session的問題
jmsTemplate的地方加入了CachingConnectionFactory,這樣配置可以

    @Bean(name = "myJmsTemplate")
    public JmsTemplate getJmsTemplate(ActiveMQConnectionFactory nonXaJmsConnectionFactory, MessageConverter jacksonJmsMessageConverter){
        //使用CachingConnectionFactory可以提高部分效能。
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setSessionCacheSize(100);
        cachingConnectionFactory.setTargetConnectionFactory(nonXaJmsConnectionFactory);
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        //設定deliveryMode(持久化), priority, timeToLive必須開啟
        jmsTemplate.setExplicitQosEnabled(true);
        // 設定訊息是否持久化
        jmsTemplate.setDeliveryPersistent(true);
        // 設定訊息轉換器
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
        // 設定訊息是否以事務
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

改動2 : 加入訂閱持久化
什麼是訂閱持久化?簡單講就是MQ記住了給每個訂閱者傳送到了哪裡,如果中途發生故障又恢復後,可以從記憶的位置繼續傳送給訂閱者。預設是非持久化訂閱,即:訂閱者在宕機重啟之後,佇列中有訊息,訂閱者也是收不到任何訊息的,即便這些訊息已經持久化在日誌檔案或者資料庫中。
將原來預設的訊息監聽容器替換為如下兩個配置Bean。這是兩個分別的監聽Bean,並且他們開啟了持久化訂閱引數setSubscriptionDurable( true ) ,並且擁有各自的ID,factory.setClientId(“10001”); factory.setClientId(“10002”);具體為什麼要這麼設定可以參看這篇博文

https://www.tuicool.com/articles/UfimyuR

    @Bean(name = "topicContainerFactory1")
    public DefaultJmsListenerContainerFactory topicClient1(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){
        DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer);
        factory.setClientId("10001");
        return factory;
    }

    @Bean(name = "topicContainerFactory2")
    public DefaultJmsListenerContainerFactory topicClient2(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){
        DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer);
        factory.setClientId("10002");
        return factory;
    }

    /**
     *
     * @param connectionFactory
     * @param configurer
     * @return
     */

    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory,connectionFactory);
        factory.setPubSubDomain(true);
        factory.setSessionTransacted(true);
        factory.setAutoStartup(true);
        //開啟持久化訂閱
        factory.setSubscriptionDurable(true);
        return factory;
    }

改動3:對同一個佇列的訊息多添加了一個訂閱者。
MQ可以在訂閱者第一次訂閱的時候記憶每個訂閱者的傳送位置,之後進行分別的推送。

@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory1") // 監聽指定訊息主題
public void receiveMessage1(Message message) throws Exception {
    log.debug("[接收佇列1] [收到訊息] {}",message);
}

@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory2") // 監聽指定訊息主題
public void receiveMessage1(Message message) throws Exception {
    log.debug("[接收佇列2] [收到訊息] {}",message);
}

測試結果:

1>同時開啟發布者和兩個訂閱者,釋出者向MQ推送訊息。訂閱者接收。

2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收佇列1] [收到訊息] Message
2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收佇列1] [收到訊息] Message
2017-10-23 01:09:33.407 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.461 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收佇列1] [收到訊息] Message

2>之後關閉消費者。釋出者再發布3條訊息。

3>此時可以將MQ服務關閉,模擬宕機。然後啟動MQ服務,啟動一個訂閱者,緊接著會受到後續未傳送的3條訊息,然後再啟動另外的訂閱者,緊接著也會受到後續的三條訊息。