1. 程式人生 > >Springboot+rabbitmq如何實現高併發的rpc呼叫

Springboot+rabbitmq如何實現高併發的rpc呼叫

        一.背景

        公司專案的收單前置服務A與收單服務B之間是通過rabbitmq來通訊的,而且A服務在給B服務傳送資訊後,必須收到返回結果(同步通知),於是使用了此方法:

rabbitTemplate.convertSendAndReceive(reqDto);

        但是,由於B服務的後續處理時間過長,導致A服務接收B服務處理結果的時間大幅增加,TPS只有每秒近2個,遠遠不達標。故對rabbitmq進行優化。

        二.分析

        首先如何實現springboot整合rabbitmq,可以 檢視這篇文章點選開啟連結

        眾所周知,rabbitmq解決的是非同步服務之間的呼叫,即生產者不會關心消費者做了什麼,反之亦然。但是此處的情況有些特殊,生產者必須要等待消費者處理完成並且拿到返回結果,才會結束執行緒。所以,要實現高併發的rpc,必須要能夠開啟多個讓消費者消費的執行緒,用以同時消費多條訊息,用空間換時間。

        三.實現

        首先在收單前置服務模組中,配置rabbitmq。

        用一個RabbitMqConfig類,配置connectionfactory,cacheconnectoionfactory,rabbittemplate,directexchange,此外,除了配置請求佇列和請求佇列的binding以外,必須還需要配置返回佇列(reply-to)以及返回佇列的binding,以及用來監聽返回佇列的監聽器simplemessagelistenercontainer,用以接收消費者返回的訊息。而在這個simplemessagelistenercontainer中,可以配置消費者數量,每個消費者一次性拉取的訊息數,最重要的,它可以配置一個executorservice執行緒池,給你配置的消費者使用。如果不配置這個執行緒池,你配置的消費者將不起作用或者作用很小,此處比較關鍵,是由這篇文章有感而來

點選開啟連結。全部配置程式碼如下:

@Configuration
@PropertySource(value = {"classpath:application.properties"})
public class FixedReplyQueueConfig {
    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("mq.host"));
        connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
        connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
        connectionFactory.setUsername(env.getProperty("mq.username").trim());
        connectionFactory.setPassword(env.getProperty("mq.password").trim());
//        ExecutorService service= Executors.newFixedThreadPool(20);//500個執行緒的執行緒池
//        connectionFactory.setSharedExecutor(service);
        return connectionFactory;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory());
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    //定義rabbitmqTemplate
    @Bean
    public RabbitTemplate fixedReplyQRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
        template.setExchange(requestExchange().getName());
        template.setRoutingKey("PRE_RPC");
        template.setReplyAddress(requestExchange().getName() + "/" + replyQueue().getName());
        //template.setReceiveTimeout(60000);
        template.setReplyTimeout(60000);
        return template;
    }


    //rabbitmqTemplate監聽返回佇列的訊息
    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(fixedReplyQRabbitTemplate());
        ExecutorService executorService=Executors.newFixedThreadPool(300);  //300個執行緒的執行緒池
        container.setTaskExecutor(executorService);
        container.setConcurrentConsumers(200);
        container.setPrefetchCount(5);
        return container;
    }

    //請求佇列和交換器繫結
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(requestQueue()).to(requestExchange()).with("PRE_RPC");
    }

    //返回佇列和交換器繫結
    @Bean
    public Binding replyBinding() {
        return BindingBuilder.bind(replyQueue())
                .to(requestExchange())
                .with(replyQueue().getName());
    }

    //請求佇列
    @Bean
    public Queue requestQueue() {
        String queueName = env.getProperty("mq.pre.queue");
        boolean durable = Boolean.valueOf(env.getProperty("mq.pre.queue.durable"));
        boolean exclusive = Boolean.valueOf(env.getProperty("mq.pre.queue.exclusive"));
        boolean autoDelete = Boolean.valueOf(env.getProperty("mq.pre.queue.autoDelete"));
        return new Queue(queueName,durable,exclusive,autoDelete);
    }

    //每個應用例項監聽的返回佇列
    @Bean
    public Queue replyQueue() {
        String queueName = env.getProperty("mq.prereply.queue")+UUID.randomUUID().toString();
        boolean durable = Boolean.valueOf(env.getProperty("mq.prereply.queue.durable"));
        boolean exclusive = Boolean.valueOf(env.getProperty("mq.prereply.queue.exclusive"));
        boolean autoDelete = Boolean.valueOf(env.getProperty("mq.prereply.queue.autoDelete"));
        return new Queue(queueName,durable,exclusive,autoDelete);
    }

    //交換器
    @Bean
    public DirectExchange requestExchange() {
       return new DirectExchange("PRE_DIRECT_EXCHANGE", false, true);
    }


    @Bean
    public RabbitAdmin admin() {
        return new RabbitAdmin(rabbitConnectionFactory());
    }
}

        rabbitmq的perporties檔案根據自己需要編寫。       

         配置完了請求佇列和返回佇列,也配置了監聽返回佇列的simplemessagelistennercontainer,那麼接下來就是為B服務模組配置監聽請求佇列的containner。

        在消費訊息的方法上,加上註解

 @RabbitListener(queues = "PRE_REQUEST", containerFactory = "containerFactory")

    queues是監聽的佇列,containerFactory是為監聽佇列配置的containner工廠,和上面的simplemessagelistennercontainer是一樣的,具體在配置類中配置它。

 @Bean
    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
        ExecutorService service=Executors.newFixedThreadPool(600);
        factory.setTaskExecutor(service);
        factory.setConcurrentConsumers(500);
        factory.setPrefetchCount(5);
        configurer.configure(factory,cachingConnectionFactory());
        return factory;
    }

        這樣,就為請求佇列也分配了多個消費者,以及供消費者使用的執行緒池。

        四.總結

        經過配置之後,這種高消耗的支付效能已經大幅提升能達到70tps,這是單機的並行數量。關鍵點在於,僅僅配置多個消費者是不起作用的,因為rabbitmq有共享執行緒池的機制(預設50個執行緒,以防執行緒浪費)。為什麼會有這個機制呢?是因為按常理來說,一般的消費者收到訊息,執行緒就結束了,50個執行緒是夠用的,但此處必須B服務收到訊息後,還要先發給支付寶進行支付,再把支付結果傳回A服務,如此一來執行緒持續時間大大加長,想要開啟多個執行緒的話,執行緒池就不夠用了。

        此外,還有幾點需要注意:

        1.具體要完完整整配置程式碼較多,需要根據需求制定細節,歡迎與我交流。

        2.此處依然有一處疑問,因為connectionfactory也是可以配置執行緒池的,連結中的那篇文章就是這麼做的,但其實是不能起到為消費者提供執行緒的效果的。