SpringBoot Study Notes 05: Integrating SpringBoot with RabbitMQ (Part 2)

Next, let's look at how to configure a RabbitMQ consumer. Without further ado, here is the code.

1. Add the configuration to the application.properties file

# Address of RabbitMQ's AMQP port (5672 by default; reached here through port 32771)
spring.rabbitmq.addresses=192.168.31.199:32771
# Username and password
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

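## SpringBoot RabbitMQ integration: consumer-side configuration
# Minimum number of concurrent consumers per listener
spring.rabbitmq.listener.simple.concurrency=5
# Maximum number of concurrent consumers per listener
spring.rabbitmq.listener.simple.max-concurrency=15
# The listener must acknowledge each message itself
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# At most one unacknowledged message outstanding per consumer
spring.rabbitmq.listener.simple.prefetch=1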

2. Create the consumer class

package com.youyou.consumer;

import com.rabbitmq.client.Channel;
import com.youyou.entity.Order;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class OrderReceive {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue" ,durable = "true"),
            exchange = @Exchange(name = "order-exchange" , durable = "true" , type = "topic"),
            key = "order.#"
        )
    )
    @RabbitHandler
    public void onMessage(@Payload Order order ,
                          @Headers Map<String ,Object> headers ,
                          Channel channel) throws IOException {
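        // Process the message
        System.out.println("Received order: " + order.getName());

        // Delivery tag identifying this message on the channel
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Send the ack; multiple = true also acknowledges any earlier unacked deliveries on this channel
        channel.basicAck(tag,true);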
    }
}
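
The binding in the listener uses the routing key pattern "order.#" on a topic exchange. In a topic pattern, words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The following is a minimal stand-alone sketch of that matching rule, written only to illustrate which routing keys would reach order-queue. The class name `TopicMatch` is hypothetical, and this is not how the broker itself implements matching.

```java
public class TopicMatch {

    /** Returns true if routingKey matches the topic pattern ('*' = one word, '#' = zero or more words). */
    public static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps trailing empty words, so "order." is treated as two words
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under this rule, `TopicMatch.matches("order.#", "order.created")` and `TopicMatch.matches("order.#", "order")` are both true, while `TopicMatch.matches("order.#", "payment.created")` is false.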

Once the project starts, it automatically begins listening for messages on the queue.
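
The listener receives its payload as a `com.youyou.entity.Order`, but that class is not shown in this post. Below is a minimal sketch of what it might look like. It assumes the default message converter is in use, which relies on Java serialization, so the class implements `Serializable`. The `id` field and the constructors are assumptions for illustration. Only `getName()` is confirmed by the consumer code.

```java
import java.io.Serializable;

// Assumed to live in package com.youyou.entity; the package statement is
// omitted here so that the snippet compiles on its own.
public class Order implements Serializable {

    // Fixed version id so the producer and consumer agree on the serialized form
    private static final long serialVersionUID = 1L;

    private String id;    // hypothetical field, not confirmed by the original post
    private String name;  // read by the listener through getName()

    public Order() {
    }

    public Order(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

With Java serialization, the producer and the consumer need the same class, with the same package and name, on their classpaths.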

In a test run with 1,000,000 messages, not a single record was lost.
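
With manual acknowledgment, a message counts as handled only once the consumer sends `basicAck`. The listener above acknowledges after a successful print, but it does not show what happens if processing throws. The sketch below shows one possible way to structure that decision: acknowledge on success, and reject with `basicNack` on failure. To keep it runnable without a broker, it uses a hypothetical stand-in interface `AckChannel` that mirrors only the `basicAck` and `basicNack` signatures of `com.rabbitmq.client.Channel`. The class name `AckSketch` and the `handle` helper are likewise assumptions, not part of the original project.

```java
import java.io.IOException;

public class AckSketch {

    /** Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here. */
    public interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;

        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    /**
     * Runs the business logic for one delivery.
     * Returns true if the message was acknowledged, false if it was rejected.
     */
    public static boolean handle(AckChannel channel, long deliveryTag, Runnable businessLogic)
            throws IOException {
        try {
            businessLogic.run();
        } catch (RuntimeException e) {
            // Processing failed: reject only this delivery and do not requeue it
            channel.basicNack(deliveryTag, false, false);
            return false;
        }
        // Processing succeeded: acknowledge only this delivery (multiple = false)
        channel.basicAck(deliveryTag, false);
        return true;
    }
}
```

Passing `requeue = false` means the broker will not redeliver the message. It is dropped, or routed to a dead-letter exchange if the queue has one configured. Passing `true` instead asks the broker to requeue it, which can loop indefinitely if the failure is permanent.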