1. 程式人生 > >快速搭建基於beanstalk的php訊息佇列服務

快速搭建基於beanstalk的php訊息佇列服務

本專案實現基於beanstalk的php訊息佇列服務,包括生產與消費訊息案例

<?php


/**
 * 訊息生產/接收類
 * @example
 *  // 生產單條訊息,goods管道組
 *  $mq = new MessageQueue();
 *  $mq->product('goods', 111111);
 *  // 生產多條訊息
 *  $mq->product('goods', array(111111, 111112));
 *
 *  // 訊息佇列監聽處理指令碼,goods管道組,solr管道
 *  <?php
 *      $mq = new MessageQueue('solr');
 *      $mq->watch('goods', function($message) {
 *          $goods_id = intval($message);
 *          // 以下為具體業務處理邏輯
 *          // 
 *          // ...
 *          // 返回true表示已處理完畢,伺服器將刪除該條訊息
 *          return true;
 *      });
 */

require_once dirname(__DIR__).'/BeanstalkClient.php';

class MessageQueue {

    // 訂閱者ID
    private $_clientID = null;

    // 訂閱者清單
    private $_subscribers = array();

    // beanstalkd連線配置資訊
    private $_beanstalkdConfig = array();

    /**
     * beanstalk client
     * @var BeanstalkClient
     */
    private $_beanstalk = null;

    /**
     * 初始化訊息客戶端
     * @param string $clientID 分配給訊息接受端的ID標識
     */
    public function __construct($clientID = null) {
        $this->_clientID = $clientID;
        $this->_setConfig();
    }

    /**
     * 生產訊息, 對管道內的所有事件推送訊息
     * @param string $queue 佇列名 -- 管道組
     * @param [string|array] $messages 訊息內容,多條使用陣列
     */
    public function product($queue, $messages) {
        try {
            if(!isset($this->_subscribers[$queue])) {
                throw new Exception('queue of "'.$queue.'" havn\'t configured, '
                    .'go '.__DIR__.'/../config/params.php and configure it');
            }
            $beanstalk = $this->getBeanstalkClient();
            if(!is_array($messages)) {
                $messages = array($messages);
            }
            foreach($this->_subscribers[$queue] as $clientID) {
                $beanstalk->useTube($queue.'.'.$clientID);
                foreach($messages as $message) {
                	if(strlen($message)){
                		$beanstalk->put(11, 0, 60, $message);
                		$this->_log('product', $queue, $clientID, $message);
                	}
                }
            }
        } catch (Exception $e) {
            throw new Exception($e->getMessage());
        }
        return true;
    }

    /**
     * 根據佇列名和事件名投遞訊息, 只對指定管道和事件推送訊息
     * @param  [string] $queue   [佇列名]
     * @param  [string] $event   [事件名]
     * @param  [string] $message [訊息內容]
     * @param  [int] $delay [延時時間]
     * @return [void]
     */
    public function product_conf($queue, $event, $message, $delay = 0) {
        $beanstalk = $this->getBeanstalkClient();
        $beanstalk->useTube("{$queue}.{$event}");
        $beanstalk->put(11, $delay, 60, $message);
    }

    /**
     * 監聽佇列並處理訊息
     * @param string $queue 訂閱的佇列名
     * @param function $callback 回撥方法(訊息處理函式,會將訊息內容作為引數給$callback)
     */
    public function watch($queue, $callback) {
        try {

            if(!$this->_checkQueueExist($queue, $this->_clientID)) {
            	$this->_log('checkQueue', $queue, $this->_clientID, '');
                throw new Exception($this->_clientID.' is not allow to access this queue');
            }
            if(!is_object($callback)) {
            	$this->_log('isObject', $queue, $this->_clientID,'');
                throw new Exception('param of callback is not a function');
            }
            $this->_beanstalkdConfig['persistent'] = false;
            $beanstalk = new BeanstalkClient($this->_beanstalkdConfig);
            $beanstalk->connect();
            $beanstalk->watch($queue.'.'.$this->_clientID);
            $retry = 0;
            for(;;) {
                $job = $beanstalk->reserve();
                if($job) {
                    $result = $callback($job['body']);
                    //處理任務
                    if(true === $result) {
                        $beanstalk->delete($job['id']);
                        $this->_log('consume', $queue, $this->_clientID, $job['body']);
                    }else{
                        $beanstalk->bury($job['id'],'');
                        $this->_log('bury', $queue, $this->_clientID, $job['body']);
                    }
                } else {
                    $this->_log('error', $queue, $this->_clientID, $job['body']);
                    // 設定 error_reporting(0) 時watcher指令碼會陷入死迴圈,這裡設定重連
                    if ($retry++ >= 10) {
                      $retry = 0;
                      $this->_log('error', $queue, $this->_clientID, 'try to reconnect.');
                        sleep(5); // 等待beanstalkd服務恢復
                        $beanstalk->connect();
                        $beanstalk->watch($queue.'.'.$this->_clientID);
                    }
                }
            }
            $beanstalk->disconnect();
        } catch (Exception $e) {
        	$this->_log('error', $queue, $this->_clientID,'');
            throw new Exception($e->getMessage());
        }
    }


