1. 程式人生 > >php rabbitmq延遲佇列示例

php rabbitmq延遲佇列示例

<?php
/**
 * Created by PhpStorm.
 * User: he
 * Date: 17-7-17
 * Time: 下午5:38
 */

namespace AcmeBundle\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Thin wrapper around a php-amqplib connection/channel that declares a
 * queue + exchange pair, optionally configured as a "delay queue":
 * messages get a per-queue TTL and are dead-lettered to a holding
 * exchange/queue once that TTL expires.
 *
 * NOTE: the scene/delay settings are kept in static properties, so they are
 * shared by every instance of this class within the same process.
 */
class RabbitBase
{
    /**
     * Scene key of the test queue; used as index into the scene maps below.
     */
    const TEST = 'TEST';

    /**
     * Scene => name of the queue that receives dead-lettered messages.
     * @var array
     */
    private static $scene_out_queue = [
        self::TEST => 'test_queue', // test queue
    ];

    /**
     * Scene => name of the exchange that receives dead-lettered messages.
     * @var array
     */
    private static $scene_out_exchange = [
        self::TEST => 'test.exchange', // test queue
    ];

    /**
     * Scene => queue delay (message TTL) in milliseconds.
     * @var array
     */
    private static $ttl_time = [
        self::TEST => 10000, // 86400*3
    ];

    /**
     * List of known scenes.
     * @var array
     */
    private static $scene_list = [
        'TEST' => 'TEST', // test queue
    ];

    /**
     * AMQP connection.
     * @var AMQPStreamConnection
     */
    private $connection;

    /**
     * AMQP channel opened on the connection.
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    private $channel;

    /**
     * Name of the working queue.
     * @var string
     */
    private $queue_name;

    /**
     * Name of the working exchange.
     * @var string
     */
    private $exchange_name;

    /**
     * Scene key used to look up the TTL in self::$ttl_time.
     * @var string
     */
    private static $time_scene;

    /**
     * Whether queues, exchanges and messages are declared durable/persistent.
     * @var bool
     */
    private static $is_durable = true;

    /**
     * Whether the working queue is a delay queue.
     * @var bool
     */
    private static $is_delay = false;

    /**
     * Scene key used to look up the dead-letter exchange.
     * @var string
     */
    private static $delay_exchange;

    /**
     * Scene key used to look up the dead-letter queue.
     * @var string
     */
    private static $delay_queue;

    /**
     * Arguments passed to queue_declare for the working queue.
     * @var array
     */
    private static $arguments = [];

    /**
     * Exchange type.
     * @var string
     */
    private static $type = 'fanout';

    /**
     * 0-9-1 SIG: field-table type tag for a short string.
     * @link http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3
     * @var string
     */
    private static $T_STRING_SHORT = 'S';

    /**
     * 0-9-1 SIG: field-table type tag for a long integer.
     * @link http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3
     * @var string
     */
    private static $T_INT_LONG = 'I';

