1. 程式人生 > >RabbitMQ進階使用-延時佇列的配置(Spring Boot)

RabbitMQ進階使用-延時佇列的配置(Spring Boot)

依賴
MAVEN配置pom.xml

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Gradle配置build.gradle

compile('org.springframework.boot:spring-boot-starter-amqp')

連線配置
得益於spring boot的約定大於配置,只需要在application.yml加入下面配置即可。

spring:
  rabbitmq:
    host: host
    port: port
    username: admin
    password: passwd

簡單自定義RabbitTemplate和Queue配置
預設的配置還是略顯不足,增加序列化配置如下:

@Configuration
public class QueueConfig {

    /**
     * 自動注入為SimpleRabbitListenerContainerFactory的訊息序列化轉換器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
    * 持久化交換機
    */
    @Bean(name = "exchange")
    public FanoutExchange exchange() {
        return new FanoutExchange("exchange1", true, false);
    }

    /**
    * 持久化佇列
    */
    @Bean
    public Queue queue() {
        return new Queue("queue", true);
    }

    /**
     * 將佇列和exchange繫結
     *
     * @return binding
     */
    @Bean
    Binding bindingSmsExchangeSmsQueue() {
        return BindingBuilder.bind(queue()).to(exchange());
    }

}

特殊延時佇列的配置
延時佇列的用法這裡就不詳細說了,參考Spring Boot與RabbitMQ結合實現延遲佇列的示例,有些場景如未支付訂單30分鐘過期等,可通過延時佇列實現

    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable("delayQueue")                      //佇列名稱
                .withArgument("x-message-ttl",10000)                      //死信時間
                .withArgument("x-dead-letter-exchange", "")            //死信重新投遞的交換機
                .withArgument("x-dead-letter-routing-key", "queue")//路由到佇列的routingKey
                .build();
    }

啟動應用測試一下
啟動應用,在rabbit管理web檢視所有佇列
在這裡插入圖片描述
所有佇列
檢視delayQueue詳情,框框中為延時配置
在這裡插入圖片描述
將"x-message-ttl"引數改成20000重啟發現問題,控制佇列裡面的引數也沒有修改成功
在這裡插入圖片描述
修改帶引數佇列失敗的問題
問題分析
根據日誌提示,佇列已經存在而且引數不一致導致,然後檢視原始碼在RabbitAdmin發現下面程式碼,在建立佇列失敗的時候會呼叫logOrRethrowDeclarationException方法,logOrRethrowDeclarationException方法中釋出了一個DeclarationExceptionEvent事件,到這裡解決思路有,監聽這個事件,然後刪除相應的佇列

	private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
		List<DeclareOk> declareOks = new ArrayList<DeclareOk>(queues.length);
		for (int i = 0; i < queues.length; i++) {
			Queue queue = queues[i];
			if (!queue.getName().startsWith("amq.")) {
				if (this.logger.isDebugEnabled()) {
					this.logger.debug("declaring Queue '" + queue.getName() + "'");
				}
				try {
					try {
						DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
								queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
						declareOks.add(declareOk);
					}
					catch (IllegalArgumentException e) {
						if (this.logger.isDebugEnabled()) {
							this.logger.error("Exception while declaring queue: '" + queue.getName() + "'");
						}
						try {
							if (channel instanceof ChannelProxy) {
								((ChannelProxy) channel).getTargetChannel().close();
							}
						}
						catch (TimeoutException e1) {
						}
						throw new IOException(e);
					}
				}
				catch (IOException e) {
					logOrRethrowDeclarationException(queue, "queue", e);
				}
			}
			else if (this.logger.isDebugEnabled()) {
				this.logger.debug(queue.getName() + ": Queue with name that starts with 'amq.' cannot be declared.");
			}
		}
		return declareOks.toArray(new DeclareOk[declareOks.size()]);
	}

	private <T extends Throwable> void logOrRethrowDeclarationException(Declarable element, String elementType, T t)
			throws T {
		DeclarationExceptionEvent event = new DeclarationExceptionEvent(this, element, t);
		this.lastDeclarationExceptionEvent = event;
		if (this.applicationEventPublisher != null) {
			this.applicationEventPublisher.publishEvent(event);
		}
		if (this.ignoreDeclarationExceptions || (element != null && element.isIgnoreDeclarationExceptions())) {
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("Failed to declare " + elementType
						+ ": " + (element == null ? "broker-generated" : element)
						+ ", continuing...", t);
			}
			else if (this.logger.isWarnEnabled()) {
				Throwable cause = t;
				if (t instanceof IOException && t.getCause() != null) {
					cause = t.getCause();
				}
				this.logger.warn("Failed to declare " + elementType
						+ ": " + (element == null ? "broker-generated" : element)
						+ ", continuing... " + cause);
			}
		}
		else {
			throw t;
		}
	}

解決方法
寫一個DeclarationExceptionEvent事件監聽,處理建立失敗的佇列,既刪除掉

@Component
public class DeclarationExceptionEventListener {

    @Autowired
    private AmqpAdmin rabbitAdmin;

    @EventListener(classes = DeclarationExceptionEvent.class)
    public void listen(DeclarationExceptionEvent event) {
        final Declarable declarable = event.getDeclarable();
        if (declarable instanceof Queue) {
            final Queue queue = (Queue) declarable;
            rabbitAdmin.deleteQueue(queue.getName());
        }
    }
}

改完重啟應用,只有一條異常日誌(原來4條),還有一條的原因是第一次建立失敗釋出事件,我們監聽了事件進行處理。檢視rabbit控制檯,引數修改成功。