1. 程式人生 > >swoole(3)網路服務模型(單程序阻塞、預派生子程序、單程序阻塞複用模型)

swoole(3)網路服務模型(單程序阻塞、預派生子程序、單程序阻塞複用模型)

一:單程序阻塞

設計流程:

  1. 建立一個socket,繫結埠bind,監聽埠listen
  2. 進入while迴圈,阻塞在accept操作上,等待客戶端連線進入,進入睡眠狀態,直到有新的客戶發起connet到伺服器,accept函式返回客戶端的socket
  3. 利用fread讀取客戶端socket當中的資料,收到資料後伺服器程式程序處理,然後使用fwrite向客戶端傳送響應

程式碼:

<?php
 class Worker{
     //監聽socket
     protected $socket = NULL;
     //連線事件回撥
     public $onConnect = NULL;
     //接收訊息事件回撥
     public $onMessage = NULL;
     public function __construct($socket_address) {
        $this->socket=stream_socket_server($socket_address);
     }

     public function start() {
         while (true) {
             $clientSocket = stream_socket_accept($this->socket);
             if (!empty($clientSocket) && is_callable($this->onConnect)) {
                 //觸發連線事件的回掉
                 call_user_func($this->onConnect, $clientSocket);
             }
            //讀取內容
             $buffer = fread($clientSocket, 65535);
             if (!empty($buffer) && is_callable($this->onMessage)) {
                 call_user_func($this->onMessage, $clientSocket, $buffer);
             }
             fclose($clientSocket);
         }
     }
 }

$worker = new Worker('tcp://0.0.0.0:9810');

$worker->onConnect = function ($args) {
        echo "新的連線來了.{$args}.PHP_EOL";
};
$worker->onMessage = function ($conn, $message) {
        var_dump($conn, $message);
        $content="hello word qwe";
    $http_resonse = "HTTP/1.1 200 OK\r\n";
    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
    $http_resonse .= "Connection: keep-alive\r\n";
    $http_resonse .= "Server: php socket server\r\n";
    $http_resonse .= "Content-length: ".strlen($content)."\r\n\r\n";
    $http_resonse .= $content;
    fwrite($conn, $http_resonse);
};
$worker->start();

cli下執行:

瀏覽器:

 缺點:一次只能處理一個連線,不支援多個連線同時處理 

二:預派生子程序模式

設計流程:

  1. 建立一個socket,繫結伺服器埠(bind),監聽埠(listen)
  2. 通過pcntl_fork函式建立N個子程序
  3. 一個子程序建立成功後都去阻塞監聽新的客戶端連線
  4. 客戶端連線時,其中一個子程序被喚醒,處理客戶端請求
  5. 請求完成後,等待主程序回收子程序pcntl_wait

通過呼叫fork函式來建立子程序,會返回兩個pid(主程序id、子程序id)

顯示規則:

  1. 在父程序:fork函式返回子程序id
  2. 在子程序:fork函式返回0

程式碼:

<?php

class Worker {
    //監聽socket
    protected $socket = NULL;
    //連線事件回撥
    public $onConnect = NULL;
    //接收訊息事件回撥
    public $onMessage = NULL;
    public $workerNum = 10;

    public function __construct($socket_address) {
        $this->socket = stream_socket_server($socket_address);
    }

    //建立子程序
    public function fork() {
        for ($i = 0; $i < $this->workerNum; $i++) {
            $pid = pcntl_fork();
            if ($pid < 0) {
                exit('建立失敗');
            } else if ($pid > 0) {
                //父程序空間,返回子程序id
            } else {
                //子程序空間,返回父程序id 0
                $this->accept();
            }
        }
        $status = 0;
        $pid = pcntl_wait($status);
        echo "子程序" . $pid . PHP_EOL;
    }

    public function accept(){
        while (true) {
            $clientSocket = stream_socket_accept($this->socket);
            var_dump("正在執行任務的pid為:".posix_getpid());
            if (!empty($clientSocket) && is_callable($this->onConnect)) {
                call_user_func($this->onConnect, $clientSocket);
            }

            $buffer = fread($clientSocket, 65535);
            if (!empty($buffer) && is_callable($this->onMessage)) {
                call_user_func($this->onMessage, $clientSocket, $buffer);
            }
            fclose($clientSocket);
        }
    }

