崛起於Springboot2.0.X之整合RabbitMQ企業所有場景開發(46)
阿新 • • 發佈:2019-08-23
1、部落格涵蓋點
1.2 使用ssm xml方式整合rabbitMq,五種模式+死信佇列方案+jdk8
1.3 本部落格專案碼雲地址:==》springboot+RabbitMQ+所有場景
1、fanout:釋出/訂閱模式 2、rounting:路由模式 3、topic:萬用字元模式 4、延遲佇列之使用CustomExchange方案:需要安裝延遲外掛 點選==》安裝詳情 5、延遲佇列之死信佇列
2、場景
引言:(九天部落格實時更新修改,即便你是複製到你的網站部落格,也看不到每一篇部落格的優化,不如關注我哈) RabbitMQ 場景應用: 1、秒殺場景:高併發請求執行緒進入訊息佇列,根據先進先出原則,執行秒殺邏輯 2、延遲佇列【兩種方式 使用外掛延遲 和 死信佇列延遲】: 2.1:使用者下訂單,但是不支付,超過30分鐘訂單自動取消 2.2:使用者註冊成功之後,需要過一段時間比如一週後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。 2.3: 延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段時間後自動重試 3、非同步操作【非同步操作比同步快】: 3.1:非同步記錄使用者操作日誌:使用者的登陸app,傳送到訊息佇列,監聽記錄使用者的登陸時間、裝置,來源ip等資訊... 3.2:非同步傳送郵件:註冊或者忘記密碼的時候,通常某某網站會提示傳送你郵箱一個連結,請點選。 3.3:非同步傳送簡訊驗證碼:使用者忘記密碼或者使用手機驗證碼登陸時,可以執行非同步,沒必要讓程式序列完成所有操作最後才能接受到驗證碼
3、pom檔案
springboot 2.0.X的依賴大家自己加上去吧,應該也適用於 springboot2.1.X。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--工具類--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.6.1</version> </dependency>
4、application.properties
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.listener.simple.concurrency=3 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.listener.simple.acknowledge-mode=manual
5、java配置類
4.1 rabbitmq配置
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit public class RabbbitConfig { @Value("${spring.rabbitmq.host}") public String host; @Value("${spring.rabbitmq.port}") public int port; @Value("${spring.rabbitmq.username}") public String username; @Value("${spring.rabbitmq.password}") public String password; @Value("${spring.rabbitmq.virtual-host}") public String virtual_host; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPort(port); connectionFactory.setVirtualHost(virtual_host); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } //配置消費者監聽的容器 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//設定確認模式手工確認 return factory; } @Bean MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
4.2 Exchange配置
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * @Author:MuJiuTian * @Description:所有的exchange列表 * @Date: Created in 下午11:04 2019/8/19 */ @Component @Configuration public class ExchangeConfig { /** * 建立型別:fanout交換機 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_exchange",true,false,null); } /** * 建立型別:direct交換機 */ @Bean public DirectExchange directExchange() { return new DirectExchange("direct_exchange",true,false,null); } /** * 建立型別:topic交換機 */ @Bean public TopicExchange topicExchange() { return new TopicExchange("IExchange",true,false,null); } /** * 建立型別:custom交換機,該交換機需要安裝delay_rabbitmq外掛才能執行 */ @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("custom_exchange","x-delayed-message",true,false,args); } /** * 建立型別:headers交換機 */ @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers_exchange",true,false,null); } /** * 延遲:immediate交換機 */ @Bean public DirectExchange immediateExchange() { return new DirectExchange("immediate_exchange"); } /** * 延遲:dlx_delay交換機 */ @Bean public DirectExchange dlxExchange() { return new DirectExchange("dlx_delay_exchange"); } }
4.3 Queue配置
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; /** * @Author:MuJiuTian * @Description: 所有的佇列統一配置 * @Date: Created in 下午11:36 2019/8/19 */ @Configuration @Component public class QueueConfig { /** * 針對fanout交換機的佇列 */ @Bean public Queue fanoutQueue1() { return new Queue("fanout_queue_1"); } /** * 針對fanout交換機的佇列 */ @Bean public Queue fanoutQueue2() { return new Queue("fanout_queue_2"); } /** * 針對direct交換機的佇列 */ @Bean public Queue directQueue1() { return new Queue("direct_queue_1"); } /** * 針對direct交換機的佇列 */ @Bean public Queue directQueue2() { return new Queue("direct_queue_2"); } /** * 針對topic交換機的佇列 */ @Bean public Queue topicQueue1() { return new Queue("topic_queue_1"); } /** * 針對topic交換機的佇列 */ @Bean public Queue topicQueue2() { return new Queue("topic_queue_2"); } /** * 延遲佇列 */ @Bean public Queue delayQueue() { return new Queue("delay_queue"); } /** * 死信佇列方式中的立即消費佇列 */ @Bean public Queue immediateQueue() { return new Queue("immediate"); } /** * 死信佇列方式中的延遲佇列 */ @Bean public Queue dlxDelay() { Map<String,Object> map = new HashMap<>(); //map.put("x-message-ttl",6000);,延遲時間,不過我們不需要在這裡配置,在service設定就好了 // x-dead-letter-exchange 聲明瞭佇列裡的死信轉發到的DLX名稱 map.put("x-dead-letter-exchange","immediate_exchange"); // x-dead-letter-routing-key 聲明瞭這些死信在轉發時攜帶的 routing-key 名稱。 map.put("x-dead-letter-routing-key","immediate_road"); return new Queue("dlx_delay_queue",true,false,false,map); } }
4.4 exchange與queue關係繫結配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author:MuJiuTian * @Description: 所有的exchange與queue之間的routing key * @Date: Created in 下午11:39 2019/8/19 */ @Configuration public class BindingConfig { @Autowired ExchangeConfig exchange; @Autowired QueueConfig queue; @Bean public Binding bindFanout1() { return BindingBuilder.bind(queue.fanoutQueue1()).to(exchange.fanoutExchange()); } @Bean public Binding bindFanout2() { return BindingBuilder.bind(queue.fanoutQueue2()).to(exchange.fanoutExchange()); } @Bean public Binding bindDirectOrange() { return BindingBuilder.bind(queue.directQueue1()).to(exchange.directExchange()).with("orange"); } @Bean public Binding bindDirectBlack() { return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("black"); } @Bean public Binding bindDirectGreen() { return BindingBuilder.bind(queue.directQueue2()).to(exchange.directExchange()).with("green"); } @Bean public Binding bindTopic1(){ Binding binding= BindingBuilder.bind(queue.topicQueue1()).to(exchange.topicExchange()).with("*.orange.*"); return binding; } @Bean public Binding bindTopic2(){ Binding binding= BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("*.*.rabbit"); return binding; } @Bean public Binding bindTopic3(){ Binding binding= BindingBuilder.bind(queue.topicQueue2()).to(exchange.topicExchange()).with("lazy.#"); return binding; } @Bean public Binding bindCustom() { return BindingBuilder.bind(queue.delayQueue()).to(exchange.customExchange()).with("delay_queue_road").noargs(); } @Bean public Binding immediate() { return BindingBuilder.bind(queue.immediateQueue()).to(exchange.immediateExchange()).with("immediate_road"); } @Bean public Binding dlxDelay() { return BindingBuilder.bind(queue.dlxDelay()).to(exchange.dlxExchange()).with("dlx_delay_road"); } }
6、實體類
import java.io.Serializable; /** * @Author:MuJiuTian * @Description: * @Date: Created in 下午6:01 2019/8/19 */ public class Mail implements Serializable { private static final long serialVersionUID = -8140693840257585779L; private String mailId; private String country; private Double weight; public Mail() { } public Mail(String mailId, String country, double weight) { this.mailId = mailId; this.country = country; this.weight = weight; } public String getMailId() { return mailId; } public void setMailId(String mailId) { this.mailId = mailId; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public double getWeight() { return weight; } public void setWeight(double weight) { this.weight = weight; } @Override public String toString() { return "Mail [mailId=" + mailId + ", country=" + country + ", weight=" + weight + "]"; } }
7、service層
public interface Producer { void sendMessage(String exchange, String rountingKey, Object object); void delayMessage(String exchange, String rountingKey, long time, Object object); void dlxDelayMessage(String exchange, String rountingKey, long time, Object object); void sendAndReceive(String exchange, String rountingKey, Object object); }
import com.example.rabbit.service.Producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @Author:MuJiuTian * @Description: * @Date: Created in 下午9:52 2019/8/19 */ @Service @Transactional public class ProducerImpl implements Producer { @Autowired RabbitTemplate rabbitTemplate; /** * @Author:MuJiuTian * @Date:2019/8/20 下午4:10 * @Description: */ @Override public void sendMessage(String exchange, String rountingKey, Object object) { rabbitTemplate.convertAndSend(exchange,rountingKey,object); } /** * @Author:MuJiuTian * @Date:2019/8/20 下午4:41 * @Description: */ @Override public void delayMessage(String exchange, String rountingKey, long time, Object object) { rabbitTemplate.convertAndSend(exchange,rountingKey,object,message -> { message.getMessageProperties().setHeader("x-delay",time); return message; }); } @Override public void dlxDelayMessage(String exchange, String rountingKey, long time, Object object) { rabbitTemplate.convertAndSend(exchange, rountingKey, object, message -> { message.getMessageProperties().setExpiration(time + ""); return message; }); } /** * @Author:MuJiuTian * @Date:2019/8/20 下午4:46 * @Description:傳送與消費一步完成,前提是監聽器業務邏輯處理沒有任何異常 */ @Override public void sendAndReceive(String exchange, String rountingKey, Object object) { rabbitTemplate.convertSendAndReceive(exchange,rountingKey,object); } }
8、controller層
import cn.hutool.core.date.DateUtil; import com.example.rabbit.entity.Mail; import com.example.rabbit.service.impl.ProducerImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.Random; /** * @Author:MuJiuTian * @Description: * @Date: Created in 下午10:23 2019/8/19 */ @RestController public class RabbitController { @Autowired ProducerImpl producer; /** * @Author:MuJiuTian * @Date:2019/8/20 上午10:59 * @Description:使用fanout交換機模式測試rabbit,該模式沒有routingKey */ @RequestMapping(value = "/fanout") public void fanout() { Mail mail = randomMail(); producer.sendMessage("fanout_exchange",null,mail); } /** * @Author:MuJiuTian * @Date:2019/8/20 上午11:00 * @Description:使用direct交換機模式測試rabbit,支援routingKey多路由模式 */ @RequestMapping(value = "/direct") public void direct() { Mail mail = randomMail(); producer.sendMessage("direct_exchange","",mail); } /** * @Author:MuJiuTian * @Date:2019/8/20 上午11:00 * @Description:使用topic交換機模式測試rabbit,支援routingKey萬用字元模式 */ @RequestMapping(value = "/topic") @ResponseBody public void topic() { Mail mail = randomMail(); //producer.sendMessage("IExchange","lazy.mm",mail); producer.sendMessage("IExchange","love.orange.hate",mail); } /** * @Author:MuJiuTian * @Date:2019/8/20 下午4:34 * @Description:延遲佇列測試,毫秒為單位 */ @GetMapping(value = "/delay") @ResponseBody public void delay() { Mail mail = randomMail(); String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"); System.out.println("延遲傳送時間:"+now+"資料:"+mail.toString()); producer.delayMessage("custom_exchange","delay_queue_road",3000,mail); } /** * @Author:MuJiuTian * @Date:2019/8/21 上午10:17 * @Description:延遲佇列死信佇列方式 */ @GetMapping(value = "/dlxDelay") public void dlxDelay() { Mail mail = randomMail(); String now = DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss"); System.out.println("延遲傳送時間:"+now+"資料:"+mail.toString()); producer.dlxDelayMessage("dlx_delay_exchange","dlx_delay_road",3000,mail); } /** * 隨機建立一個Mail實體物件,供介面測試 */ public static Mail randomMail() { Mail mail = new Mail(); mail.setMailId(new Random().nextInt(100)+""); mail.setCountry("China"); mail.setWeight(new Random().nextDouble()); return mail; } }
9、監聽器
import cn.hutool.core.date.DateUtil; import com.example.rabbit.entity.Mail; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.text.DateFormat; import java.util.Date; import java.util.Map; /** * @Author:MuJiuTian * @Description: Message包含 @Payload Object obj和@Headers Map<String,Object> heads兩者 * @Payload @Headers @Header(name = "amqp_deliveryTag") @RabbitListener @RabbitHandler 總共5個註解的使用 * @Date: Created in 下午10:06 2019/8/19 */ @Component public class MyListener { @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(queues = "fanout_queue_1") public void fanoutQueue1(Mail mail) throws IOException { System.out.println("fanout_queue_1佇列取出訊息"+mail.toString()); } @RabbitListener(queues = "fanout_queue_2") public void fanoutQueue2(Mail mail) throws IOException { System.out.println("fanout_queue_2佇列取出訊息"+mail.toString()); } @RabbitListener(queues = "direct_queue_1") public void directQueue1(Mail mail) { System.out.println("direct_queue_1佇列取出訊息"+mail.toString()); } @RabbitListener(queues = "direct_queue_2") public void directQueue2(Mail mail) { System.out.println("direct_queue_2佇列取出訊息"+mail.toString()); } @RabbitListener(queues = "topic_queue_1") public void topicQueue1(Mail mail) { System.out.println("從topic_queue_1取出訊息"+mail.toString()); } @RabbitListener(queues = "topic_queue_2") public void topicQueue2(@Payload Mail mail, @Headers Map<String,Object> heads,Channel channel) throws IOException { System.out.println("到達監聽器,準備處理RabbitMQ業務邏輯,從topic_queue_2取出訊息=="+mail.toString()); //第一步:業務邏輯處理,如活動秒殺 //第二部:業務邏輯處理成功之後,消費掉訊息 channel.basicAck(Long.valueOf(heads.get("amqp_deliveryTag").toString()),true); } @RabbitListener(queues = "delay_queue") public void delay(@Payload Mail mail, @Header(name = "amqp_deliveryTag") long deliveryTag,Channel channel) throws IOException { System.out.println("延遲佇列接受時間:"+ DateUtil.format(new Date(),"yyyy-MM-dd HH:mm:ss")); //第一步:業務邏輯處理,如下訂單內30分鐘不支付情況下,自動取消訂單,這裡就不寫了,主要體現rabbitmq的延遲功能 //第二部:業務邏輯處理成功之後,消費掉訊息 channel.basicAck(deliveryTag,false); } @RabbitListener(queues = "immediate") @RabbitHandler public void immediate(@Payload Mail mail) { System.out.println("此刻時間是:"+ DateUtil.format(new Date(), DateFormat.getDateTimeInstance())+"要處理的資料="+mail); } }
10、專案啟動
專案啟動後,開啟localhost:15672,裡面的exchange和queue會自動配置好,不過還是要檢查一下exchange和queue有沒有繫結關係好,都可以了進行測試,如下:
10.1 topic測試:http://localhost:8080/topic
10.2 延遲佇列,使用CustomExchange測試:http://localhost:8080/delay
10.3 延遲佇列,方式二,使用死信佇列方式測試:http://localhost:8080/dlxDelay
喜歡我就關注我吧....嘻嘻嘻。 <