幾種實現延時任務的方式(三)
上篇文章介紹了使用Redis來實現延時任務,這是一個比較好的方案,但是這種方式是把Redis作為訊息佇列去使用,而Redis作為訊息佇列還是有一些缺點的:
- Redis本身沒有提供監控、管理介面,需要自己去實現。我們無法方便的知道現在佇列的情況,比如是否有積壓,消費情況是如何的,生產情況又是如何的。
- 訊息可能被重複消費,如果是冪等性操作也沒什麼,但是如果非冪等性操作,就需要其他的解決方案來解決這個問題。
- Redis本身沒有ACK機制,訊息沒有那麼可靠,當然這個缺點在這個案例中,並不是那麼明顯,因為我們可以在該執行的都執行成功了,才去刪除資料。
...
當然最根本的問題是Redis本身就不是為了佇列而生的,它是為了儲存而生的,所以它缺少一些佇列才有的功能也是“情理之中”的。不過,Redis5引進了Stream,據說 這也是一個功能很強大的佇列,但是我還沒去看。這裡就不說了。
在本節中,我將用RabbitMQ來實現延時任務。
關於RabbitMQ的安裝,我就不做介紹了,網上都有,而且沒有什麼難度。
在使用方面,RabbitMQ比Redis難很多,畢竟使用的比較少,而且不少公司都對MQ進行了封裝,使其更好用,但是同時也隱藏了MQ在使用方面的不少細節。
從基本沒有接觸過RabbitMQ,到要使用RabbitMQ來完成延時任務,也是一個"跳躍性"的任務。我們應該先了解RabbitMQ一些基礎概念,基本使用 等等。僅僅靠一兩句話是遠遠不夠的。本文的主題在於“使用RabbitMQ來完成延時任務”。所以在這裡我預設大家都有一定的RabbitMQ使用經驗了。
好了,讓我們開始吧。
首先,讓我們引進兩個名詞:
- TTL、死信:
Time To Live,這個名詞也說不上是一個新名詞,Redis中也有,就是 存活時間,也就是我們經常說的過期時間了,放在MQ裡面,特指 訊息的存活時間。訊息超過了存活時間,就認為這個訊息“死”了,稱之為“死信”。 - Dead Letter Exchange
死信交換器。建立死信交換器和建立其他交換器沒什麼區別,只是我們需要告訴佇列,死信需要被推送到死信交換器上。
對於生產者來說,需要建立一個Connection連線,接著在Connection中建立一個Channel,通過Channel申明兩個交換器,一個是 用來接收訂單資料的交換器,一個是用來接收超時訂單資料的交換機,然後申明兩個佇列,一個是訂單資料佇列,並且需要告訴這個佇列,如果有訊息超時了,需要轉發到 “用來接收超時訂單資料的交換機”,還要申明一個超時訂單資料佇列。然後把 “用來接收訂單資料的交換器”和“訂單資料佇列”進行繫結,把“用來接收超時訂單資料的交換機”和“超時訂單資料佇列”進行繫結。前置準備工作才算完成,下面就是通過Channel往 “用來接收訂單資料的交換器”推資料了。
為了幫助大家更好的理解,我簡單的畫了一張圖:

希望大家看了文字之後,再對照圖片,可以有所理解。
對於生產者來說,就比較簡單了,前置工作就是建立Connection連線,再建立Channel,然後通過Channel,消費 “超時訂單資料佇列” 就OK了。
下面我直接放出程式碼:
需要在pom中引入依賴:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> </dependency>
public class Main { static ConnectionFactory connectionFactory; static Connection connection; static { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try { connection = connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { producer(); Thread thread = new Thread(() -> { try { consume(); } catch (Exception e) { e.printStackTrace(); } }); thread.start(); } private static void producer() throws Exception { Channel channel = connection.createChannel();//建立一個channel,不管是生產資料,還是消費資料,都是通過channel去操作的 channel.exchangeDeclare("orderExchange", "direct", true);//定義一個交換機,路由型別為direct,所有的訂單會塞給此交換機 channel.exchangeDeclare("orderDelayExchange", "direct", true);//定義一個交換機,路由型別為direct,延遲的訂單會塞給此交換機 HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "orderDelayExchange");//申明死信交換機是名稱為orderDelayExchange的交換機 channel.queueDeclare("order_queue", true, false, false, arguments);//定義一個名稱為order_queue的佇列,繫結上面定義的引數,這樣就告訴rabbit此佇列延遲的訊息,傳送給orderDelayExchange交換機 channel.queueDeclare("order_delay_queue", true, false, false, null);//定義一個名稱為order_delay_queue的佇列 channel.queueBind("order_queue", "orderExchange", "delay");//order_queue和orderExchange繫結,路由為delay。路由也為delay的訊息會通過orderExchange進入到order_queue佇列 channel.queueBind("order_delay_queue", "orderDelayExchange", "delay");//order_delay_queue和orderDelayExchange繫結 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("15000");//設定訊息TTL(訊息生存時間) builder.deliveryMode(2);//設定訊息持久化 AMQP.BasicProperties properties = builder.build(); Thread productThread = new Thread(() -> { for (int i = 0; i < 20; i++) { String order = "order" + i; try { channel.basicPublish("orderExchange", "delay", properties, order.getBytes());//通過channel,向orderExchange交換機發送路由為delay的訊息,這樣就可以進入到order_queue佇列 String str = "現在時間是" + new Date().toString() + "" + order + "的訊息產生了"; System.out.println(str); } catch (IOException e) { e.printStackTrace(); } } try { channel.close(); } catch (Exception ex) { } }); productThread.start(); } private static void consume() throws Exception { Channel channel = connection.createChannel();//建立一個channel,不管是生產資料,還是消費資料,都是通過channel去操作的 //消費名稱為order_delay_queue的佇列,且關閉自動應答,需要手動應答 channel.basicConsume("order_delay_queue", false, new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag();//訊息的標記,應答的時候需要傳入這個引數 String str = "現在時間是" + new Date().toString() + "" + new String(body) + "的訊息消費了"; System.out.println(str); channel.basicAck(deliveryTag, false);//手動應答,代表這個訊息處理完成了 } }); } }
下面我們執行一下:

程式碼註釋寫的還是比較清晰的,希望大家可以看懂吧。
這一節,我沒有像上兩節一樣,講的那麼細,因為如果從RabbitMQ的基礎講起,可能需要三四章的內容來做鋪墊,這就脫離主題了。如果有機會的話,我會再花一個系列去介紹RabbitMQ。
好了,實現延時任務系列到這裡就結束了,當然我這裡只是拋磚引玉,大家肯定還有不少更好的實現方式。