Spring Boot Study Notes 05: Integrating Spring Boot with RabbitMQ (Part 2)
阿新 • Published: 2018-11-12
Next, let's look at how to configure a RabbitMQ consumer. Without further ado, here is the code.
1. Add the configuration to the application.properties file
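# RabbitMQ address; the port must be the one that reaches the broker's AMQP port (5672 by default)
spring.rabbitmq.addresses=192.168.31.199:32771
# Username and password
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## Spring Boot + RabbitMQ consumer-side settings
# Start with 5 concurrent consumers and scale up to at most 15
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
# The listener method must acknowledge each message itself
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Each consumer holds at most 1 unacknowledged message at a time
spring.rabbitmq.listener.simple.prefetch=1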
2. Create the consumer class
package com.youyou.consumer;

import com.rabbitmq.client.Channel;
import com.youyou.entity.Order;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class OrderReceive {

    // Declares the queue, the topic exchange, and the binding between them,
    // then listens on the queue.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue", durable = "true"),
            exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
            key = "order.#"
    ))
    @RabbitHandler
    public void onMessage(@Payload Order order,
                          @Headers Map<String, Object> headers,
                          Channel channel) throws IOException {
        // Consume the message
        System.out.println("Received order: " + order.getName());

        // The delivery tag identifies this delivery on the channel
        Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

        // Send the ack back to the broker. multiple=true also acknowledges
        // any earlier unacknowledged deliveries on this channel.
        channel.basicAck(tag, true);
    }
}
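The binding key `order.#` above decides which routing keys end up in `order-queue`. In a topic exchange, words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The following is only a minimal sketch of that rule in plain Java, not RabbitMQ's actual implementation; the class name `TopicMatchSketch` and the sample routing keys are made up for illustration.

```java
// Illustrative re-implementation of topic-exchange matching (not RabbitMQ's own code).
final class TopicMatchSketch {

    /** Returns true if routingKey matches the binding pattern under topic rules. */
    static boolean matches(String pattern, String routingKey) {
        // Treat an empty string as zero words; otherwise split on dots.
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' consumes exactly one word; a literal must equal the word.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical routing keys, checked against the binding key used above.
        String[] keys = {"order.abc", "order.pay.success", "order", "user.order"};
        for (String key : keys) {
            System.out.println(key + " -> " + matches("order.#", key));
        }
    }
}
```

With this rule, `order.abc`, `order.pay.success`, and even the bare key `order` are routed to the queue, while `user.order` is not. Had the binding key been `order.*`, only keys with exactly one word after `order` would match.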
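Once the project starts, the listener automatically begins consuming messages from the queue.
A test run with 1,000,000 messages gave the following result:
Not a single record was lost.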
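With manual acknowledgement, a message leaves the queue only after the consumer acks it, so it matters what the listener does when processing fails. The consumer in step 2 always acks; one possible extension is to ack only on success and nack otherwise. The sketch below shows that control flow in plain Java so it can be run without a broker. `AckChannel` is a stand-in interface that copies the parameter lists of `basicAck` and `basicNack` from `com.rabbitmq.client.Channel`; `process`, `handle`, and `RecordingChannel` are hypothetical names introduced here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ManualAckSketch {

    /** Stand-in for the two com.rabbitmq.client.Channel methods used below. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    }

    /** Hypothetical business step; a blank order name simulates a failure. */
    static void process(String orderName) {
        if (orderName == null || orderName.trim().isEmpty()) {
            throw new IllegalArgumentException("order name is blank");
        }
        System.out.println("Received order: " + orderName);
    }

    /** Acks only after processing succeeds; otherwise nacks without requeueing. */
    static void handle(String orderName, long tag, AckChannel channel) throws IOException {
        try {
            process(orderName);
            // multiple=false: acknowledge just this delivery.
            channel.basicAck(tag, false);
        } catch (RuntimeException e) {
            // requeue=false: do not put the message back on the queue.
            channel.basicNack(tag, false, false);
        }
    }

    /** Test double that records the calls instead of talking to a broker. */
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle("order-1", 1L, channel); // succeeds, so it is acked
        handle("", 2L, channel);        // fails, so it is nacked
        System.out.println(channel.calls);
    }
}
```

In the real listener the same try/catch would wrap the body of `onMessage`, calling the methods on the injected `Channel`. Whether to requeue is a design choice: `requeue=true` keeps the message but can redeliver a permanently failing one indefinitely, while `requeue=false` discards it unless the queue has a dead-letter exchange configured.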