1. 程式人生 > >RabbitMQ使用延遲佇列實現一次性定時任務(php版)

RabbitMQ使用延遲佇列實現一次性定時任務(php版)

本文建立在讀者對RabbitMQ的基礎瞭解上

本文延遲佇列實現參照 https://blog.csdn.net/u012119576/article/details/74677835

對相關概念的理解參照 https://blog.csdn.net/samxx8/article/details/47417133

作為phper在實現諸如“課程開啟後十分鐘推送訊息”,"訂單生成後多少分鐘自動取消"這類問題上會有一些問題,目前我能想到的三種解決方案有:

               1. swoole有settimeout

               2.exec設定linux系統一次性定時器  at命令(atd包) ,windows伺服器上好像直接可以設定,但是最低間隔是一分鐘還是30s,具體忘了。。。

               3.crontab設定定時器檢查(不建議。。。)

最近在弄RabbitMQ時,發現可以使用延遲佇列實現這類需求。具體原理是新建兩條佇列繫結對應的交換機,其中一條設定訊息延遲執行,在到期後使用交換機丟到與客戶端連線的佇列中,傳送給客戶端,具體參見程式碼。

send.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
//給cache傳送  使其過期然後定向到另一個
//宣告兩個佇列
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_exchange', 'direct',false,false,false);

$tale = new AMQPTable();
$tale->set('x-dead-letter-exchange', 'delay_exchange');//****很關鍵  表示過期後由哪個exchange處理
$tale->set('x-dead-letter-routing-key','delay_exchange');//****很關鍵  表示過期後由哪個exchange處理
//$tale->set('x-message-ttl',15000);  //存活時長   下面的過期時間不能超過

$channel->queue_declare('cache_queue',false,true,false,false,false,$tale);
$channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');

$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');


$msg = new AMQPMessage('Hello World'.'3000',array(
    'expiration' => intval(18000),
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

));

$channel->basic_publish($msg,'cache_exchange','cache_exchange');
echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;

$channel->close();
$connection->close();

reciever.php

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
$channel->exchange_declare('cache_exchange', 'direct',false,false,false);


$channel->queue_declare('delay_queue',false,true,false,false,false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');

echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;

$callback = function ($msg){
    echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;

    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

};

//只有consumer已經處理並確認了上一條message時queue才分派新的message給它
$channel->basic_qos(null, 1, null);
$channel->basic_consume('delay_queue','',false,false,false,false,$callback);


while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();