1. 程式人生 > >基於rabbitMQ 消息延時隊列方案 模擬電商超時未支付訂單處理場景

基於rabbitMQ 消息延時隊列方案 模擬電商超時未支付訂單處理場景

system 方法 只需要 routing new 有效期 conn sta order

前言

傳統處理超時訂單
  • 采取定時任務輪訓數據庫訂單,並且批量處理。其弊端也是顯而易見的;對服務器、數據庫性會有很大的要求,並且當處理大量訂單起來會很力不從心,而且實時性也不是特別好
  • 當然傳統的手法還可以再優化一下,即存入訂單的時候就算出訂單的過期時間插入數據庫,設置定時任務查詢數據庫的時候就只需要查詢過期了的訂單,然後再做其他的業務操作

    jdk延遲隊列 DelayQueue
  • 采取jdk自帶的延遲隊列能很好的優化傳統的處理方案,但是該方案的弊、端也是非常致命的,所有的消息數據都是存於內存之中,一旦宕機或重啟服務隊列中數據就全無了,而且也無法進行擴展。
  • rabbitMQ延時隊列方案
    rabbitmq我就不多介紹了,一臺普通的rabbitmq服務器單隊列容納千萬級別的消息還是沒什麽壓力的,而且rabbitmq集群擴展支持的也是非常好的,並且隊列中的消息是可以進行持久化,即使我們重啟或者宕機也能保證數據不丟失

    術語 (詳情請參照官網文檔:http://www.rabbitmq.com/admin-guide.html)

    存活時間(Time-To-Live 簡稱 TTL),分別有三種TTL的設置模式
  • x-message-ttl ,該屬性是在創建隊列的時候 ,在arguments的map中配置;該參數的作用是設置當前隊列中所有的消息的存活時間
  • x-expires 該屬性也是在arguments中配置;其作用是設置當前隊列在N毫秒中(不能為0,且為正整數),就刪除該隊列;“未使用”意味著隊列沒有消費者,隊列尚未重新聲明,並且至少在有效期內未調用basicGet (basicGet 是手動拉取指定隊列中的一條消息)
  • AMQP.BasicProperties配置中的exppiration 屬性,前兩者都是基於隊列的TTL,該屬性是基於單條消息的TLL用於配置每條消息在隊列中的存活時間

    死信交換(Dead Letter Exchanges 簡稱 DLX)
  • ”死信交換“ 可以分開來理解 ;首先是 ”死信“,也就是死亡的信息,無效的信息;造成這樣的信息有以下幾種情況
    消息被拒絕,即消費者沒有成功確認消息被消費
    消息TTL過期
    超出隊列長度限制
    當出現這三種情況的時候,隊列中的消息就會變為“死信”

  • 再來理解”交換“ 也就是說,當出現"死信"的情況下 rabbitmq 可以對該"死信"進行交換到別的隊列上,但是交換的前提是需要為死信配置一個交換機用於死信的交換

    ?

代碼實現

配置類 RabbitmqConfiguration
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * TODO rabbitmq配置類
 */  
public class RabbitmqConfiguration { 
    private final String SERVER_HOST="127.0.0.1";//rabbitmq 服務器地址
    private final int PORT=5672;//端口號 
    private final String USER_NAME="test";//用戶名
    private final String PASSWORD="test";//密碼
    private final boolean QUEUE_SAVE =true;//隊列是否持久化
    private final String MESSAGE_SAVE = "1" ;//消息持久化  1,0 
    //rabbitmq 連接工廠
    private final ConnectionFactory RAB_FACTORY = new ConnectionFactory();
    private Connection connection;
  
    public void init() throws Exception{
        RAB_FACTORY.setHost(SERVER_HOST);
        RAB_FACTORY.setPort(PORT); 
        RAB_FACTORY.setUsername(USER_NAME);
        RAB_FACTORY.setPassword(PASSWORD);
        this.connection=RAB_FACTORY.newConnection();
    } 
    public Connection getConnection() {
        return connection;
    }
 
    public void setConnection(Connection connection) {
        this.connection = connection;
    }
 
    public boolean isQUEUE_SAVE() {
        return QUEUE_SAVE;
    }
 
    public String getMESSAGE_SAVE() {
        return MESSAGE_SAVE;
    }
        
}
功能類 OrderOverTimeQueue
import java.io.IOException; 
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * TODO 超時未支付訂單處理消息隊列
 */
public class OrderOverTimeQueue {
    
    private RabbitmqConfiguration rabConf;
      
    //隊列名稱
    //****==================訂單延時隊列=======================*****//
    //訂單延時隊列
    public final String DELAY_QUEUE_NAME = "delay-queue-orderOverTime";
    //訂單延時消費隊列
    public final String CONSUME_QUEUE_NAME = "consume-queue-orderOverTime";
    //訂單延時隊列死信交換的交換器名稱
    public final String EXCHANGENAME = "exchange-orderOverTime";
    //訂單延時隊列死信的交換器路由key
    public final String ROUTINGKEY = "routingKey-orderOverTime";
    
    private Channel delayChannel;//延時隊列連接通道
    
    private Channel consumerChannel;//消費隊列連接通道
    
    public void init() throws Exception{
        //創建連接通道
        delayChannel=rabConf.getConnection().createChannel();  
        consumerChannel=rabConf.getConnection().createChannel();  
        
        //創建交換器
        consumerChannel.exchangeDeclare(EXCHANGENAME,"direct"); 
        
        /**創建處理延時消息的延時隊列*/
        Map <String,Object> arg = new HashMap <String,Object>();
        //配置死信交換器
        arg.put("x-dead-letter-exchange",EXCHANGENAME); //交換器名稱
        //死信交換路由key (交換器可以將死信交換到很多個其他的消費隊列,可以用不同的路由key 來將死信路由到不同的消費隊列去)
        arg.put("x-dead-letter-routing-key", ROUTINGKEY); 
        delayChannel.queueDeclare(DELAY_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, arg);  
        
        /**創建消費隊列*/
        consumerChannel.queueDeclare(CONSUME_QUEUE_NAME, rabConf.isQUEUE_SAVE(), false, false, null);
        //參數1:綁定的隊列名  參數2:綁定至哪個交換器  參數3:綁定路由key
        consumerChannel.queueBind(CONSUME_QUEUE_NAME, EXCHANGENAME,ROUTINGKEY); 
        //最多接受條數 0為無限制,每次消費消息數(根據實際場景設置),true=作用於整channel,false=作用於具體的消費者
        consumerChannel.basicQos(0,10, false);
        
        //創建消費隊列的消費者
        Consumer consumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) 
              throws IOException  {
                String message = new String(body, "UTF-8");
                try {
                    //業務邏輯處理
                    ConsumeMessage(message);  
                    //確認消息已經消費  參數2(true=設置後續消息為自動確認消費  false=為手動確認)
                    consumerChannel.basicAck(envelope.getDeliveryTag(), false);
                }catch (Exception e) { 
                }  
              }
            }; 
                
        boolean flag=false;//是否手動確認消息  true 是  false否 
        consumerChannel.basicConsume(CONSUME_QUEUE_NAME, flag, consumer);
    }
 
    /**
     * 方法描述: 發送延遲訂單處理消息
     * @param msg 消息內容 (訂單號或者json格式字符串)
     * @param overTime 消息存活時間
     * @throws Exception
     */
    public void sendMessage(String msg,Long overTime) throws Exception{ 
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration(overTime.toString()) //設置消息存活時間(毫秒)
                .build();
        delayChannel.basicPublish("",DELAY_QUEUE_NAME, properties, msg.getBytes("UTF-8"));
    }
    
    /**
     * 
     * 方法描述: 
     * 業務邏輯說明: TODO(總結性的歸納方法業務邏輯) 
     * @param msg 消費消息(訂單號,或特定格式json字符串)
     * @throws InterruptedException
     */
    public void ConsumeMessage(String msg) throws InterruptedException { 
        Thread.sleep(50);//模擬業務邏輯處理
        System.out.println("處理到期消息時間=="+System.currentTimeMillis());
        System.err.println("刪除訂單 order-number  ==  "+msg);
    }
    
    
    public RabbitmqConfiguration getRabConf() {
        return rabConf;
    }
 
 
    public void setRabConf(RabbitmqConfiguration rabConf) {
        this.rabConf = rabConf;
    }
    
    public static void main(String[] args) throws Exception {
        OrderOverTimeQueue ooto=new OrderOverTimeQueue();
        RabbitmqConfiguration rf= new RabbitmqConfiguration();
        rf.init();
        ooto.setRabConf(rf);
        ooto.init();
 
        //模擬用戶產生訂單 消息生存時長為30秒
        ooto.sendMessage("20180907-order-number", 10000l);
        
        System.out.println("創建消息時間=="+System.currentTimeMillis());
        
        
        
    }
    
    
}
最終效果

技術分享圖片

如果消息還存活的話,在延遲隊列中的“ready”和“total”中都會存在相應的消息記錄數

技術分享圖片

寫的比較粗糙 歡迎大家發表自己的觀點 >_< !

基於rabbitMQ 消息延時隊列方案 模擬電商超時未支付訂單處理場景