    /**
     * 初始化配置資訊
     */
    private function _setConfig() {
        $config = require dirname(__DIR__).'/config/params.php';
        $this->_subscribers = $config['subscribers'];
        $this->_beanstalkdConfig = $config['beanstalkd'];
    }

    /**
     * 檢查當前客戶端監聽的佇列是否存在
     * @param string $queue 佇列名
     * @param string $clientID 客戶端ID
     * @return boolean
     */
    private function _checkQueueExist($queue, $clientID) {
        return isset($this->_subscribers[$queue]) && in_array($clientID, $this->_subscribers[$queue]);
    }

    /**
     * 獲取beanstalk client
     * @param  array  $config 連線配置
     * @return BeanstalkClient
     */
    private function getBeanstalkClient()
    {
        if (is_null($this->_beanstalk)) {
            $this->_beanstalk = new BeanstalkClient($this->_beanstalkdConfig);
            $this->_beanstalk->connect();
        }
        try {
            // 檢查連線
            $this->_beanstalk->stats();
        } catch (Exception $e) {
            // 若出錯則重連
            $this->_beanstalk->connect();
        }
        return $this->_beanstalk;
    }

    /**
     * 記錄日誌
     * @param string $operation 操作型別
     * @param string $queue 佇列名 -- 管道組
     * @param string $clientID 客戶端ID -- 管道
     * @param string $message 訊息體
     */
    public function _log($operation, $queue, $clientID, $message, $folder = 'mqlog') {
    	$dir = MQ_LOG_PATH;
        (file_exists($dir) && is_dir($dir)) || mkdir($dir, 0777, true);
        $file = $dir.'/'.$queue.'.'.$clientID.'.log';
        $mode = (is_file($file) && filesize($file)/1024/1024 < 20) ? "ab+" : "wb"; // 日誌大於20M則清空, 微分銷系統穩定之前先手動清log
        $fp = fopen($file , $mode);
        if(flock($fp , LOCK_EX)){
            fwrite($fp , '['.date('Y-m-d H:i:s').'] '.$operation.': '.$message.PHP_EOL);
            flock($fp , LOCK_UN);
            @chmod($file, 0777);
        }
        fclose($fp);
    }

    /**
     * destruct
     * disconnect
     */
    public function __destruct()
    {
        if (!is_null($this->_beanstalk)) {
            $this->_beanstalk->disconnect();
        }
    }
}



相關推薦

Java語言快速實現簡單MQ訊息佇列服務

目錄 MQ基礎回顧 主要角色 自定義協議 流程順序 專案構建流程 具體使用流程 程式碼演示 訊息處理中心 Broker 訊息處理中心服務 BrokerServer 客戶端 MqClient 測試MQ 小結

快速搭建基於beanstalk的php訊息佇列服務

