1. 程式人生 > >RabbitMQ 延遲佇列實現訂單支付結果非同步階梯性通知

RabbitMQ 延遲佇列實現訂單支付結果非同步階梯性通知

在第三方支付中,例如支付寶、或者微信,對於訂單請求,第三方支付系統採用的是訊息同步返回、非同步通知+主動補償查詢的補償機制。

由於網際網路通訊的不可靠性,例如雙方網路、伺服器、應用等因素的影響,不管是同步返回、非同步通知、主動查詢報文都可能出現超時無響應、報文丟失等情況,所以像支付業務,對結果的通知一般採用幾種方案結合的補償機制,不能完全依賴某一種機制。
例如一個支付結果的通知,一方面會在支付頁面跳轉時候返回支付結果(一般只用作前端展示使用,非最終狀態),同時會採用後臺非同步通知機制(有前臺、後臺通知的,以後臺非同步通知結果為準),但由於前臺跳轉、後臺結果通知都可能失效,因此還以定時補單+請求方主動查詢介面作為輔助手段。
常見的補單操作,任務排程策略一般設定30秒、60秒、3分鐘、6分鐘、10分鐘排程多次(以自己業務需要),如果排程接收到響應確認報文,補單成功,則中止對應訂單的排程任務;如果超過補單上限次數,則停止補單,避免無謂的資源浪費。請求端隨時可以發起請求報文查詢對應訂單的狀態。
在日常開發中,對於網站前端來說,支付計費中心對於訂單請求資訊的處理也是通過訊息同步返回、非同步通知+主動補償查詢相結合的機制,其中對於訂單的非同步通知,目前的通知策略為3s、30s、60s、120s、180、300s的階梯性通知。返回成功情況下就不繼續通知了,本來打算使用將失敗的訊息寫到資料庫等待發送,然後每秒查詢資料庫獲取訊息通知前端。但覺得這樣的處理方式太粗暴。存在以下缺點:
1 、每秒請求有點兒浪費資源; 2 、通知方式不穩定; 3 、無法承受大資料量等等
所以最終打算使用rabbitmq的訊息延遲+死信佇列來實現。訊息模型如下:
producer釋出訊息,通過exchangeA的訊息會被分發到QueueA,Consumer監聽queueA,一旦有訊息到來就被消費,這邊的消費業務就是通知前端,如果通知失敗,就建立一個延遲佇列declareQueue,設定每個訊息的ttl然後通過declare_exchange將訊息分發到declare_queue,因為declare_queue沒有consumer並且declare_queue中的訊息設定了ttl,當ttl到期後,將通過DEX路由到queueA,被重新消費。
程式碼如下:DeclareQueue.java

    
  1. package org.delayQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class DeclareQueue {
  7. public static String EXCHANGE_NAME = "notifyExchange";
  8. public static void init() {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost( "localhost");
  11. factory.setPort( 5672);
  12. Connection connection = null;
  13. try {
  14. connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  17. String routingKey = "AliPaynotify";
  18. String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043d0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
  19. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
  20. System.out.println( " [x] Sent :" + message);
  21. } catch (Exception e) {
  22. // TODO Auto-generated catch block
  23. e.printStackTrace();
  24. } finally {
  25. if (connection != null) {
  26. try {
  27. connection.close();
  28. } catch (Exception ignore) {
  29. }
  30. }
  31. }
  32. }
  33. public static void main(String args[]) {
  34. init();
  35. }
  36. }

