1. 程式人生 > >PHP使用QPM實現多程序並行任務處理程式

PHP使用QPM實現多程序並行任務處理程式

考慮用PHP實現以下場景: 有一個抓站的URL列表儲存在佇列裡,後臺程式讀取這個佇列,然後轉交給子程序去抓取HTML存放到檔案裡。 為了提高效率,允許多工並行執行,但為了避免機器負載過高,限制了最大的並行任務數(為了測試方便,我們把這個數設為3),當佇列中取到 END標記時,程式結束執行。

這個場景用QPM的Supervisor::taskFactoryMode()實現,非常簡單。

QPM全名是 Quick Process Management Module for PHP. PHP 是強大的web開發語言,以至於大家常常忘記PHP 可以用來開發健壯的命令列(CLI)程式以至於daemon程式。 而編寫daemon程式免不了與各種程序管理打交道。QPM正式為簡化程序管理而開發的類庫。QPM的專案地址是:

https://github.com/Comos/qpm

http://news.sina.com.cn/
http://news.ifeng.com/
http://news.163.com/
http://news.sohu.com/
http://ent.sina.com.cn/
http://ent.ifeng.com/
...
END

使用QPM的taskFactoryMode之前,我們需要準備一個TaskFactory類。 我們將其命名為 SpiderTaskFactory,SpdierTaskFactory 的工廠方法fetchTask 正常返回 Runnable的子類的例項。當碰到END或檔案結束,則throw StopSignal,這樣程式就會終止。

//如果沒有從引數指定輸入,把spider_task_factory_data.txt作為資料來源
$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';

$spiderTaskFactory = new SpiderTaskFactory($input);
$config = [
    //指定taskFactory物件和工廠方法
    'factoryMethod'=>[$spiderTaskFactory, 'fetchTask'],
    //指定最大併發數量為3
    'quantity'
=> 3, ]; //啟動Supervisor qpm\supervisor\Supervisor::taskFactoryMode($config)->start();

SpiderTaskFactory 的實現如下:

/**
 * 任務工廠,必須實現 fetchTask方法。
 * 該方法正常返回
 * 
*/
class SpiderTaskFactory {
private $_fh;
public function __construct($input) {
    $this->_input = $input;
    $this->_fh = fopen($input, 'r');
    if ($this->_fh === false) {
        throw new Exception('fopen failed:'.$input);
    }
}
public function fetchTask() {
    while (true) {
        if (feof($this->_fh)) {
            throw new qpm\supervisor\StopSignal();
        }
        $line = trim(fgets($this->_fh));
        if ($line == 'END') {
            throw new qpm\supervisor\StopSignal();
        }

        if (empty($line)) {
            continue;
        }

        break;
    }

    return new SpiderTask($line);
}
}

SpiderTask 的實現如下:

/**
 * 在子程序中執行任務的類
 * 必須實現 qpm\process\Runnable 介面
 */
class SpiderTask implements qpm\process\Runnable {
private $_target;

public function __construct($target) {
    $this->_target = $target;
}
//在子程序中執行的部分
public function run() {
    $r = @file_get_contents($this->_target);
    if ($r===false) {
        throw new Exception('fail to crawl url:'.$this->_target);
    }
    file_put_contents($this->getLocalFilename(), $r);   
}

private function getLocalFilename() {
    $filename = str_replace('/', '~', $this->_target);
    $filename = str_replace(':', '_', $filename);
    $filename = $filename.'-'.date('YmdHis');
    return __DIR__.'/_spider/'.$filename.'.html';
}
}

真實的生產環境,用佇列替換檔案輸入,即可實現持久執行的生產者/消費者模型的程式。