1. 程式人生 > >spring boot-同時使用jms的Queue(佇列)和Topic(釋出訂閱)

spring boot-同時使用jms的Queue(佇列)和Topic(釋出訂閱)

前言

但是最近遇到一個需求,需要同時使用jms的佇列和topic,於是就有了下面的測試程式碼:

消費者程式碼

@Component // 必須加此註解,該類才會被作為Message Driven POJO使用
public class Consumer {
	@JmsListener(destination = "mytest.queue")
	public void receiveQueue(TextMessage text) throws Exception {
		System.out.println(Thread.currentThread().getName()+":Consumer收到的報文為:"+text.getText());
	}
	
	@JmsListener(destination="mytest.topic")
	public void receiveTopic(TextMessage text) throws JMSException{
		System.out.println(text.getText());
	}
}
生產者程式碼
@Service("producer")
public class Producer {
	@Autowired
	private JmsMessagingTemplate jmsTemplate;
	
	public void sendMessage(Destination destination, final String message){
		jmsTemplate.convertAndSend(destination, message);
	}
}
測試程式碼
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableAsync //開啟對Async的支援,否則非同步任務不啟作用
public class SpringbootJmsApplicationTests {
	
	@Autowired
	private Producer producer;
	
	@Test
	public void contextLoads() throws InterruptedException {
		while(true){
			Destination destination = new ActiveMQQueue("mytest.queue");
			Destination topic = new ActiveMQTopic("mytest.topic");
			
			for(int i=0; i<100; i++){
				producer.sendMessage(destination, "myname is chhliu!!!"+i);
				producer.sendMessage(topic, "i'm the king of the world!");
			}
		}
	}
}
測試結果如下:
DefaultMessageListenerContainer-6:Consumer收到的報文為:myname is chhliu!!!0
DefaultMessageListenerContainer-3:Consumer收到的報文為:myname is chhliu!!!1
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!2
DefaultMessageListenerContainer-4:Consumer收到的報文為:myname is chhliu!!!3
DefaultMessageListenerContainer-5:Consumer收到的報文為:myname is chhliu!!!4
DefaultMessageListenerContainer-2:Consumer收到的報文為:myname is chhliu!!!5
DefaultMessageListenerContainer-6:Consumer收到的報文為:myname is chhliu!!!6
DefaultMessageListenerContainer-3:Consumer收到的報文為:myname is chhliu!!!7
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!8
DefaultMessageListenerContainer-4:Consumer收到的報文為:myname is chhliu!!!9
發現一個奇怪的現象,貌似topic沒有起效果。於是在配置檔案中加了一個配置,配置如下:
spring.jms.pub-sub-domain=true
然後再跑了一遍上面的測試程式碼,測試結果如下:
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
i'm the king of the world!
這次結果是徹底的反了過來,topic生效了,queue卻沒起作用了。

to be or not to be that's a question!怎麼能讓兩者同時生效了!至少從前面的兩次測試結果可以看出,控制哪個生效,是通過設定pubsubdomain來實現的,於是我們解決方案的出發點也在此。

解決方案如下:

@Configuration
@EnableJms
public class JmsConfig {
	@Bean
	public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
	    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	    factory.setPubSubDomain(true);
	    factory.setConnectionFactory(connectionFactory);
	    return factory;
	}

	@Bean
	public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
	    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	    factory.setPubSubDomain(false);
	    factory.setConnectionFactory(connectionFactory);
	    return factory;
	}
}
上面的程式碼的作用是建立了兩個JmsListenerContainerFactory,分別是topicListenerFactory和queueListenerFactory,其中topicListenerFactory建立的時候,將pubSubDomain設定成了true,表示該Listener負責處理Topic;queueListenerFactory建立的時候,將pubSubDomain設定成了false,也就是說,jms預設就是queue模式,該Listener主要負責處理Queue。

修改消費者程式碼:

@Component // 必須加此註解,該類才會被作為Message Driven POJO使用
public class Consumer {
	@JmsListener(destination = "mytest.queue" ,containerFactory="queueListenerFactory")// 增加對應處理的監聽器工程
	public void receiveQueue(TextMessage text) throws Exception {
		System.out.println(Thread.currentThread().getName()+":Consumer收到的報文為:"+text.getText());
	}
	
	@JmsListener(destination="mytest.topic", containerFactory="topicListenerFactory")// 增加對應處理的監聽器工程
	public void receiveTopic(TextMessage text) throws JMSException{
		System.out.println(text.getText());
	}
}
再跑一下前面的測試,結果如下:
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!1
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!2
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!3
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!4
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!5
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!6
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!7
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!8
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!9
i'm the king of the world!
DefaultMessageListenerContainer-1:Consumer收到的報文為:myname is chhliu!!!10
i'm the king of the world!

我們發現,Queue和Topic都生效了。但是卻產生了另外一個問題,併發消費不啟作用了,從上面跑的測試結果可以看出,消費執行緒都是DefaultMessageListenerContainer-1這個執行緒,那怎麼來解決這個問題了,突破口在DefaultJmsListenerContainerFactory上,該類有很多的配置,我們可以根據專案的需求來新增配置,例如,如果想併發消費生效,可以加如下配置

factory.setTaskExecutor(Executors.newFixedThreadPool(6));
	    factory.setConcurrency("6");
其他的配置,可以看原始碼瞭解。