1. 程式人生 > >使用rabbitmq手動確認訊息的,定時獲取佇列訊息實現

使用rabbitmq手動確認訊息的,定時獲取佇列訊息實現

描述問題

  最近專案中因為有些資料,需要推送到第三方系統中,因為資料會一直增加,並且需要與第三方系統做相關互動。

相關業務

  本著不影響線上執行效率的思想,我們將增加的訊息放入rabbitmq,使用另一個應用獲取消費,因為資料只是推送,並且業務的資料有15分鐘左右的更新策略,對實時性不是很高所以我們需要一個定時任務來主動連結rabbit去消費,然後將資料以網路方式傳送

相關分析

  網路上大致出現了相關的解決辦法,但由於實現相關資料丟失及處理、效能和效率等相關基礎業務的工作量,望而卻步。。。。。。

  還好spring有相關的 org.springframework.amqp 工具包,簡化的大量麻煩>_> 讓我們開始吧

  瞭解rabbit的相關幾個概念

  • Spring RabbitMQ Channel理解  
  • 中介軟體系列二 RabbitMQ之訊息永續性、確認機制、拒絕、預取數量、分配策略
  • topic

 瞭解了這幾個概念的時候你可能已經關注到了我們今天的主題SimpleMessageListenerContainer

 我們使用SimpleMessageListenerContainer容器設定消費佇列監聽,然後設定具體的監聽Listener進行訊息消費具體邏輯的編寫,通過SimpleRabbitListenerContainerFactory我們可以完成相關SimpleMessageListenerContainer容器的管理,

  但對於使用此容器批量消費的方式,官方並沒有相關說明,網路上你可能只找到這篇SimpleMessageListenerContainer批量訊息處理對於問題描述是很清晰,但是回答只是說的比較簡單

  下面我們就對這個問題的答案來個coding

解決辦法

  首先我們因為需要失敗重試,使用spring的RepublishMessageRecoverer可以解決這個問題,這顯然有一個缺點,即將在整個重試期間佔用執行緒。所以我們使用了死信佇列

  相關配置

  1     @Bean
  2     ObjectMapper objectMapper() {
  3         ObjectMapper objectMapper = new ObjectMapper();
  4         DateFormat dateFormat = objectMapper.getDateFormat();
  5         JavaTimeModule javaTimeModule = new JavaTimeModule();
  6 
  7         SimpleModule module = new SimpleModule();
  8         module.addSerializer(new ToStringSerializer(Long.TYPE));
  9         module.addSerializer(new ToStringSerializer(Long.class));
 10         module.addSerializer(new ToStringSerializer(BigInteger.class));
 11 
 12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
 13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
 14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
 15 
 16         objectMapper.registerModule(module);
 17         objectMapper.registerModule(javaTimeModule);
 18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化擴充套件日期格式支援
 19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
 20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 21         return objectMapper;
 22 }
 23 
 24 
 25 
 26   @Bean
 27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
 28     return new RabbitAdmin(aConnectionFactory);
 29   }
 30 
 31   @Bean
 32   MessageConverter jacksonAmqpMessageConverter( ) {
 33     return new Jackson2JsonMessageConverter(objectMapper());
 34   }
 35 
 36 
 37   @Bean
 38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
 39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
 40     rabbitAdmin.declareQueue(queue);
 41     return queue;
 42   }
 43   @Bean
 44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
 45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
 46     rabbitAdmin.declareQueue(queue);
 47     return queue;
 48   }
 49   @Bean
 50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
 51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
 52     rabbitAdmin.declareQueue(queue);
 53     return queue;
 54   }
 55     /**
 56      * 宣告一個交換機
 57      * @return
 58      */
 59   @Bean
 60   TopicExchange controlExchange () {
 61       return new TopicExchange(Exchanges.ExangeTOPIC);
 62   }
 63 
 64 
 65     /**
 66      * 延時重試佇列
 67      */
 68     @Bean
 69     public Queue bcwPayControlRetryQueue() {
 70         Map<String, Object> arguments = new HashMap<>();
 71         arguments.put("x-message-ttl", 10 * 1000);
 72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 73 //        如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key
 74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 75         return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
 76     }
 77     /**
 78      * 延時重試佇列
 79      */
 80     @Bean
 81     public Queue bcwPushControlRetryQueue() {
 82         Map<String, Object> arguments = new HashMap<>();
 83         arguments.put("x-message-ttl", 10 * 1000);
 84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 85 //        如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key
 86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 87         return new Queue("queue_bcw@push@retry", true, false, false, arguments);
 88     }
 89     /**
 90      * 延時重試佇列
 91      */
 92     @Bean
 93     public Queue bcwPullControlRetryQueue() {
 94         Map<String, Object> arguments = new HashMap<>();
 95         arguments.put("x-message-ttl", 10 * 1000);
 96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 97 //        如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key
 98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
 99         return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
100     }
101     @Bean
102     public Binding  bcwPayControlRetryBinding() {
103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104     }
105     @Bean
106     public Binding  bcwPushControlRetryBinding() {
107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108     }
109     @Bean
110     public Binding   bcwPullControlRetryBinding() {
111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112     }
113 
114   /**
115    * 佇列繫結並關聯到RoutingKey
116    *
117    * @param queueMessages 佇列名稱
118    * @param exchange      交換機
119    * @return 繫結
120    */
121   @Bean
122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124   }
125   /**
126    * 佇列繫結並關聯到RoutingKey
127    *
128    * @param queueMessages 佇列名稱
129    * @param exchange      交換機
130    * @return 繫結
131    */
132   @Bean
133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135   }
136   /**
137    * 佇列繫結並關聯到RoutingKey
138    *
139    * @param queueMessages 佇列名稱
140    * @param exchange      交換機
141    * @return 繫結
142    */
143   @Bean
144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146   }
147 
148   @Bean
149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
152           ConnectionFactory connectionFactory) {
153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154     configurer.configure(factory, connectionFactory);
155     factory.setMessageConverter(jacksonAmqpMessageConverter());
156     return factory;
157   }