    /**
     * RabbitBase constructor.
     *
     * @param array $config MQ settings; reads the keys host, port, user, pwd and vhost
     * @throws \RuntimeException when the connection is not open after construction
     */
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? '',
            $config['port'] ?? '',
            $config['user'] ?? '',
            $config['pwd'] ?? '',
            $config['vhost'] ?? ''
        );

        if (!$this->getCloseStatus()) {
            // Raise instead of echo + exit so the caller decides how to react.
            throw new \RuntimeException('ERROR: AMQP Connection Fail');
        }

        $this->channel = $this->connection->channel();
    }

    /**
     * Set a property by name.
     *
     * @param string $name  property name
     * @param mixed  $value new value
     */
    public function __set($name, $value)
    {
        $this->$name = $value;
    }

    /**
     * Read a property by name.
     *
     * @param string $name property name
     * @return mixed|null the property value, or null when it is not set
     */
    public function __get($name)
    {
        return isset($this->$name) ? $this->$name : null;
    }

    /**
     * Get the list of known scenes.
     *
     * @return array
     */
    public function getSceneList()
    {
        return self::$scene_list;
    }

    /***************************************************** queue service ******************************************************/

    /**
     * Select the working queue/exchange and prepare the delay arguments.
     *
     * When $is_delay is true a valid $time_scene is required; $queue_scene and
     * $exchange_scene fall back to $time_scene when empty. When $is_receive is
     * true the exchange and queue are declared and bound immediately, so a
     * consumer can start before any producer has created them.
     *
     * @param string      $queue_name     queue name
     * @param string      $exchange_name  exchange name
     * @param bool        $is_receive     whether the caller is a consumer
     * @param bool        $is_delay       whether the queue is a delay queue
     * @param string|null $time_scene     delay scene, see self::$ttl_time
     * @param string|null $queue_scene    dead-letter queue scene, see self::$scene_out_queue
     * @param string|null $exchange_scene dead-letter exchange scene, see self::$scene_out_exchange
     * @return bool|string true on success, otherwise a string describing the caught exception
     */
    public function open(
        string $queue_name,
        string $exchange_name,
        bool $is_receive = false,
        bool $is_delay = false,
        string $time_scene = null,
        string $queue_scene = null,
        string $exchange_scene = null
    ) {
        $this->queue_name = $queue_name;
        $this->exchange_name = $exchange_name;
        self::$time_scene = $time_scene;
        self::$delay_exchange = empty($exchange_scene) ? $time_scene : $exchange_scene;
        self::$delay_queue = empty($queue_scene) ? $time_scene : $queue_scene;
        self::$is_delay = $is_delay;

        try {
            // Build the TTL / dead-letter arguments for the working queue.
            self::getArguments();

            if (true === $is_receive) {
                $this->getExchangeDeclare($this->exchange_name, self::$type, self::$is_durable);
                $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);
                $this->getQueueBind($this->queue_name, $this->exchange_name);
            }

            return true;
        } catch (\Exception $e) {
            return 'Info:'.$e->getMessage().' Line:'.$e->getLine().' File:'.$e->getFile();
        }
    }

    /**
     * Publish a message, declaring everything it needs first, then close.
     *
     * @param array $data message payload
     * @return bool|string result of close() on success, otherwise a string describing the caught exception
     */
    public function send(array $data)
    {
        try {
            if (empty($this->queue_name) || empty($this->exchange_name)) {
                throw new \Exception('arguments queue name or exchange name error');
            }

            // Create the dead-letter holding queue/exchange. The same scene keys
            // are used as for the x-dead-letter-* arguments of the working queue,
            // so expired messages land in a queue that actually exists.
            if (!empty(self::$time_scene)) {
                $this->createOutQueue(
                    self::$scene_out_queue[self::$delay_queue] ?? '',
                    self::$scene_out_exchange[self::$delay_exchange] ?? ''
                );
            }

            $this->getExchangeDeclare($this->exchange_name, self::$type, self::$is_durable);
            $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);
            $this->getQueueBind($this->queue_name, $this->exchange_name);

            // Publish to the exchange that was just declared and bound
            // (the second parameter of getBasicPublish() is the exchange).
            $this->getBasicPublish($data, $this->exchange_name);

            return $this->close();
        } catch (\Exception $e) {
            return 'Info:'.$e->getMessage().' Line:'.$e->getLine().' File:'.$e->getFile();
        }
    }

    /**
     * Register a consumer; parameters are forwarded to basic_consume.
     *
     * @param string        $queue
     * @param callable|null $callback
     * @param string        $consumer_tag
     * @param bool          $no_local
     * @param bool          $no_ack
     * @param bool          $exclusive
     * @param bool          $nowait
     * @param null          $ticket
     * @param array         $arguments
     */
    public function receive(
        $queue = '',
        $callback = null,
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $ticket = null,
        $arguments = []
    ) {
        $this->channel->basic_consume(
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $callback,
            $ticket,
            $arguments
        );
    }

    /**
     * Wait for some expected AMQP methods and dispatch to them.
     * Unexpected methods are queued up for later calls to this PHP
     * method. Loops for as long as the channel has registered callbacks.
     */
    public function wait()
    {
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /***************************************************** private service ******************************************************/

    /**
     * Close the channel and the connection.
     *
     * @return bool true when the connection reports it is no longer connected
     */
    private function close()
    {
        // The channel was opened on the connection, so it is closed first.
        $this->closeChannel();
        $this->closeConnection();

        return !$this->getCloseStatus();
    }

    /**
     * Create and bind the dead-letter holding queue and exchange.
     * Nothing is declared unless delay mode is enabled.
     *
     * @param string $queue_name
     * @param string $exchange_name
     * @throws \Exception when either name is empty
     */
    private function createOutQueue(string $queue_name = '', string $exchange_name = '')
    {
        if (empty($queue_name) || empty($exchange_name)) {
            throw new \Exception('queue name or exchange name is empty');
        }

        if (true === self::$is_delay) {
            $this->getExchangeDeclare($exchange_name, self::$type, self::$is_durable);
            $this->getQueueDeclare($queue_name, self::$is_durable);
            $this->getQueueBind($queue_name, $exchange_name);
        }
    }

    /**
     * Build the queue arguments (TTL plus dead-letter target) for delay mode.
     *
     * @throws \Exception when delay mode is on but a scene is missing or unknown
     */
    private static function getArguments()
    {
        // $arguments is static: drop whatever a previous open() left behind so a
        // non-delayed queue is never declared with stale TTL/DLX arguments.
        self::$arguments = [];

        if (true !== self::$is_delay) {
            return;
        }

        if (empty(self::$delay_queue) || empty(self::$delay_exchange)) {
            throw new \Exception(__METHOD__."delay arguments error");
        }

        if (empty(self::$time_scene) || !isset(self::$ttl_time[self::$time_scene])) {
            throw new \Exception(__METHOD__.'delay arguments error');
        }

        self::$arguments = [
            'x-message-ttl' => [self::$T_INT_LONG, self::$ttl_time[self::$time_scene]],
        ];
        self::setDelayExchange();
        self::setDelayQueue();
    }

    /**
     * Add the dead-letter routing key to the queue arguments.
     *
     * @throws \Exception when delay mode is off or the queue scene is missing/unknown
     */
    private static function setDelayQueue()
    {
        if (
            true !== self::$is_delay
            || empty(self::$delay_queue)
            || !isset(self::$scene_out_queue[self::$delay_queue])
        ) {
            throw new \Exception(__METHOD__.'delay arguments error');
        }

        // Always merge: an empty argument list must become the new entry,
        // not the boolean result of empty().
        self::$arguments = array_merge(self::$arguments, [
            'x-dead-letter-routing-key' => [
                self::$T_STRING_SHORT,
                self::$scene_out_queue[self::$delay_queue],
            ],
        ]);
    }

    /**
     * Add the dead-letter exchange to the queue arguments.
     *
     * @throws \Exception when delay mode is off or the exchange scene is missing/unknown
     */
    private static function setDelayExchange()
    {
        if (
            true !== self::$is_delay
            || empty(self::$delay_exchange)
            || !isset(self::$scene_out_exchange[self::$delay_exchange])
        ) {
            throw new \Exception(__METHOD__.'delay arguments error');
        }

        // Always merge: an empty argument list must become the new entry,
        // not the boolean result of empty().
        self::$arguments = array_merge(self::$arguments, [
            'x-dead-letter-exchange' => [
                self::$T_STRING_SHORT,
                self::$scene_out_exchange[self::$delay_exchange],
            ],
        ]);
    }

    /**
     * Close the connection.
     *
     * @return mixed|null
     */
    private function closeConnection()
    {
        return $this->connection->close();
    }

    /**
     * Close the channel.
     *
     * @return mixed
     */
    private function closeChannel()
    {
        return $this->channel->close();
    }

    /**
     * Get the AMQP connection.
     *
     * @return AMQPStreamConnection
     */
    public function getConnect() : AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * Get the AMQP channel.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Report the connection state.
     *
     * @return bool true while the connection is connected (despite the method name)
     */
    private function getCloseStatus(): bool
    {
        return (bool)$this->connection->isConnected();
    }

    /**
     * Declare a queue, creating it when it does not exist.
     * Note the parameter order differs from queue_declare.
     *
     * @param string     $queue
     * @param bool       $durable
     * @param array|null $arguments
     * @param bool       $passive
     * @param bool       $exclusive
     * @param bool       $auto_delete
     * @param bool       $nowait
     * @param null       $ticket
     * @return mixed|null
     */
    private function getQueueDeclare(
        $queue = '',
        $durable = true,
        $arguments = null,
        $passive = false,
        $exclusive = false,
        $auto_delete = false,
        $nowait = false,
        $ticket = null
    ) {
        return $this->channel->queue_declare(
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Declare an exchange.
     * Note the parameter order differs from exchange_declare.
     *
     * @param string     $exchange
     * @param string     $type
     * @param bool       $durable
     * @param bool       $passive
     * @param bool       $auto_delete
     * @param bool       $internal
     * @param bool       $nowait
     * @param array|null $arguments
     * @param null       $ticket
     * @return mixed|null
     */
    private function getExchangeDeclare(
        $exchange,
        $type,
        $durable = true,
        $passive = false,
        $auto_delete = false,
        $internal = false,
        $nowait = false,
        $arguments = null,
        $ticket = null
    ) {
        return $this->channel->exchange_declare(
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Wrap the payload in an AMQPMessage and publish it.
     *
     * @param mixed  $msg         payload; arrays are tagged with the queue name and serialized
     * @param string $exchange
     * @param string $routing_key
     * @param bool   $mandatory
     * @param bool   $immediate
     * @param null   $ticket
     * @return mixed whatever basic_publish returns
     */
    private function getBasicPublish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        self::message($msg, $this->queue_name);

        return $this->channel->basic_publish($msg, $exchange, $routing_key, $mandatory, $immediate, $ticket);
    }

    /**
     * Convert the payload, in place, into an AMQPMessage.
     *
     * @param mixed  $msg   payload, replaced by the AMQPMessage (passed by reference)
     * @param string $queue queue name stored under the 'queue' key of array payloads
     */
    private static function message(&$msg, string $queue)
    {
        if (is_array($msg)) {
            $msg['queue'] = $queue;
            $msg = serialize($msg);
        }

        if (self::$is_durable) {
            $msg = new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        } else {
            $msg = new AMQPMessage($msg);
        }
    }

    /**
     * Bind a queue to an exchange.
     *
     * @param string     $queue
     * @param string     $exchange
     * @param string     $routing_key
     * @param bool       $nowait
     * @param array|null $arguments
     * @param null       $ticket
     * @return mixed|null
     */
    private function getQueueBind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = null,
        $ticket = null
    ) {
        return $this->channel->queue_bind(
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Get the number of messages in the working queue.
     *
     * @deprecated
     * @return int
     */
    public function getMessageNumber(): int
    {
        // queue_declare answers with [queue name, message count, consumer count].
        // The queue is redeclared with the same arguments it was created with.
        $declared = $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);

        return (int)($declared[1] ?? 0);
    }

    /**
     * Get the number of consumers on the working queue.
     *
     * @deprecated
     * @return int
     */
    public function getConsumerNumber(): int
    {
        // queue_declare answers with [queue name, message count, consumer count].
        // The queue is redeclared with the same arguments it was created with.
        $declared = $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);

        return (int)($declared[2] ?? 0);
    }
}