    public function start() {
        $this->fork();
    }
}


$worker = new Worker('tcp://0.0.0.0:9801');

$worker->onConnect = function ($args) {
    echo "新的連線來了.{$args}.PHP_EOL";
};
$worker->onMessage = function ($conn, $message) {
//    var_dump($conn, $message);
    $content = "hello word qwe";
    $http_resonse = "HTTP/1.1 200 OK\r\n";
    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
    $http_resonse .= "Connection: keep-alive\r\n";
    $http_resonse .= "Server: php socket server\r\n";
    $http_resonse .= "Content-length: " . strlen($content) . "\r\n\r\n";
    $http_resonse .= $content;
    fwrite($conn, $http_resonse);
};
$worker->start();

cli執行結果:

 

 缺點:嚴重依賴程序的數量解決併發問題,一個客戶端連線就需要佔用一個程序

三:單程序阻塞複用模型

 設計流程:

 

  1. 儲存所有的socket,通過select系統呼叫,監聽socket描述符的可讀事件
  2. socket在核心監控,一旦發現可讀,會從核心空間傳遞給使用者空間,通過邏輯判斷是服務端socket可讀,還是客戶端socket可讀
  3. 如果是服務端socket可讀,說明有新的客戶端建立,將socket保留到監聽陣列中
  4. 如果是客戶端socket可讀,說明當前已經可以去讀取客戶端傳送過來的內容了,讀取了內容,響應給客戶端

程式碼:

<?php

class Worker {
    //監聽socket
    protected $socket = NULL;
    //連線事件回撥
    public $onConnect = NULL;
    //接收訊息事件回撥
    public $onMessage = NULL;
    public $workerNum = 4 ;
    public $allSocket;

    public function __construct($socket_address) {
        $this->socket = stream_socket_server($socket_address);
        stream_set_blocking($this->socket,0);
        $this->allSocket[(int)$this->socket]=$this->socket;
    }

    public function fork() {
//        for ($i = 0; $i < $this->workerNum; $i++) {
//            $pid = pcntl_fork();
//            if ($pid < 0) {
//                exit('建立失敗');
//            } else if ($pid > 0) {
//
//            } else {
                $this->accept();
//            }
//        }
//        $status = 0;
//        $pid = pcntl_wait($status);
//        echo "子程序" . $pid . PHP_EOL;
    }

    public function accept(){
        while (true) {
            $write =$except =[];
            $read= $this->allSocket;
            stream_select($read,$write,$except,60);
            foreach($read as $index =>$val){
                if ($val == $this->socket){
                    $clientSocket = stream_socket_accept($this->socket);
                    var_dump(posix_getpid());
                    if (!empty($clientSocket) && is_callable($this->onConnect)) {
                        call_user_func($this->onConnect, $clientSocket);
                    }
                    $this->allSocket[(int)$clientSocket]=$clientSocket;
                }else{
                    $buffer = fread($val, 65535);
                    if (empty($buffer)){
                        if (feof($val) || is_resource($val)){
                            fclose($val);
                            unset($this->allSocket[(int)$val]);
                            continue;
                        }
                    }
                    if (!empty($buffer) && is_callable($this->onMessage)) {
                        call_user_func($this->onMessage, $val, $buffer);
                    }
                }
            }

        }
    }

    public function start() {
        $this->fork();
    }
}


$worker = new Worker('tcp://0.0.0.0:9800');

$worker->onConnect = function ($args) {
    echo "新的連線來了.{$args}.PHP_EOL";
};
$worker->onMessage = function ($conn, $message) {
//    var_dump($conn, $message);
    $content = "hello word qwe";
    $http_resonse = "HTTP/1.1 200 OK\r\n";
    $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
    $http_resonse .= "Connection: keep-alive\r\n";
    $http_resonse .= "Server: php socket server\r\n";
    $http_resonse .= "Content-length: " . strlen($content) . "\r\n\r\n";
    $http_resonse .= $content;
    fwrite($conn, $http_resonse);
};
$worker->start();

缺點:select模式本身缺點(迴圈遍歷處理事件、核心空間傳遞資料的消耗)、單執行緒對於大量任務處理乏力&n