1. 程式人生 > >Spring boot整合RabbitMQ的簡單使用

Spring boot整合RabbitMQ的簡單使用

最近訊息佇列的使用比較頻繁,目前我使用比較多的就是RabbitMQ了,在專案中一般使用訊息佇列的場景有如下幾個地方。

1.非同步的處理:比如在註冊,或者專案中狀態改變需要給對應的角色傳送郵件,簡訊的時候。應該採用訊息佇列把事件放入佇列,讓傳送郵件的服務去做傳送的事件。

2.應用解耦:比如訂單和庫存的系統中,原來是有了訂單就會通知庫存發生改變。如果庫存發生未知錯誤,那麼訂單也會失敗。這樣是不大合理的。現在是將訂單系統和庫存系統分離。將訂單訊息持久化到訊息佇列中,就算庫存系統失效,也能根據持久化的訊息來保持系統的完整性。

3.日誌系統:日誌直接寫入資料庫,當有大量資料的時候,資料庫壓力過大。可以採用Kalfka訊息中介軟體處理。

目前RabbitMQ的抽象訊息模型如下:

生產者(producer)建立訊息,消費者(consumer)訂閱某個佇列。然後釋出到佇列(queue)中,最後將訊息傳送到監聽的消費者。

上面只是一個抽象的模型,由於RabbitMQ實現了AMQP 協議,所以實際上也是AMQP的模型。多了一個exchange,生產者的訊息不會直接傳送給佇列,而是經過exchange去分發給佇列。

生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,生產者就可以在傳送訊息給Exchange時,通過指定routing key來決定訊息流向哪裡。

RabbitMQ常用的Exchange Type有三種:fanout、direct、topic:

    fanout:把所有傳送到該Exchange的訊息投遞到所有與它繫結的佇列中。

     direct:把訊息投遞到那些binding key與routing key完全匹配的佇列中。

topic:將訊息路由到binding key與routing key模式匹配的佇列中。

下面我們直接程式碼實踐一下:

1.先在IDEA建立一個新的spring boot專案

2.配置依賴

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.在配置檔案中新增配置資訊,我使用的yml檔案。

spring:
 rabbitmq:
    host: (rabbitmq的主機ip)
    port: 5672(埠,預設沒有修改就是5672)
    username: (賬號)
    password: (密碼)
    virtual-host: /        (預設就是/)
    publisher-confirms: true

4.新建java的上生產者配置項

@Configuration
public class RqProducerConfig {
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    
    //宣告一個佇列 佇列名queue.string
    @Bean
    Queue queueString(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.string", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

   
    //宣告一個exchange 是topic形式的
    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        TopicExchange topicExchange = new TopicExchange("exchange");
        rabbitAdmin.declareExchange(topicExchange);
        return topicExchange;
    }

    
    //把exchange和佇列繫結起來,生產者傳送訊息到exchange,exchange會將訊息傳送到對應的佇列
    @Bean
    Binding bindingExchangeString(Queue queueString, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueString).to(exchange).with("queue.string");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }


    /**
     * 生產者用 可以用來轉化為json
     *
     * @return
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
        rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
        rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
        return rabbitMessagingTemplate;
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        return new MappingJackson2MessageConverter();
    }
}

5.編寫傳送的服務層介面和實現

public interface SendStringRqService {
    void sendString(String message);
}
@Service
public class SendStringRqServiceImpl implements SendStringRqService {
   
    @Autowired
    private RabbitMessagingTemplate rabbitTemplate;

    @Override
    public void sendEmail(String message) {
          rabbitTemplate.convertAndSend("exchange", "queue.string", message);
    }

}

6.客戶端測試類編寫

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoApplicationTests {


    @Autowired
    private SendStringRqService sendStringRqService;

    @Test
    public void SendStringService() {
                sendStringRqService.sendEmail(new Date(System.currentTimeMillis())+" ------> hello world");
    }
}

我們再編寫消費端程式碼

1.首先配置依賴和yml檔案。這裡跟客戶端配置一樣。可以直接看客戶端配置就好了。

2.java配置項

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
    //這裡是接受訊息,進行處理的類
    @Autowired
    ReceiverService receiverService;

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // factory.setPrefetchCount(5);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

3.最後就是接受配置的類的編寫,ReceiverService的編寫

@Component
public class ReceiverService {


    //監聽佇列的名稱,也就是客戶端宣告的queue.string
    @RabbitListener(queues = "queue.string")
    public void receiveStringQueue(String message) {
        System.out.println("Received string<" + message + ">");
        System.out.println("Received string<" + new Date(System.currentTimeMillis()) + ">");
       
    }
}

配置好之後直接執行消費者端程式碼。這個時候檢視RabbitMQManage的介面,會有個連線可以看到。

再執行客戶端的測試用例

執行成功後看到消費者端的控制檯輸出:

這樣基礎的rabbitmq就用spring boot整合完成了。如果需要更多功能,可以根據這個示例去完成。