1. 程式人生 > >Spring-amqp 1.6.1消費者手工對訊息進行確認

Spring-amqp 1.6.1消費者手工對訊息進行確認

前言

在使用Spring amqp建立消費者並接收訊息時,通常會用到下面兩個介面。

public interface MessageListener {

    void onMessage(Message message);

}

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;

}

我們會實現介面,並通過onMessage方法來接收訊息。在接收訊息後,處理業務時如果出現異常,那麼消費者會不斷接收到重發的訊息。有時候在出現某些異常,無法處理,因此並不希望繼續接收到重發,因此需要用到手工確認模式,來按需進行重發。

1.預設情況下為什麼會自動重發?

在配置消費端時,通常使用下面的配置。而對於rabbit:listener-container標籤並未指定“確認屬性” acknowledge。預設情況下該屬性為auto。

    <rabbit:listener-container 
        connection-factory="connectionFactory" >
        <rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
    </rabbit:listener-container
>

當onMessage方法產生異常後,框架會呼叫下面的方法處理異常。

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.rollbackOnExceptionIfNecessary(Throwable)
    public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {

        boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this
.acknowledgeMode.isManual(); try { if (this.transactional) { if (logger.isDebugEnabled()) { logger.debug("Initiating transaction rollback on application exception: " + ex); } RabbitUtils.rollbackIfNecessary(this.channel); } if (ackRequired) { // We should always requeue if the container was stopping boolean shouldRequeue = this.defaultRequeuRejected || ex instanceof MessageRejectedWhileStoppingException; Throwable t = ex; while (shouldRequeue && t != null) { if (t instanceof AmqpRejectAndDontRequeueException) { shouldRequeue = false; } t = t.getCause(); } if (logger.isDebugEnabled()) { logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")"); } for (Long deliveryTag : this.deliveryTags) { // With newer RabbitMQ brokers could use basicNack here... this.channel.basicReject(deliveryTag, shouldRequeue); } if (this.transactional) { // Need to commit the reject (=nack) RabbitUtils.commitIfNecessary(this.channel); } } } catch (Exception e) { logger.error("Application exception overridden by rollback exception", ex); throw e; } finally { this.deliveryTags.clear(); } }

isAutoAck方法

    public boolean isAutoAck() {
        return this == NONE;
    }

可以看到,如果acknowledge即非manul,也非none時(呼叫處方法名字是isAutoAck,但內部確實判斷是否為NONE),那麼會呼叫this.channel.basicReject。因此傳送否定確認,最終不斷收到重複傳送的訊息。

關於否認可以參考下面的連結。

2.對訊息進行手工確認

為了在Spring-amqp框架中進行手工確認,在接收訊息時需要實現如下的介面。

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;

}

此外消費者的確認模式需要配置為manual,其中確認模式包括NONE,MANUL,與AUTO三種[1]。

    <rabbit:listener-container 
        connection-factory="connectionFactory" acknowledge="manual">

那麼當收到訊息後,如果要否認,或確認則通過呼叫channel物件的下面的兩個方法即可。其中basicAck進行確認,而basicNack進行否認。

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        throw new IllegalArgumentException("Illegal");
        channel.basicAck(deliveryTag, false);
        channel.basicNack(deliveryTag, false, true);

上面程式碼中deliveryTag即訊息消交付的一個標識,其作用域為channel。而basicNAck與basicReject都可以進行否則,二者區別參考下面的官網解釋。

當在onMessage方法中呼叫basicAck確認訊息後,佇列中持久化的訊息會被刪除。而呼叫basicNack後,會收到rabbitmq重發的訊息。若未呼叫basicAck確認則訊息會產生堆積[2]。那麼當消費者下再一次連線rabbitmq時訊息會重發給消費者。