1. 程式人生 > >轉載:PHP 協程實現

轉載:PHP 協程實現

新的 做出 操作系統 i++ his golang 空間 復雜 conn

轉自:https://newt0n.github.io/2017/02/10/PHP-%E5%8D%8F%E7%A8%8B%E5%8E%9F%E7%90%86/

實現 PHP 協程需要了解的基本內容。

多進程/線程

最早的服務器端程序都是通過多進程、多線程來解決並發IO的問題。進程模型出現的最早,從Unix 系統誕生就開始有了進程的概念。最早的服務器端程序一般都是 Accept 一個客戶端連接就創建一個進程,然後子進程進入循環同步阻塞地與客戶端連接進行交互,收發處理數據。

多線程模式出現要晚一些,線程與進程相比更輕量,而且線程之間共享內存堆棧,所以不同的線程之間交互非常容易實現。比如實現一個聊天室,客戶端連接之間可以交互,聊天室中的玩家可以任意的其他人發消息。用多線程模式實現非常簡單,線程中可以直接向某一個客戶端連接發送數據。而多進程模式就要用到管道、消息隊列、共享內存等等統稱進程間通信(IPC)復雜的技術才能實現。

最簡單的多進程服務端模型

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) or die("Create server failed"); while(1) { $conn = stream_socket_accept($serv); if (pcntl_fork() == 0) { $request = fread($conn); // do something // $response = "hello world"; fwrite($response); fclose($conn); exit(0); } }

多進程/線程模型的流程是:

創建一個 socket,綁定服務器端口(bind),監聽端口(listen),在 PHP 中用 stream_socket_server 一個函數就能完成上面 3 個步驟,當然也可以使用更底層的sockets 擴展分別實現。

進入 while 循環,阻塞在 accept 操作上,等待客戶端連接進入。此時程序會進入睡眠狀態,直到有新的客戶端發起 connect 到服務器,操作系統會喚醒此進程。accept 函數返回客戶端連接的 socket 主進程在多進程模型下通過 fork(php: pcntl_fork)創建子進程,多線程模型下使用 pthread_create

(php: new Thread)創建子線程。

下文如無特殊聲明將使用進程同時表示進程/線程。

子進程創建成功後進入 while 循環,阻塞在 recv(php:fread)調用上,等待客戶端向服務器發送數據。收到數據後服務器程序進行處理然後使用 send(php: fwrite)向客戶端發送響應。長連接的服務會持續與客戶端交互,而短連接服務一般收到響應就會 close

當客戶端連接關閉時,子進程退出並銷毀所有資源,主進程會回收掉此子進程。

技術分享圖片

這種模式最大的問題是,進程創建和銷毀的開銷很大。所以上面的模式沒辦法應用於非常繁忙的服務器程序。對應的改進版解決了此問題,這就是經典的 Leader-Follower 模型。

$serv = stream_socket_server("tcp://0.0.0.0:8000", $errno, $errstr) or die("Create server failed"); for($i = 0; $i < 32; $i++) { if (pcntl_fork() == 0) { while(1) { $conn = stream_socket_accept($serv); if ($conn == false) continue; // do something $request = fread($conn); // $response = "hello world"; fwrite($response); fclose($conn); } exit(0); } }

它的特點是程序啟動後就會創建 N 個進程。每個子進程進入 Accept,等待新的連接進入。當客戶端連接到服務器時,其中一個子進程會被喚醒,開始處理客戶端請求,並且不再接受新的 TCP 連接。當此連接關閉時,子進程會釋放,重新進入 Accept,參與處理新的連接。

這個模型的優勢是完全可以復用進程,沒有額外消耗,性能非常好。很多常見的服務器程序都是基於此模型的,比如 Apache、PHP-FPM。

多進程模型也有一些缺點。

這種模型嚴重依賴進程的數量解決並發問題,一個客戶端連接就需要占用一個進程,工作進程的數量有多少,並發處理能力就有多少。操作系統可以創建的進程數量是有限的。

啟動大量進程會帶來額外的進程調度消耗。數百個進程時可能進程上下文切換調度消耗占 CPU 不到 1% 可以忽略不計,如果啟動數千甚至數萬個進程,消耗就會直線上升。調度消耗可能占到 CPU 的百分之幾十甚至 100%。

並行和並發

談到多進程以及類似同時執行多個任務的模型,就不得不先談談並行和並發。

並發(Concurrency)

是指能處理多個同時活動的能力,並發事件之間不一定要同一時刻發生。

並行(Parallesim)

是指同時刻發生的兩個並發事件,具有並發的含義,但並發不一定並行。

區別

  • 『並發』指的是程序的結構,『並行』指的是程序運行時的狀態
  • 『並行』一定是並發的,『並行』是『並發』設計的一種
  • 單線程永遠無法達到『並行』狀態

正確的並發設計的標準是:

使多個操作可以在重疊的時間段內進行。
two tasks can start, run, and complete in overlapping time periods

參考:

  • http://www.vaikan.com/docs/Concurrency-is-not-Parallelism
  • https://talks.golang.org/2012/waza.slide

叠代器 & 生成器

在了解 PHP 協程前,還有 叠代器 和 生成器 這兩個概念需要先認識一下。

叠代器

PHP5 開始內置了 Iterator 即叠代器接口,所以如果你定義了一個類,並實現了Iterator 接口,那麽你的這個類對象就是 ZEND_ITER_OBJECT 即可叠代的,否則就是 ZEND_ITER_PLAIN_OBJECT

對於 ZEND_ITER_PLAIN_OBJECT 的類,foreach 會獲取該對象的默認屬性數組,然後對該數組進行叠代。

而對於 ZEND_ITER_OBJECT 的類對象,則會通過調用對象實現的 Iterator 接口相關函數來進行叠代。

任何實現了 Iterator 接口的類都是可叠代的,即都可以用 foreach 語句來遍歷。

Iterator 接口

interface Iterator extends Traversable { // 獲取當前內部標量指向的元素的數據 public mixed current() // 獲取當前標量 public scalar key() // 移動到下一個標量 public void next() // 重置標量 public void rewind() // 檢查當前標量是否有效 public boolean valid() }

常規實現 range 函數

PHP 自帶的 range 函數原型:

range — 根據範圍創建數組,包含指定的元素

array range (mixed $start , mixed $end [, number $step = 1 ])

建立一個包含指定範圍單元的數組。

在不使用叠代器的情況要實現一個和 PHP 自帶的 range 函數類似的功能,可能會這麽寫:

function range ($start, $end, $step = 1) { $ret = []; for ($i = $start; $i <= $end; $i += $step) { $ret[] = $i; } return $ret; }

需要將生成的所有元素放在內存數組中,如果需要生成一個非常大的集合,則會占用巨大的內存。

叠代器實現 xrange 函數

來看看叠代實現的 range,我們叫做 xrange,他實現了 Iterator 接口必須的 5 個方法:

class Xrange implements Iterator { protected $start; protected $limit; protected $step; protected $current; public function __construct($start, $limit, $step = 1) { $this->start = $start; $this->limit = $limit; $this->step = $step; } public function rewind() { $this->current = $this->start; } public function next() { $this->current += $this->step; } public function current() { return $this->current; } public function key() { return $this->current + 1; } public function valid() { return $this->current <= $this->limit; } }

使用時代碼如下:

foreach (new Xrange(0, 9) as $key => $val) { echo $key, ‘ ‘, $val, "\n"; }

輸出:

0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

看上去功能和 range() 函數所做的一致,不同點在於叠代的是一個 對象(Object) 而不是數組:

var_dump(new Xrange(0, 9));

輸出:

object(Xrange)#1 (4) { ["start":protected]=> int(0) ["limit":protected]=> int(9) ["step":protected]=> int(1) ["current":protected]=> NULL }

另外,內存的占用情況也完全不同:

// range $startMemory = memory_get_usage(); $arr = range(0, 500000); echo ‘range(): ‘, memory_get_usage() - $startMemory, " bytes\n"; unset($arr); // xrange $startMemory = memory_get_usage(); $arr = new Xrange(0, 500000); echo ‘xrange(): ‘, memory_get_usage() - $startMemory, " bytes\n";

輸出:

xrange(): 624 bytes range(): 72194784 bytes

range() 函數在執行後占用了 50W 個元素內存空間,而 xrange 對象在整個叠代過程中只占用一個對象的內存。

Yii2 Query

在喜聞樂見的各種 PHP 框架裏有不少生成器的實例,比如 Yii2 中用來構建 SQL 語句的 \yii\db\Query類:

$query = (new \yii\db\Query)->from(‘user‘); // yii\db\BatchQueryResult foreach ($query->batch() as $users) { // 每次循環得到多條 user 記錄 }

來看一下 batch() 做了什麽:

/** * Starts a batch query. * * A batch query supports fetching data in batches, which can keep the memory usage under a limit. * This method will return a [[BatchQueryResult]] object which implements the [[\Iterator]] interface * and can be traversed to retrieve the data in batches. * * For example, * * * $query = (new Query)->from(‘user‘); * foreach ($query->batch() as $rows) { * // $rows is an array of 10 or fewer rows from user table * } * * * @param integer $batchSize the number of records to be fetched in each batch. * @param Connection $db the database connection. If not set, the "db" application component will be used. * @return BatchQueryResult the batch query result. It implements the [[\Iterator]] interface * and can be traversed to retrieve the data in batches. */ public function batch($batchSize = 100, $db = null) { return Yii::createObject([ ‘class‘ => BatchQueryResult::className(), ‘query‘ => $this, ‘batchSize‘ => $batchSize, ‘db‘ => $db, ‘each‘ => false, ]); }

實際上返回了一個 BatchQueryResult 類,類的源碼實現了 Iterator 接口 5 個關鍵方法:

class BatchQueryResult extends Object implements \Iterator { public $db; public $query; public $batchSize = 100; public $each = false; private $_dataReader; private $_batch; private $_value; private $_key; /** * Destructor. */ public function __destruct() { // make sure cursor is closed $this->reset(); } /** * Resets the batch query. * This method will clean up the existing batch query so that a new batch query can be performed. */ public function reset() { if ($this->_dataReader !== null) { $this->_dataReader->close(); } $this->_dataReader = null; $this->_batch = null; $this->_value = null; $this->_key = null; } /** * Resets the iterator to the initial state. * This method is required by the interface [[\Iterator]]. */ public function rewind() { $this->reset(); $this->next(); } /** * Moves the internal pointer to the next dataset. * This method is required by the interface [[\Iterator]]. */ public function next() { if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) { $this->_batch = $this->fetchData(); reset($this->_batch); } if ($this->each) { $this->_value = current($this->_batch); if ($this->query->indexBy !== null) { $this->_key = key($this->_batch); } elseif (key($this->_batch) !== null) { $this->_key++; } else { $this->_key = null; } } else { $this->_value = $this->_batch; $this->_key = $this->_key === null ? 0 : $this->_key + 1; } } /** * Fetches the next batch of data. * @return array the data fetched */ protected function fetchData() { // ... } /** * Returns the index of the current dataset. * This method is required by the interface [[\Iterator]]. * @return integer the index of the current row. */ public function key() { return $this->_key; } /** * Returns the current dataset. * This method is required by the interface [[\Iterator]]. * @return mixed the current dataset. */ public function current() { return $this->_value; } /** * Returns whether there is a valid dataset at the current position. * This method is required by the interface [[\Iterator]]. * @return boolean whether there is a valid dataset at the current position. */ public function valid() { return !empty($this->_batch); } }

以叠代器的方式實現了類似分頁取的效果,同時避免了一次性取出所有數據占用太多的內存空間。

叠代器使用場景

  • 使用返回叠代器的包或庫時(如 PHP5 中的 SPL 叠代器)
  • 無法在一次調用獲取所需的所有元素時
  • 要處理數量巨大的元素時(數據庫中要處理的結果集內容超過內存)

生成器

需要 PHP 5 >= 5.5.0 或 PHP 7

雖然叠代器僅需繼承接口即可實現,但畢竟需要定義一整個類然後實現接口的所有方法,實在是不怎麽方便。

生成器則提供了一種更簡單的方式來實現簡單的對象叠代,相比定義類來實現 Iterator 接口的方式,性能開銷和復雜度大大降低。

PHP Manual

生成器允許在 foreach 代碼塊中叠代一組數據而不需要創建任何數組。一個生成器函數,就像一個普通的有返回值的自定義函數類似,但普通函數只返回一次, 而生成器可以根據需要通過 yield 關鍵字返回多次,以便連續生成需要叠代返回的值。

一個最簡單的例子就是使用生成器來重新實現 xrange() 函數。效果和上面我們用叠代器實現的差不多,但實現起來要簡單的多。

生成器實現 xrange 函數

function xrange($start, $limit, $step = 1) { for ($i = 0; $i < $limit; $i += $step) { yield $i + 1 => $i; } } foreach (xrange(0, 9) as $key => $val) { printf("%d %d \n", $key, $val); } // 輸出 // 1 0 // 2 1 // 3 2 // 4 3 // 5 4 // 6 5 // 7 6 // 8 7 // 9 8

實際上生成器生成的正是一個叠代器對象實例,該叠代器對象繼承了 Iterator 接口,同時也包含了生成器對象自有的接口,具體可以參考 Generator 類的定義以及語法參考。

同時需要註意的是:

一個生成器不可以返回值,這樣做會產生一個編譯錯誤。然而 return 空是一個有效的語法並且它將會終止生成器繼續執行。

yield 關鍵字

需要註意的是 yield 關鍵字,這是生成器的關鍵。通過上面的例子可以看出,yield 會將當前產生的值傳遞給 foreach,換句話說,foreach 每一次叠代過程都會從 yield 處取一個值,直到整個遍歷過程不再能執行到 yield 時遍歷結束,此時生成器函數簡單的退出,而調用生成器的上層代碼還可以繼續執行,就像一個數組已經被遍歷完了。

yield 最簡單的調用形式看起來像一個 return 申明,不同的是 yield 暫停當前過程的執行並返回值,而 return 是中斷當前過程並返回值。暫停當前過程,意味著將處理權轉交由上一級繼續進行,直到上一級再次調用被暫停的過程,該過程又會從上一次暫停的位置繼續執行。這像是什麽呢?如果之前已經在鳥哥的文章中粗略看過,應該知道這很像操作系統的進程調度,多個進程在一個 CPU 核心上執行,在系統調度下每一個進程執行一段指令就被暫停,切換到下一個進程,這樣外部用戶看起來就像是同時在執行多個任務。

但僅僅如此還不夠,yield 除了可以返回值以外,還能接收值,也就是可以在兩個層級間實現雙向通信。

來看看如何傳遞一個值給 yield

function printer() { while (true) { printf("receive: %s\n", yield); } } $printer = printer(); $printer->send(‘hello‘); $printer->send(‘world‘); // 輸出 receive: hello receive: world

根據 PHP 官方文檔的描述可以知道 Generator 對象除了實現 Iterator 接口中的必要方法以外,還有一個 send 方法,這個方法就是向 yield 語句處傳遞一個值,同時從 yield 語句處繼續執行,直至再次遇到 yield 後控制權回到外部。

既然 yield 可以在其位置中斷並返回或者接收一個值,那能不能同時進行接收和返回呢?當然,這也是實現協程的根本。對上述代碼做出修改:

function printer() { $i = 0; while (true) { printf("receive: %s\n", (yield ++$i)); } } $printer = printer(); printf("%d\n", $printer->current()); $printer->send(‘hello‘); printf("%d\n", $printer->current()); $printer->send(‘world‘); printf("%d\n", $printer->current()); // 輸出 1 receive: hello 2 receive: world 3

這是另一個例子:

function gen() { $ret = (yield ‘yield1‘); var_dump($ret); $ret = (yield ‘yield2‘); var_dump($ret); } $gen = gen(); var_dump($gen->current()); // string(6) "yield1" var_dump($gen->send(‘ret1‘)); // string(4) "ret1" (第一個 var_dump) // string(6) "yield2" (繼續執行到第二個 yield,吐出了返回值) var_dump($gen->send(‘ret2‘)); // string(4) "ret2" (第二個 var_dump) // NULL (var_dump 之後沒有其他語句,所以這次 ->send() 的返回值為 null)

current 方法是叠代器 Iterator 接口必要的方法,foreach 語句每一次叠代都會通過其獲取當前值,而後調用叠代器的 next 方法。在上述例子裏則是手動調用了 current 方法獲取值。

上述例子已經足以表示 yield 能夠作為實現雙向通信的工具,也就是具備了後續實現協程的基本條件。

上面的例子如果第一次接觸並稍加思考,不免會疑惑為什麽一個 yield 既是語句又是表達式,而且這兩種情況還同時存在:

  • 對於所有在生成器函數中出現的 yield,首先它都是語句,而跟在 yield 後面的任何表達式的值將作為調用生成器函數的返回值,如果 yield 後面沒有任何表達式(變量、常量都是表達式),那麽它會返回 NULL,這一點和 return 語句一致。
  • yield 也是表達式,它的值就是 send 函數傳過來的值(相當於一個特殊變量,只不過賦值是通過 send 函數進行的)。只要調用send方法,並且生成器對象的叠代並未終結,那麽當前位置的 yield 就會得到 send 方法傳遞過來的值,這和生成器函數有沒有把這個值賦值給某個變量沒有任何關系。

這個地方可能需要仔細品味上面兩個 send() 方法的例子才能理解。但可以簡單的記住:

任何時候 yield 關鍵詞即是語句:可以為生成器函數返回值;也是表達式:可以接收生成器對象發過來的值。

除了 send() 方法,還有一種控制生成器執行的方法是 next() 函數:

  • Next(),恢復生成器函數的執行直到下一個 yield
  • Send(),向生成器傳入一個值,恢復執行直到下一個 yield

協程

對於單核處理器,多進程實現多任務的原理是讓操作系統給一個任務每次分配一定的 CPU 時間片,然後中斷、讓下一個任務執行一定的時間片接著再中斷並繼續執行下一個,如此反復。由於切換執行任務的速度非常快,給外部用戶的感受就是多個任務的執行是同時進行的。

多進程的調度是由操作系統來實現的,進程自身不能控制自己何時被調度,也就是說:

進程的調度是由外層調度器搶占式實現的

而協程要求當前正在運行的任務自動把控制權回傳給調度器,這樣就可以繼續運行其他任務。這與『搶占式』的多任務正好相反, 搶占多任務的調度器可以強制中斷正在運行的任務, 不管它自己有沒有意願。『協作式多任務』在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不過它們後來都切換到『搶占式多任務』了。理由相當明確:如果僅依靠程序自動交出控制的話,那麽一些惡意程序將會很容易占用全部 CPU 時間而不與其他任務共享。

協程的調度是由協程自身主動讓出控制權到外層調度器實現的

回到剛才生成器實現 xrange 函數的例子,整個執行過程的交替可以用下圖來表示:

技術分享圖片

協程可以理解為純用戶態的線程,通過協作而不是搶占來進行任務切換。相對於進程或者線程,協程所有的操作都可以在用戶態而非操作系統內核態完成,創建和切換的消耗非常低。

簡單的說 Coroutine(協程) 就是提供一種方法來中斷當前任務的執行,保存當前的局部變量,下次再過來又可以恢復當前局部變量繼續執行。

我們可以把大任務拆分成多個小任務輪流執行,如果有某個小任務在等待系統 IO,就跳過它,執行下一個小任務,這樣往復調度,實現了 IO 操作和 CPU 計算的並行執行,總體上就提升了任務的執行效率,這也便是協程的意義。

PHP 協程和 yield

PHP 從 5.5 開始支持生成器及 yield 關鍵字,而 PHP 協程則由 yield 來實現。

要理解協程,首先要理解:代碼是代碼,函數是函數。函數包裹的代碼賦予了這段代碼附加的意義:不管是否顯式的指明返回值,當函數內的代碼塊執行完後都會返回到調用層。而當調用層調用某個函數的時候,必須等這個函數返回,當前函數才能繼續執行,這就構成了後進先出,也就是 Stack

而協程包裹的代碼,不是函數,不完全遵守函數的附加意義,協程執行到某個點,協會協程會 yield返回一個值然後掛起,而不是 return 一個值然後結束,當再次調用協程的時候,會在上次 yield 的點繼續執行。

所以協程違背了通常操作系統和 x86 的 CPU 認定的代碼執行方式,也就是 Stack 的這種執行方式,需要運行環境(比如 php,python 的 yield 和 golang 的 goroutine)自己調度,來實現任務的中斷和恢復,具體到 PHP,就是靠 yield 來實現。

堆棧式調用 和 協程調用的對比:

技術分享圖片

結合之前的例子,可以總結一下 yield 能做的就是:

  • 實現不同任務間的主動讓位、讓行,把控制權交回給任務調度器。
  • 通過 send() 實現不同任務間的雙向通信,也就可以實現任務和調度器之間的通信。

yield 就是 PHP 實現協程的方式。

協程多任務調度

下面是雄文 Cooperative multitasking using coroutines (in PHP!) 裏一個簡單但完整的例子,來展示如何具體的在 PHP 裏實現協程任務的調度。

首先是一個任務類:

Task

class Task { // 任務 ID protected $taskId; // 協程對象 protected $coroutine; // send() 值 protected $sendVal = null; // 是否首次 yield protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendVal) { $this->sendVal = $sendVal; } public function run() { // 如之前提到的在send之前, 當叠代器被創建後第一次 yield 之前,一個 renwind() 方法會被隱式調用 // 所以實際上發生的應該類似: // $this->coroutine->rewind(); // $this->coroutine->send(); // 這樣 renwind 的執行將會導致第一個 yield 被執行, 並且忽略了他的返回值. // 真正當我們調用 yield 的時候, 我們得到的是第二個yield的值,導致第一個yield的值被忽略。 // 所以這個加上一個是否第一次 yield 的判斷來避免這個問題 if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendVal); $this->sendVal = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); } }

接下來是調度器,比 foreach 是要復雜一點,但好歹也能算個正兒八經的 Scheduler :)

Scheduler

class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } // (使用下一個空閑的任務id)創建一個新任務,然後把這個任務放入任務map數組裏. 接著它通過把任務放入任務隊列裏來實現對任務的調度. 接著run()方法掃描任務隊列, 運行任務.如果一個任務結束了, 那麽它將從隊列裏刪除, 否則它將在隊列的末尾再次被調度。 public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { // 任務入隊 $this->queue->enqueue($task); } public function run() { while (!$this->queue->isEmpty()) { // 任務出隊 $task = $this->queue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } }

隊列可以使每個任務獲得同等的 CPU 使用時間,

Demo

function task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.\n"; yield; } } function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->run();

輸出:

This is task 1 iteration 1. This is task 2 iteration 1. This is task 1 iteration 2. This is task 2 iteration 2. This is task 1 iteration 3. This is task 2 iteration 3. This is task 1 iteration 4. This is task 2 iteration 4. This is task 1 iteration 5. This is task 2 iteration 5. This is task 1 iteration 6. This is task 1 iteration 7. This is task 1 iteration 8. This is task 1 iteration 9. This is task 1 iteration 10.

結果正是我們期待的,最初的 5 次叠代,兩個任務是交替進行的,而在第二個任務結束後,只有第一個任務繼續執行到結束。

協程非阻塞 IO

若想真正的發揮出協程的作用,那一定是在一些涉及到阻塞 IO 的場景,我們都知道 Web 服務器最耗時的部分通常都是 socket 讀取數據等操作上,如果進程對每個請求都掛起的等待 IO 操作,那處理效率就太低了,接下來我們看個支持非阻塞 IO 的 Scheduler:

<?php class Scheduler { protected $maxTaskId = 0; protected $tasks = []; // taskId => task protected $queue; // resourceID => [socket, tasks] protected $waitingForRead = []; protected $waitingForWrite = []; public function __construct() { // SPL 隊列 $this->queue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->tasks[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { // 任務入隊 $this->queue->enqueue($task); } public function run() { while (!$this->queue->isEmpty()) { // 任務出隊 $task = $this->queue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->tasks[$task->getTaskId()]); } else { $this->schedule($task); } } } public function waitForRead($socket, Task $task) { if (isset($this->waitingForRead[(int)$socket])) { $this->waitingForRead[(int)$socket][1][] = $task; } else { $this->waitingForRead[(int)$socket] = [$socket, [$task]]; } } public function waitForWrite($socket, Task $task) { if (isset($this->waitingForWrite[(int)$socket])) { $this->waitingForWrite[(int)$socket][1][] = $task; } else { $this->waitingForWrite[(int)$socket] = [$socket, [$task]]; } } /** * @param $timeout 0 represent */ protected function ioPoll($timeout) { $rSocks = []; foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // $timeout 為 0 時, stream_select 為立即返回,為 null 時則會阻塞的等,見 http://php.net/manual/zh/function.stream-select.php if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $this->waitingForRead[(int)$socket]; unset($this->waitingForRead[(int)$socket]); foreach ($tasks as $task) { $this->schedule($task); } } foreach ($wSocks as $socket) { list(, $tasks) = $this->waitingForWrite[(int)$socket]; unset($this->waitingForWrite[(int)$socket]); foreach ($tasks as $task) { $this->schedule($task); } } } /** * 檢查隊列是否為空,若為空則掛起的執行 stream_select,否則檢查完 IO 狀態立即返回,詳見 ioPoll() * 作為任務加入隊列後,由於 while true,會被一直重復的加入任務隊列,實現每次任務前檢查 IO 狀態 * @return Generator object for newTask * */ protected function ioPollTask() { while (true) { if ($this->taskQueue->isEmpty()) { $this->ioPoll(null); } else { $this->ioPoll(0); } yield; } } /** * $scheduler = new Scheduler; * $scheduler->newTask(Web Server Generator); * $scheduler->withIoPoll()->run(); * * 新建 Web Server 任務後先執行 withIoPoll() 將 ioPollTask() 作為任務入隊 * * @return $this */ public function withIoPoll() { $this->newTask($this->ioPollTask()); return $this; } }

這個版本的 Scheduler 裏加入一個永不退出的任務,並且通過 stream_select 支持的特性來實現快速的來回檢查各個任務的 IO 狀態,只有 IO 完成的任務才會繼續執行,而 IO 還未完成的任務則會跳過,完整的代碼和例子可以戳這裏。

也就是說任務交替執行的過程中,一旦遇到需要 IO 的部分,調度器就會把 CPU 時間分配給不需要 IO 的任務,等到當前任務遇到 IO 或者之前的任務 IO 結束才再次調度 CPU 時間,以此實現 CPU 和 IO 並行來提升執行效率,類似下圖:

技術分享圖片

單任務改造

如果想將一個單進程任務改造成並發執行,我們可以選擇改造成多進程或者協程:

  • 多進程,不改變任務執行的整體過程,在一個時間段內同時執行多個相同的代碼段,調度權在 CPU,如果一個任務能獨占一個 CPU 則可以實現並行。
  • 協程,把原有任務拆分成多個小任務,原有任務的執行流程被改變,調度權在進程自己,如果有 IO 並且可以實現異步,則可以實現並行。

多進程改造

技術分享圖片

協程改造

技術分享圖片

協程(Coroutines)和 Go 協程(Goroutines)

PHP 的協程或者其他語言中,比如 Python、Lua 等都有協程的概念,和 Go 協程有些相似,不過有兩點不同:

  • Go 協程意味著並行(或者可以以並行的方式部署,可以用 runtime.GOMAXPROCS() 指定可同時使用的 CPU 個數),協程一般來說只是並發。
  • Go 協程通過通道 channel 來通信;協程通過 yield 讓出和恢復操作來通信。

Go 協程比普通協程更強大,也很容易從協程的邏輯復用到 Go 協程,而且在 Go 的開發中也使用的極為普遍,有興趣的話可以了解一下作為對比。

結束

個人感覺 PHP 的協程在實際使用中想要徒手實現和應用並不方便而且場景有限,但了解其概念及實現原理對更好的理解並發不無裨益。

如果想更多的了解協程的實際應用場景不妨試試已經大名鼎鼎的 Swoole,其對多種協議的 client 做了底層的協程封裝,幾乎可以做到以同步編程的寫法實現協程異步 IO 的效果。

參考

    • Cooperative multitasking using coroutines (in PHP!)
    • 在PHP中使用協程實現多任務調度
    • PHP 並發 IO 編程之路

轉載:PHP 協程實現