DeclareConsumer.java

    
  1. package org.delayQueue;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.util.ArrayList;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.Map.Entry;
  10. import org.apache.http.HttpResponse;
  11. import org.apache.http.client.ClientProtocolException;
  12. import org.apache.http.client.HttpClient;
  13. import org.apache.http.client.methods.HttpPost;
  14. import org.apache.http.impl.client.DefaultHttpClient;
  15. import com.rabbitmq.client.AMQP;
  16. import com.rabbitmq.client.Channel;
  17. import com.rabbitmq.client.Connection;
  18. import com.rabbitmq.client.ConnectionFactory;
  19. import com.rabbitmq.client.Consumer;
  20. import com.rabbitmq.client.DefaultConsumer;
  21. import com.rabbitmq.client.Envelope;
  22. public class DeclareConsumer {
  23. public static String EXCHANGE_NAME = "notifyExchange";
  24. public static String QU_declare_15S = "Qu_declare_15s";
  25. public static String EX_declare_15S = "EX_declare_15s";
  26. public static String ROUTINGKEY = "AliPaynotify";
  27. public static Connection connection = null;
  28. public static Channel channel = null;
  29. public static Channel DECLARE_15S_CHANNEL = null;
  30. public static String declare_queue = "init";
  31. public static String originalExpiration = "0";
  32. public static void init() throws Exception {
  33. ConnectionFactory factory = new ConnectionFactory();
  34. factory.setHost("localhost");
  35. factory.setPort(5672);
  36. connection = factory.newConnection();
  37. channel = connection.createChannel();
  38. DECLARE_15S_CHANNEL = connection.createChannel();
  39. }
  40. public static void consume() {
  41. try {
  42. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  43. final String queueName = channel.queueDeclare().getQueue();
  44. channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
  45. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  46. final Consumer consumer = new DefaultConsumer(channel) {
  47. @Override
  48. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  49. String message = new String(body, "UTF-8");
  50. Map <String, Object> headers = properties.getHeaders();
  51. if (headers != null) {
  52. List <Map<String, Object>> xDeath = (List <Map<String, Object>>) headers.get("x-death");
  53. System.out.println("xDeath--- > " + xDeath);
  54. if (xDeath != null && !xDeath.isEmpty()) {
  55. Map <String, Object> entrys = xDeath.get(0);
  56. // for(Entry <String, Object>
  57. // entry:entrys.entrySet()){
  58. // System.out.println(entry.getKey()+":"+entry.getValue());
  59. // }
  60. originalExpiration = entrys.get("original-expiration").toString();
  61. }
  62. }
  63. System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());
  64. HttpClient httpClient = new DefaultHttpClient();
  65. HttpPost post = new HttpPost(message);
  66. HttpResponse response = httpClient.execute(post);
  67. BufferedReader inreader = null;
  68. if (response.getStatusLine().getStatusCode() == 200) {
  69. inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
  70. StringBuffer responseBody = new StringBuffer();
  71. String line = null;
  72. while ((line = inreader.readLine()) != null) {
  73. responseBody.append(line);
  74. }
  75. if (!responseBody.equals("success")) {
  76. // putDeclre15s(message);
  77. if (originalExpiration.equals("0")) {
  78. putDeclreQueue(message, 3000, QU_declare_15S);
  79. }
  80. if (originalExpiration.equals("3000")) {
  81. putDeclreQueue(message, 30000, QU_declare_15S);
  82. }
  83. if (originalExpiration.equals("30000")) {
  84. putDeclreQueue(message, 60000, QU_declare_15S);
  85. }
  86. if (originalExpiration.equals("60000")) {
  87. putDeclreQueue(message, 120000, QU_declare_15S);
  88. }
  89. if (originalExpiration.equals("120000")) {
  90. putDeclreQueue(message, 180000, QU_declare_15S);
  91. }
  92. if (originalExpiration.equals("180000")) {
  93. putDeclreQueue(message, 300000, QU_declare_15S);
  94. }
  95. if (originalExpiration.equals("300000")) {
  96. // channel.basicConsume(QU_declare_300S,true, this);
  97. System.out.println("finish notify");
  98. }
  99. }
  100. } else {
  101. System.out.println(response.getStatusLine().getStatusCode());
  102. }
  103. }
  104. };
  105. channel.basicConsume(queueName, true, consumer);
  106. } catch (Exception e) {
  107. e.printStackTrace();
  108. } finally {
  109. }
  110. }
  111. static Map <String, Object> xdeathMap = new HashMap <String, Object>();
  112. static List <Map<String, Object>> xDeath = new ArrayList <Map<String, Object>>();
  113. static Map <String, Object> xdeathParam = new HashMap <String, Object>();
  114. public static void putDeclre15s(String message) throws IOException {
  115. channel.exchangeDeclare(EX_declare_15S, "topic");
  116. Map <String, Object> args = new HashMap <String, Object>();
  117. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
  118. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  119. builder.expiration("3000").deliveryMode(2);// 設定訊息TTL
  120. AMQP.BasicProperties properties = builder.build();
  121. channel.queueDeclare(QU_declare_15S, false, false, false, args);
  122. channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
  123. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
  124. System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
  125. }
  126. public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
  127. channel.exchangeDeclare(EX_declare_15S, "topic");
  128. Map <String, Object> args = new HashMap <String, Object>();
  129. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
  130. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  131. builder.expiration(String.valueOf(mis)).deliveryMode(2);// 設定訊息TTL
  132. AMQP.BasicProperties properties = builder.build();
  133. channel.queueDeclare(queue, false, false, false, args);
  134. channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
  135. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
  136. System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
  137. }
  138. public static void main(String args[]) throws Exception {
  139. init();
  140. consume();
  141. }
  142. }
訊息 通過dlx轉發的情況下,header頭部會帶有x-death的一個數組,裡面包含訊息的各項屬性,比如說訊息成為死信的原因reason,original-expiration這個欄位表示訊息在原來佇列中的過期時間,根據這個值來確定下一次通知的延遲時間應該是多少秒。 執行結果如下: