1. 程式人生 > >RabbitMQ項目使用之死信隊列

RabbitMQ項目使用之死信隊列

args 超時時間 and title ron asi res listener 形式

消息消費失敗處理方式:

一 進入死信隊列(進入死信的三種方式)

1.消息被拒絕(basic.reject or basic.nack)並且requeue=false

2.消息TTL過期過期時間

3.隊列達到最大長度

DLX也是一下正常的Exchange同一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列, publish可以監聽這個隊列中消息做相應的處理, 這個特性可以彌補R abbitMQ 3.0.0以前支持的immediate參數中的向publish確認的功能。



rabbitmq的三種模式:

一. Fanout Exchange 廣播

所有發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每臺子網內的主機都獲得了一份復制的消息。所以,Fanout Exchange 轉發消息是最快的。

二. Direct Exchange 點對點

所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。

三. Topic Exchange 模糊匹配

所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上,Exchange 將RouteKey 和某Topic 進行模糊匹配。此時隊列需要綁定一個Topic。可以使用通配符進行模糊匹配,符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“log.#”能夠匹配到“log.info.oa”,但是“log.*” 只會匹配到“log.error”。所以,Topic Exchange 使用非常靈活。

死信隊列實現:

詳見官網說明: http://www.rabbitmq.com/ttl.html

在聲明期間使用x參數為隊列定義消息TTL

如:spring 中配置如下:

<rabbit:connection-factory id="connectionFactory" host="47.104.203.101" password="admin"
username="admin" port="5672"
channel-cache-size="30" virtual-host="/citpay"
publisher-confirms="true"
publisher-returns="true"/>


<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true"
confirm-callback="payOrderConfirmCallBack"
return-callback="payOrderReturnCallBack"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue id="pay_order_queue" name="citpay.pay_order_queue" durable="true" auto-delete="false" >
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">5000</value>
</entry>
<entry key="x-dead-letter-exchange">
<value type="java.lang.String">citpay.pay_order_dead_exchange</value>
</entry>
</rabbit:queue-arguments>
</rabbit:queue>

<rabbit:direct-exchange name="citpay.direct_pay_order_exchange" id="direct_pay_order_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="citpay.pay_order_queue" key="pay_order_routekey"/>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:queue id="pay_dead_order_queue" name="citpay.pay_dead_order_queue" durable="true" auto-delete="false"></rabbit:queue>

<rabbit:direct-exchange name="citpay.pay_order_dead_exchange" id="pay_order_dead_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="citpay.pay_dead_order_queue" key="pay_order_routekey"/>
</rabbit:bindings>
</rabbit:direct-exchange>


<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" max-concurrency="1" >
<rabbit:listener queues="citpay.pay_dead_order_queue"
ref="payOrderDeadReceiveConfirmListener" />
</rabbit:listener-container>



使用策略為隊列定義隊列TTL

  命令形式:
rabbitmqctl set_policy expiry“。*”‘{“expires”:1800000}‘ - apply-to queues

管理控制臺設置策略:

技術分享圖片



RabbitMQ項目使用之死信隊列