下面就是我們的主題,定時任務使用的是org.springframework.scheduling

  1 /**
  2  * 手動確認訊息的,定時獲取佇列訊息實現
  3  */
  4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
  5     protected final Logger logger = LoggerFactory.getLogger(getClass());
  6     private List<Message> body = new LinkedList<>();
  7     public long start_time;
  8     private Channel channel;
  9     @Autowired
 10     private ObjectMapper objectMapper;
 11     @Autowired
 12     private RabbitTemplate rabbitTemplate;
 13 
 14     public QuartzSimpleMessageListenerContainer() {
 15         // 手動確認
 16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 17 
 18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
 19             long current_time = System.currentTimeMillis();
 20             int time = (int) ((current_time - start_time)/1000);
 21             logger.info("====接收到{}佇列的訊息=====",message.getMessageProperties().getConsumerQueue());
 22             Long retryCount = getRetryCount(message.getMessageProperties());
 23             if (retryCount > 3) {
 24                 logger.info("====此訊息失敗超過三次{}從佇列的訊息刪除=====",message.getMessageProperties().getConsumerQueue());
 25                 try {
 26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 27                 } catch (IOException ex) {
 28                     ex.printStackTrace();
 29                 }
 30                 return;
 31             }
 32 
 33             this.body.add(message);
 34             /**
 35              * 判斷陣列資料是否滿了,判斷此監聽器時間是否大於執行時間
 36              * 如果在最後延時時間段內沒有業務訊息,此監聽器會一直開著
 37              */
 38             if(body.size()>=3 || time>60){
 39                 this.channel = channel;
 40                 callback();
 41             }
 42         });
 43 
 44 
 45 
 46     }
 47     private void callback(){
 48 //         channel = getChannel(getTransactionalResourceHolder());
 49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
 50             try {
 51                 callbackWork();
 52             }catch (Exception e){
 53                 logger.error("推送資料出錯:{}",e.getMessage());
 54 
 55                 body.stream().forEach(message -> {
 56                     Long retryCount = getRetryCount(message.getMessageProperties());
 57                     if (retryCount <= 3) {
 58                         logger.info("將訊息置入延時重試佇列,重試次數:" + retryCount);
 59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
 60                     }
 61                 });
 62 
 63             } finally{
 64 
 65                 logger.info("flsher too data");
 66 
 67                 body.stream().forEach(message -> {
 68                     //手動acknowledge
 69                     try {
 70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 71                     } catch (IOException e) {
 72                         logger.error("手動確認訊息失敗!");
 73                         e.printStackTrace();
 74                     }
 75                 });
 76 
 77                 body.clear();
 78                 this.stop();
 79 
 80             }
 81         }
 82 
 83     }
 84     abstract void callbackWork() throws Exception;
 85     /**
 86      * 獲取訊息失敗次數
 87      * @param properties
 88      * @return
 89      */
 90     private long getRetryCount(MessageProperties properties){
 91         long retryCount = 0L;
 92         Map<String,Object> header = properties.getHeaders();
 93         if(header != null && header.containsKey("x-death")){
 94             List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
 95             if(deaths.size()>0){
 96                 Map<String,Object> death = deaths.get(0);
 97                 retryCount = (Long)death.get("count");
 98             }
 99         }
100         return retryCount;
101     }
102 
103     @Override
104     @Scheduled(cron = "0 0/2 * * * ? ")
105     public void start() {
106         logger.info("start push data scheduled!");
107         //初始化資料,將未處理的呼叫stop方法,返還至rabbit
108         body.clear();
109         super.stop();
110         start_time = System.currentTimeMillis();
111         super.start();
112 
113         logger.info("end push data scheduled!");
114     }
115 
116     public List<WDNJPullOrder> getBody() {
117 
118         List<WDNJPullOrder> collect = body.stream().map(data -> {
119                     byte[] body = data.getBody();
120                     WDNJPullOrder readValue = null;
121                     try {
122                         readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
123                         });
124                     } catch (IOException e) {
125                         logger.error("處理資料出錯{}",e.getMessage());
126                     }
127                     return readValue;
128                 }
129         ).collect(Collectors.toList());
130 
131         return collect;
132 
133 
134     }
135 
136 }

 

後續

 

當然定時任務的啟動,你可以寫到相關rabbit容器實現的裡面,但是這裡並不是很需要,所以對於這個的小改動,同學你可以自己實現

 @Scheduled(cron = "0 0/2 * * * ? ")

public void start()