本專案實現基於beanstalk的php訊息佇列服務,包括生產與消費訊息案例<?php /** * 訊息生產/接收類 * @example * // 生產單條訊息,goods管道組 * $mq = new MessageQueue(); * $mq-

滴滴出行基於RocketMQ構建企業級訊息佇列服務的實踐

本文整理自滴滴出行訊息佇列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。通過本文,您將瞭解到滴滴出行: 在訊息佇列技術選型方面的思考; 為什麼選擇 RocketMQ 作為出行業務的訊息佇列解決方案; 如何構建自己的訊息佇列服務; 在 RocketMQ

HTTPSQS:基於 HTTP協議的輕量級開源簡單訊息佇列服務

HTTPSQS(HTTP Simple Queue Service)是一款基於 HTTP GET/POST 協議的輕量級開源簡單訊息佇列服務,使用 Tokyo Cabinet 的 B+Tree Key/Value 資料庫來做資料的持久化儲存。   專案網址:http://code.google.co

(四)RabbitMQ訊息佇列-服務詳細配置與日常監控管理

RabbitMQ服務管理 啟動服務:rabbitmq-server -detached【 /usr/local/rabbitmq/sbin/rabbitmq-server -detached 】 檢視狀態:rabbitmqctl status 關閉服務:rabbitmqctl stop

在阿里雲伺服器上搭建基於nginx的直播服務

對於沒有接觸過nginx的我,在看了別人搭建的直播服務後心癢癢了,也就照著搭建了一個直播服務,我是在阿里雲伺服器上搭建的,首先來說一下阿里雲伺服器,我買的是一個ECS的雲伺服器,系統是CentOS7 然後用Xshell連線上我的伺服器,發現連線不上,原來是阿里雲伺服器的

騰訊雲+tipask快速搭建基於laravel的CMS網站

一、購買騰訊雲伺服器,服務市場->基礎環境->選擇WordPress平臺映象二、按照tipask教程安裝tipask官方教程地址https://wenda.tipask.com/article/22官方教程對新手不太友好,我整理如下:1.ftp上傳檔案雲伺服器映象

SpringBoot(9) 基於Redis訊息佇列實現非同步操作

什麼是訊息佇列?所謂訊息佇列,就是一個以佇列資料結構為基礎的一個真實存在的實體,如陣列,redis中的佇列集合等等,都可以。為什麼要使用佇列?主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MyS

高可用訊息佇列服務構建-RABBITMQ

Rabbitmq has his own buildin cluster management system. Here, we don’t need Pacemaker, everything is managed by RabbitMQ itself. RabbitMQ or more genera

Spring Boot(二):快速搭建web專案或微服務

上一篇部落格對spring boot的來世今生進行了介紹,這篇就帶領大家快速的建立一個spring boot的web專案或者微服務。 一、.新建專案 1.方法一 (1)自己建立java工程 (2)新建spring boot的application (3

Amazon SQS 訊息佇列服務_訊息佇列mq解決方案

Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲

Amazon SQS價格_SQS訊息佇列服務

如果資料傳輸量超出 500TB/月,請聯絡我們。 除非另行說明,否則我們的價格不包含適用的稅費和關稅(包括增值稅和適用銷售稅)。使用日本賬單地址的客戶若要使用 AWS,則需支付日本消費稅。瞭解更多。 資料傳入和傳出是指傳入和傳出 Amazon

NoSQL初探之人人都愛Redis:(3)使用Redis作為訊息佇列服務場景應用案例

一、訊息佇列場景簡介   “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。   在目前廣泛的Web應用中,都會出現一種場景:在某一個時刻,網站會迎來一個使用者請求的高峰期(

使用SpringBoot快速搭建WebSocket實現訊息推送

本文旨在幫助未掌握此技能的小白掃清障礙,快速搭建websocket訊息推送服務,高手請繞行。謝謝! 首先,筆者的寫作背景也是一名剛剛打通websocket訊息推送服務的小白。在連續幾日的蒐集資料下,最終在沒有找到一個完整的解決方案的情況下。摸索出正確的結果,倍

ubuntu搭建基於pptpd的vpn服務遇到619 800等問題

ubuntu搭建vpn伺服器的文章非常多,但是開啟ufw往往連結失敗。 解決方案如下: 我們需要王before ufw scripts新增iptables命令,新增命令有如下兩種方式: 1: iptables -I INPUT -p 47 -m stat

幾種常見的微服務架構方案簡述——ZeroC IceGrid、Spring Cloud、基於訊息佇列

2017-07-26 http://www.broadview.com.cn/article/348 微服務架構是當前很熱門的一個概念,它不是憑空產生的,是技術發展的必然結果。雖然微服務架構沒有公認的技術標準和規範草案,但業界已經有一些很有影響力的開源微服務架構平臺,架構師可以根據公司的技術實力並結合專案

基於Docker搭建分散式訊息佇列Kafka

本文基於Docker搭建一套單節點的Kafka訊息佇列,Kafka依賴Zookeeper為其管理叢集資訊,雖然本例不涉及叢集,但是該有的元件都還是會有,典型的kafka分散式架構如下圖所示。本例搭建的示例包含Zookeeper + Kafka + Kafka-manger mark &

快速搭建訊息佇列的企業框架

RocketMQ 是阿里團隊應用的強大的訊息中介軟體,而 Spring Boot 是目前企業 Java 流行的框架,如何快速應用二者為企業服務是 Java 工程師必修之課。 本場 Chat 首先會帶大家先熟悉 Spring Boot 專案以及搭建,然後整合 RocketMQ 訊息中介軟體實現單機部

linux實訓第二天總結--快速搭建Httpd服務&部署基於Httpd的網路Yum&搭建NFS共享&兩個終端之間”聊天室”

   DAY02 案例一-->部署網路yum源      1.0快速搭建Httpd服務      1.1部署基於Httpd的網路Yum   案例1.0-->

.NET Core微服務基於EasyNetQ使用RabbitMQ訊息佇列

一、訊息佇列與RabbitMQ 1.1 訊息佇列   “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。   訊息佇列(Message Queue),是分散式系統中重要