Springboot2(28)整合rabbitmq實現延時訊息
阿新 • • 發佈:2018-12-29
rabbitmq實現訊息的確認機制和延時訊息的傳送
訊息生產者程式碼實現的主要配置
@Configuration
@Slf4j
public class PrividerRabbitmqConfig {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 定製化amqp模版 可根據需要定製多個
*
*
* 此處為模版類定義 Jackson訊息轉換器
* ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥 即訊息傳送到exchange ack
* ReturnCallback介面用於實現訊息傳送到RabbitMQ 交換器,但無相應佇列與交換器繫結時的回撥 即訊息傳送不到任何一個佇列中 ack
*
* @return the amqp template
*/
@Bean
public AmqpTemplate amqpTemplate() {
//使用jackson 訊息轉換器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setEncoding("UTF-8");
//開啟returncallback yml 需要 配置 publisher-returns: true
rabbitTemplate.setMandatory (true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", replyCode, replyText, exchange, routingKey);
});
//訊息確認 yml 需要配置 publisher-returns: true
rabbitTemplate. setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("訊息傳送到exchange成功,id: {}", correlationData.getId());
} else {
log.info("訊息傳送到exchange失敗,原因: {}", cause);
}
});
return rabbitTemplate;
}
/* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */
/**
* 宣告Direct交換機 支援持久化.
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
}
/**
* 宣告一個佇列 支援持久化.
*
* @return the queue
*/
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable("DIRECT_QUEUE").build();
}
/**
* 通過繫結鍵 將指定佇列繫結到一個指定的交換機 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
}
/* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */
/**
* 宣告 fanout 交換機.
*
* @return the exchange
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
}
/**
* Fanout queue A.
*
* @return the queue
*/
@Bean("fanoutQueueA")
public Queue fanoutQueueA() {
return QueueBuilder.durable("FANOUT_QUEUE_A").build();
}
/**
* Fanout queue B .
*
* @return the queue
*/
@Bean("fanoutQueueB")
public Queue fanoutQueueB() {
return QueueBuilder.durable("FANOUT_QUEUE_B").build();
}
/**
* 繫結佇列A 到Fanout 交換機.
*
* @param queue the queue
* @param fanoutExchange the fanout exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 繫結佇列B 到Fanout 交換機.
*
* @param queue the queue
* @param fanoutExchange the fanout exchange
* @return the binding
*/
@Bean
public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 超時佇列
* 訊息被拒絕(basic.reject/ basic.nack)並且requeue=false
* 訊息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
* 佇列達到最大長度
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");
arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
@Bean
public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY2").noargs();
}
/**
* 優先順序佇列
* @return
*/
/* @Bean("priorityQueue")*/
public Queue priorityQueue(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority",10); //佇列的屬性引數 有10個優先級別
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
/* @Bean
public Binding priorityBinding(@Qualifier("priorityQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
}*/
}
訊息消費者程式碼實現的主要配置
@Configuration
public class ConsumerRabbitmqConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ack
return factory;
}
}
1 傳送確認
application.yml新增配置
spring.rabbitmq.publisher-confirms: true #將channel通道設定成confirm模式
設定訊息確認會影響併發效能,導致訊息掉失。因為每個connection最多支援2048個channel,當channel達到2048時,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later
。
@Bean
public AmqpTemplate amqpTemplate() {
//使用jackson 訊息轉換器
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setEncoding("UTF-8");
//開啟returncallback yml 需要 配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", replyCode, replyText, exchange, routingKey);
});
//訊息確認 yml 需要配置 publisher-returns: true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("訊息傳送到exchange成功,id: {}", correlationData.getId());
} else {
log.info("訊息傳送到exchange失敗,原因: {}", cause);
}
});
return rabbitTemplate;
}
rabbitTemplate.setConfirmCallback()
訊息傳送到 Broker 後觸發回撥,確認訊息是否到達 Broker 伺服器,也就是隻確認是否正確到達 Exchange 中
rabbitTemplate.setReturnCallback()
通過實現 ReturnCallback 介面,啟動訊息失敗返回,比如路由不到佇列時觸發回撥
同一個連線不同channel使用事務和釋出確認
2 消費確認
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
RabbitListenerContainerFactory 中進行開啟手動 ack
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //開啟手動 ack
return factory;
}
確認訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
3 傳送訊息
CorrelationData correlationData = new CorrelationData( UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",new UserData(userName),correlationData);
4 超時佇列
/**
* 超時佇列
* 訊息被拒絕(basic.reject/ basic.nack)並且requeue=false
* 訊息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間))
* 佇列達到最大長度
* @return
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "DIRECT_EXCHANGE");
arguments.put("x-dead-letter-routing-key", "DIRECT_ROUTING_KEY");
Queue queue = new Queue("DEAD_LETTER_QUEUE",true,false,false,arguments);
return queue;
}
/**
* 傳送超時訊息
* @return
*/
@RequestMapping("/deadLetter")
@ResponseBody
public R deadLetterMsg(){
MessagePostProcessor processor = new MessagePostProcessor(){
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000" );//10秒後超時
message.getMessageProperties().setPriority(1);
return message;
}
};
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE","DIRECT_ROUTING_KEY2", "333", processor);
return R.ok();
}
通過超時佇列達到訊息延時傳送的效果。當訊息傳送到超時佇列deadLetterQueue
經過一定時間超時後,訊息會路由到繫結的exchange(DIRECT_EXCHANGE
)。而繫結exchange的佇列會得到訊息。
測試
請求地址:http://127.0.0.1:8080/deadLetter
訊息傳送
到收到訊息