Springboot+rabbitmq如何實現高併發的rpc呼叫
一.背景
公司專案的收單前置服務A與收單服務B之間是通過rabbitmq來通訊的,而且A服務在給B服務傳送資訊後,必須收到返回結果(同步通知),於是使用了此方法:
rabbitTemplate.convertSendAndReceive(reqDto);
但是,由於B服務的後續處理時間過長,導致A服務接收B服務處理結果的時間大幅增加,TPS只有每秒近2個,遠遠不達標。故對rabbitmq進行優化。
二.分析
首先如何實現springboot整合rabbitmq,可以 檢視這篇文章點選開啟連結。
眾所周知,rabbitmq解決的是非同步服務之間的呼叫,即生產者不會關心消費者做了什麼,反之亦然。但是此處的情況有些特殊,生產者必須要等待消費者處理完成並且拿到返回結果,才會結束執行緒。所以,要實現高併發的rpc,必須要能夠開啟多個讓消費者消費的執行緒,用以同時消費多條訊息,用空間換時間。
三.實現
首先在收單前置服務模組中,配置rabbitmq。
用一個RabbitMqConfig類,配置connectionfactory,cacheconnectoionfactory,rabbittemplate,directexchange,此外,除了配置請求佇列和請求佇列的binding以外,必須還需要配置返回佇列(reply-to)以及返回佇列的binding,以及用來監聽返回佇列的監聽器simplemessagelistenercontainer,用以接收消費者返回的訊息。而在這個simplemessagelistenercontainer中,可以配置消費者數量,每個消費者一次性拉取的訊息數,最重要的,它可以配置一個executorservice執行緒池,給你配置的消費者使用。如果不配置這個執行緒池,你配置的消費者將不起作用或者作用很小,此處比較關鍵,是由這篇文章有感而來
@Configuration @PropertySource(value = {"classpath:application.properties"}) public class FixedReplyQueueConfig { @Autowired private Environment env; @Bean public ConnectionFactory connectionFactory(){ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(env.getProperty("mq.host")); connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim())); connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim()); connectionFactory.setUsername(env.getProperty("mq.username").trim()); connectionFactory.setPassword(env.getProperty("mq.password").trim()); // ExecutorService service= Executors.newFixedThreadPool(20);//500個執行緒的執行緒池 // connectionFactory.setSharedExecutor(service); return connectionFactory; } @Bean public CachingConnectionFactory rabbitConnectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory()); cachingConnectionFactory.setPublisherConfirms(true); return cachingConnectionFactory; } //定義rabbitmqTemplate @Bean public RabbitTemplate fixedReplyQRabbitTemplate() { RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory()); template.setExchange(requestExchange().getName()); template.setRoutingKey("PRE_RPC"); template.setReplyAddress(requestExchange().getName() + "/" + replyQueue().getName()); //template.setReceiveTimeout(60000); template.setReplyTimeout(60000); return template; } //rabbitmqTemplate監聽返回佇列的訊息 @Bean public SimpleMessageListenerContainer replyListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueues(replyQueue()); container.setMessageListener(fixedReplyQRabbitTemplate()); ExecutorService executorService=Executors.newFixedThreadPool(300); //300個執行緒的執行緒池 container.setTaskExecutor(executorService); container.setConcurrentConsumers(200); container.setPrefetchCount(5); return container; } //請求佇列和交換器繫結 @Bean public Binding binding() { return BindingBuilder.bind(requestQueue()).to(requestExchange()).with("PRE_RPC"); } //返回佇列和交換器繫結 @Bean public Binding replyBinding() { return BindingBuilder.bind(replyQueue()) .to(requestExchange()) .with(replyQueue().getName()); } //請求佇列 @Bean public Queue requestQueue() { String queueName = env.getProperty("mq.pre.queue"); boolean durable = Boolean.valueOf(env.getProperty("mq.pre.queue.durable")); boolean exclusive = Boolean.valueOf(env.getProperty("mq.pre.queue.exclusive")); boolean autoDelete = Boolean.valueOf(env.getProperty("mq.pre.queue.autoDelete")); return new Queue(queueName,durable,exclusive,autoDelete); } //每個應用例項監聽的返回佇列 @Bean public Queue replyQueue() { String queueName = env.getProperty("mq.prereply.queue")+UUID.randomUUID().toString(); boolean durable = Boolean.valueOf(env.getProperty("mq.prereply.queue.durable")); boolean exclusive = Boolean.valueOf(env.getProperty("mq.prereply.queue.exclusive")); boolean autoDelete = Boolean.valueOf(env.getProperty("mq.prereply.queue.autoDelete")); return new Queue(queueName,durable,exclusive,autoDelete); } //交換器 @Bean public DirectExchange requestExchange() { return new DirectExchange("PRE_DIRECT_EXCHANGE", false, true); } @Bean public RabbitAdmin admin() { return new RabbitAdmin(rabbitConnectionFactory()); } }
rabbitmq的perporties檔案根據自己需要編寫。
配置完了請求佇列和返回佇列,也配置了監聽返回佇列的simplemessagelistennercontainer,那麼接下來就是為B服務模組配置監聽請求佇列的containner。
在消費訊息的方法上,加上註解
@RabbitListener(queues = "PRE_REQUEST", containerFactory = "containerFactory")
queues是監聽的佇列,containerFactory是為監聽佇列配置的containner工廠,和上面的simplemessagelistennercontainer是一樣的,具體在配置類中配置它。
@Bean
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer){
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
ExecutorService service=Executors.newFixedThreadPool(600);
factory.setTaskExecutor(service);
factory.setConcurrentConsumers(500);
factory.setPrefetchCount(5);
configurer.configure(factory,cachingConnectionFactory());
return factory;
}
這樣,就為請求佇列也分配了多個消費者,以及供消費者使用的執行緒池。
四.總結
經過配置之後,這種高消耗的支付效能已經大幅提升能達到70tps,這是單機的並行數量。關鍵點在於,僅僅配置多個消費者是不起作用的,因為rabbitmq有共享執行緒池的機制(預設50個執行緒,以防執行緒浪費)。為什麼會有這個機制呢?是因為按常理來說,一般的消費者收到訊息,執行緒就結束了,50個執行緒是夠用的,但此處必須B服務收到訊息後,還要先發給支付寶進行支付,再把支付結果傳回A服務,如此一來執行緒持續時間大大加長,想要開啟多個執行緒的話,執行緒池就不夠用了。
此外,還有幾點需要注意:
1.具體要完完整整配置程式碼較多,需要根據需求制定細節,歡迎與我交流。
2.此處依然有一處疑問,因為connectionfactory也是可以配置執行緒池的,連結中的那篇文章就是這麼做的,但其實是不能起到為消費者提供執行緒的效果的。