1. 程式人生 > >Spring-amqp 1.6.1 生產者與消費者訊息確認配置與使用

Spring-amqp 1.6.1 生產者與消費者訊息確認配置與使用

通過Publisher Confirms and Returns機制,生產者可以判斷訊息是否傳送到了exchange及queue,而通過消費者確認機制,Rabbitmq可以決定是否重發訊息給消費者,以保證訊息被處理。

1.什麼是Publisher Confirms and Returns?

Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms.
地址:

http://www.rabbitmq.com/confirms.html

根據RabbitMq官網定義,rabbitmq代理(broker)對釋出者(publishers)的確認被稱作釋出者確認(publisher confirms),這種機制是Rabbitmq對標準Amqp協議的擴充套件。因此通過這種機制可以確認訊息是否傳送給了目標。

2.如何通過Spring amqp來使用Publisher Confirms and Returns機制?

Confirmed and returned messages are supported by setting the CachingConnectionFactory’s publisherConfirms and publisherReturns properties to ‘true’ respectively.When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections.

http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html#cf-pub-conf-ret

通過Spring amqp文件可以看到,要使用這種機制需要將Template模版的設publisherConfirms 或publisherReturns 屬性設定為true,此外ConnectionFactory要配置為CachingConnectionFactory。

    <bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"
> <property name="host" value="192.168.2.133" /> <property name="port" value="5672" /> <property name="username" value="sun" /> <property name="password" value="123456" /> <property name="publisherConfirms" value="true" /> <property name="publisherReturns" value="true" /> </bean>

2.1 ConfirmCallback的使用及觸發的一種場景


        RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
        int i = 0;
        template.setMandatory(true);
        if(!template.isConfirmListener()){
            template.setConfirmCallback(new ConfirmCallback() {
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
                }
            });
        }

isConfirmListener由RabbitTemplate 提供,用於判斷是否建立了這個物件。

    @Override
    public boolean isConfirmListener() {
        return this.confirmCallback != null;
    }

而ConfirmListener是當訊息無法傳送到Exchange被觸發,此時Ack為False,這時cause包含傳送失敗的原因,例如exchange不存在時,cause的內容如下。

ack: false. correlationData: nullcause : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchange' in vhost '/', class-id=60, method-id=40)

而訊息可以傳送到Exchange時Ack為true。在傳送時可以通過使用RabbitTemplate 下面的方法來進行驗證。

convertAndSend(exchange, routingKey, object, correlationData);

correlationData是回撥時傳入回撥方法的引數,因此通過這個屬性來區分訊息,並進行重發。

2.2 ReturnCallback的使用及觸發的一種場景

ReturnCallBack使用時需要通過RabbitTemplate 的setMandatory方法設定變數mandatoryExpression的值,該值可以是一個表示式或一個Boolean值。當為TRUE時,如果訊息無法傳送到指定的訊息佇列那麼ReturnCallBack回撥方法會被呼叫。

與isConfirmListener類似,也有一個isReturnListener方法,但這個方法在1.6.1版本中返回true。

    @Override
    public boolean isReturnListener() {
        return true;
    }

設定ReturnCallBack回撥。

            template.setReturnCallback(new ReturnCallback() {
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
                }
            });

傳送訊息驗證ReturnCallBack。

convertAndSend(String routingKey, final Object object, CorrelationData correlationData)

template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));

在測試程式碼中指定了一個routingKey,但通過這個routingKey,Exchange無法將訊息路由到任何佇列,因此導致ReturnCallBack被觸發,最後返回的資訊如下。

text: NO_ROUTE code: 312 exchange: myExchange routingKey :not exist routing key abc

此外在vhost中若找不到Exchange時,confirmCallBack會被觸發,而returnCallBack不會被觸發,原因參見下面的回答。

3.消費者對訊息的確認

Rabbitmq 官網教程第2課講述瞭如何通過Rabbitmq 提供的Api來實現消費者確認訊息已經處理。

而在Spring-rabbitmq中,預設啟用的是自動確認機制,當消費端在回撥方法中接收到訊息後,併成功處理,且未丟擲任何異常,那麼訊息會被確認。如果執行過程中產生異常,那麼Rabbitmq會嘗試重複傳送訊息給消費者。這點可以檢視原始碼,並通過在訊息處理方法中丟擲異常來驗證。當產生異常後,通過ip:15672登入Rabbitmq管理的Web頁面,選擇佇列選項可以看到未處理的訊息的數量(Unacked)。這個數量是與
<rabbit:listener-container/>
配置中的prefetchCount(預取數相關)。

因此消費者對訊息的確認最基礎的使用不需要額外進行配置。

若要在消費方配置相關引數,可以參考官方文件3.1.5小節。

4.配置多個消費者並關聯到同一個佇列

如下,若配置多個消費者關聯到一個佇列,那麼當傳送多條訊息時(訊息數量大於消費者數量時),那麼佇列中的訊息會輪流分發給各個消費者。
consumer,consumer2,consumer,consumer2,….的順序。

    <rabbit:listener-container 
        connection-factory="connectionFactory" prefetch="1" receive-timeout="4000">
        <rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
        <rabbit:listener ref="consumer2" method="listen" queue-names="myQueue" />
    </rabbit:listener-container>

如果其中一個消費者接收訊息後,併產生異常,那麼該訊息會發送給下一個消費者。例如consumer,在listen丟擲異常,那麼這條訊息會發送給consumer2。

5.示例中的配置與程式碼

    <bean id="connectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="192.168.2.133" />
        <property name="port" value="5672" />
        <property name="username" value="sun" />
        <property name="password" value="123456" />
        <property name="publisherConfirms" value="true" />
        <property name="publisherReturns" value="true" />
    </bean>

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="myQueue" id="myQueue"  durable="true"
        auto-delete="false" exclusive="false" />

    <rabbit:queue name="myQueue" id="myQueue2" durable="true"
        auto-delete="false" exclusive="false" />

    <rabbit:direct-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue" key="sun1" />
            <rabbit:binding queue="myQueue" key="sun2" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
        exchange="myExchange" routing-key="sunv5" />

    <rabbit:listener-container
        connection-factory="connectionFactory">
        <rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
    </rabbit:listener-container>

    <bean id="consumer" class="springamqp.Foo" />
        ClassPathXmlApplicationContext ctx = 
                new ClassPathXmlApplicationContext("classpath:rabbit-context.xml");

        RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
        template.setMandatory(true);
        if(!template.isConfirmListener()){
            template.setConfirmCallback(new ConfirmCallback() {
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
                }
            });
        }
            template.setReturnCallback(new ReturnCallback() {
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
                }
            });
        int i = 0;
        while(i < 50) {
            template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));
            Thread.sleep(7000);
        }
public class Foo {
     public void listen(String foo) throws InterruptedException {
            System.out.println("get msg");
            System.out.println(foo);
            Thread.sleep(2000);
            // 丟擲異常,觀察訊息重發
            throw new NullPointerException();
     }  
}

一些參考: