1. 程式人生 > >PHP-Websockets 上傳檔案2 優化支援php socket客戶端和websocket連線websocket伺服器 以守護程序方式執行編碼

PHP-Websockets 上傳檔案2 優化支援php socket客戶端和websocket連線websocket伺服器 以守護程序方式執行編碼

WebsocketServer:

users.php

<?php
/**
 * Per-connection state kept by the server for one connected client.
 *
 * An instance is created with an id and the accepted socket handle; every
 * other field starts at the default shown below and is updated by the server
 * as the connection progresses.
 */
class WebSocketUser {
    /** Socket handle of this client. */
    public $socket;

    /** Identifier assigned when the user object is created. */
    public $id;

    /** Request headers of the client (lower-cased name => value). */
    public $headers = array();

    /**
     * false until the handshake is done. The server later stores the raw
     * handshake buffer here for WebSocket clients, or the string "TCP" for
     * plain socket clients.
     */
    public $handshake = false;

    /** Resource path taken from the GET request line during the handshake. */
    public $requestedResource;

    /** True while an incomplete packet is being buffered in $partialBuffer. */
    public $handlingPartialPacket = false;

    /** Bytes of an incomplete packet, prepended to the next received data. */
    public $partialBuffer = "";

    /** True while a fragmented (multi-frame) outgoing message is in progress. */
    public $sendingContinuous = false;

    /** Payload accumulated so far for a fragmented message. */
    public $partialMessage = "";

    /** Checked by the server after deframing; when true the connection is dropped. */
    public $hasSentClose = false;

    // Upload bookkeeping. These fields are not used by the code visible here;
    // presumably they are driven by the file-upload server subclass -- TODO confirm.
    public $clientFileName;
    public $serverFileName;
    public $fileHandler;
    public $fileSize;
    public $recLength = 0;

    /**
     * @param mixed $id     Identifier for this user.
     * @param mixed $socket Socket handle accepted for this user.
     */
    function __construct($id, $socket) {
        $this->id = $id;
        $this->socket = $socket;
    }
}

daemon.class.php
<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2017/4/19
 * Time: 10:34
 */
/**
 * Turns the current CLI process into a daemon using the double-fork technique.
 */
class daemon {
    /**
     * Detach the calling process from its terminal and continue in the background.
     *
     * Both intermediate parent processes call exit(0); only the grandchild
     * returns from this method.
     *
     * @throws Exception When a fork fails or a new session cannot be created.
     */
    public function init(){
        // First fork: the parent exits so the child is no longer tied to the
        // process that launched it.
        $pid = pcntl_fork();
        if ($pid == -1){
            throw new Exception('fork子程序失敗');
        }elseif ($pid > 0){
            exit(0);
        }

        // Start a new session: the child becomes session and process-group
        // leader and loses its controlling terminal.
        $sid = posix_setsid();
        if ($sid == -1) {
            throw new Exception('setsid fail');
        }

        // Second fork: a session leader could still acquire a controlling
        // terminal, so fork again and let the leader exit. The grandchild is
        // the final daemon process.
        $pid = pcntl_fork();
        if ($pid == -1) {
            throw new Exception('fork子程序失敗');
        } elseif ($pid > 0) {
            exit(0);
        }

        // The daemon has no terminal, so close the inherited standard streams
        // and point them at /dev/null; any later echo or error output is then
        // silently discarded instead of failing.
        // All three descriptors are reopened, in this order, because the
        // operating system hands out the lowest free descriptor first: the
        // handles therefore land on fd 0, 1 and 2 respectively. They are kept
        // in globals so they are not closed when this method returns.
        global $STDIN, $STDOUT, $STDERR;
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        $STDIN  = fopen('/dev/null', 'r');
        $STDOUT = fopen('/dev/null', 'w');
        $STDERR = fopen('/dev/null', 'w');

        // Move off the inherited working directory so the daemon does not keep
        // it busy, and clear the file-mode creation mask.
        chdir('/');
        umask(0);
    }
}

websockets.php
<?php
require_once('./users.php');
abstract class WebSocketServer {
  // Name of the class instantiated for each new connection in connect().
  // Redefine this to use a custom user class, which should inherit from WebSocketUser.
  protected $userClass = 'WebSocketUser';
  // Maximum number of bytes requested per socket_recv() call; set by the constructor.
  protected $maxBufferSize;        
  // Listening socket created by the constructor.
  protected $master;
  // Sockets passed to socket_select(): key 'm' holds the master socket, every other key is a user id.
  protected $sockets                              = array();
  // Connected users, keyed by user id.
  protected $users                                = array();
  // Messages queued by send() for users whose handshake has not completed yet.
  protected $heldMessages                         = array();
  // Checked by stdout() and stderr() before they would emit anything.
  protected $interactive                          = true;
  // When true, doHandshake() rejects requests that lack an acceptable Origin header.
  protected $headerOriginRequired                 = false;
  // When true, doHandshake() rejects requests that lack an acceptable Sec-WebSocket-Protocol header.
  protected $headerSecWebSocketProtocolRequired   = false;
  // When true, doHandshake() rejects requests that lack an acceptable Sec-WebSocket-Extensions header.
  protected $headerSecWebSocketExtensionsRequired = false;
  /**
   * Create, bind and start listening on the master TCP socket.
   *
   * The script is terminated with die() if any socket call fails.
   *
   * @param string $addr         Address to bind to.
   * @param int    $port         Port to listen on.
   * @param int    $bufferLength Receive buffer size in KiB; 8 extra bytes are added on top.
   */
  function __construct($addr, $port, $bufferLength = 1024) {
    $this->maxBufferSize = $bufferLength * 1024 + 8;
    $this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)  or die("Failed: socket_create()");
    socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("Failed: socket_option()");
    socket_bind($this->master, $addr, $port)                      or die("Failed: socket_bind()");
    socket_listen($this->master,20)                               or die("Failed: socket_listen()");
    $this->sockets['m'] = $this->master;

    // Since PHP 8 a socket is a Socket object, which cannot be cast to string
    // (concatenating it throws an Error). Build the label explicitly; for the
    // older resource handles the text is the same as before.
    $masterLabel = is_object($this->master)
      ? 'Socket#' . spl_object_id($this->master)
      : (string) $this->master;
    $this->stdout("Server started\nListening on: $addr:$port\nMaster socket: ".$masterLabel);
  }
  /**
   * Called immediately when data is received from a user.
   * For WebSocket clients $message is the deframed payload; for plain
   * socket clients it is the raw receive buffer.
   */
  abstract protected function process($user,$message);
  /** Called after the handshake response has been sent to the client. */
  abstract protected function connected($user);
  /** Called when the connection of $user is being closed. */
  abstract protected function closed($user);
  /**
   * Hook invoked for a connecting user: the user object already exists,
   * but the handshake has not completed yet. Does nothing by default;
   * override to react to new connections.
   */
  protected function connecting($user)
  {
  }
  /**
   * Send a message to a user, or queue it if the handshake is still pending.
   *
   * Queued messages are stored in $heldMessages and retried by _tick().
   * Write errors are suppressed and not reported to the caller.
   */
  protected function send($user, $message) {
    if (!$user->handshake) {
      // Handshake not done yet: keep the message for a later attempt.
      $this->heldMessages[] = array('user' => $user, 'message' => $message);
      return;
    }

    $framed = $this->frame($message, $user);
    @socket_write($user->socket, $framed, strlen($framed));
  }
  /**
   * Periodic hook, called once per iteration of the main loop in run().
   * Override this for any work that should happen at least once per
   * second (possibly more often). Does nothing by default.
   */
  protected function tick()
  {
  }
  /**
   * Core maintenance: retry messages that were held back until the handshake.
   *
   * A held message is sent (and removed from the queue) once its user has
   * completed the handshake, and dropped when its user is no longer connected.
   */
  protected function _tick() {
    foreach ($this->heldMessages as $index => $held) {
      $stillConnected = false;

      foreach ($this->users as $candidate) {
        if ($held['user']->socket != $candidate->socket) {
          continue;
        }
        $stillConnected = true;
        if ($candidate->handshake) {
          unset($this->heldMessages[$index]);
          $this->send($candidate, $held['message']);
        }
      }

      if (!$stillConnected) {
        // The recipient has gone away, so the message can never be delivered.
        unset($this->heldMessages[$index]);
      }
    }
  }
  /**
   * Main processing loop. Never returns.
   *
   * Each iteration runs the maintenance hooks, waits up to one second for
   * socket activity, accepts new clients on the master socket and dispatches
   * received data: the first complete request of a client decides whether it
   * is treated as a WebSocket client (handshake) or a plain TCP client.
   */
  public function run() {
    // Sockets are resources before PHP 8 and Socket objects afterwards; the
    // latter cannot be concatenated into a string, so log messages use this label.
    $label = function ($socket) {
      return is_object($socket) ? 'Socket#' . spl_object_id($socket) : (string) $socket;
    };

    while(true) {
      if (empty($this->sockets)) {
        $this->sockets['m'] = $this->master;
      }
      $read = $this->sockets;
      $write = $except = null;
      $this->_tick();
      $this->tick();

      // On failure socket_select() returns false and leaves $read untouched;
      // looping over it would then call socket_accept() with nothing pending
      // and block the whole server.
      $ready = @socket_select($read,$write,$except,1);
      if ($ready === false || $ready < 1) {
        continue;
      }

      foreach ($read as $socket) {
        if ($socket === $this->master) {
          // socket_accept() reports failure with false, never a negative number.
          $client = socket_accept($socket);
          if ($client === false) {
            $this->stderr("Failed: socket_accept()");
            continue;
          }
          $this->connect($client);
          $this->stdout("Client connected. " . $label($client));
          continue;
        }

        $numBytes = @socket_recv($socket, $buffer, $this->maxBufferSize, 0);
        if ($numBytes === false) {
          $sockErrNo = socket_last_error($socket);
          switch ($sockErrNo)
          {
            case 102: // ENETRESET    -- Network dropped connection because of reset
            case 103: // ECONNABORTED -- Software caused connection abort
            case 104: // ECONNRESET   -- Connection reset by peer
            case 108: // ESHUTDOWN    -- Cannot send after transport endpoint shutdown
            case 110: // ETIMEDOUT    -- Connection timed out
            case 111: // ECONNREFUSED -- Connection refused
            case 112: // EHOSTDOWN    -- Host is down
            case 113: // EHOSTUNREACH -- No route to host
            case 121: // EREMOTEIO    -- Remote I/O error
            case 125: // ECANCELED    -- Operation canceled
              $this->stderr("Unusual disconnect on socket " . $label($socket));
              // Disconnect before the error is cleared, so an overriding
              // implementation can still inspect the error state of the socket.
              $this->disconnect($socket, true, $sockErrNo);
              break;
            default:
              $this->stderr('Socket error: ' . socket_strerror($sockErrNo));
          }
          continue;
        }

        if ($numBytes == 0) {
          $this->disconnect($socket);
          $this->stderr("Client disconnected. TCP connection lost: " . $label($socket));
          continue;
        }

        $user = $this->getUserBySocket($socket);
        if ($user === null) {
          // No user is registered for this socket; there is nobody to dispatch to.
          continue;
        }

        if (!$user->handshake) {
          $tmp = str_replace("\r", '', $buffer);
          if (strpos($tmp, "\n\n") === false ) {
            continue; // The client has not finished sending its header yet.
          }
          // Header names are case-insensitive, and a match at offset 0 must
          // count as a match, hence stripos()/preg_match() with strict checks.
          $isWebSocket = stripos($buffer, "Sec-WebSocket-Key") !== false
            || preg_match('/^Upgrade:\s*websocket/mi', $buffer) === 1;
          if ($isWebSocket) {
            // WebSocket client: answer the opening handshake.
            $this->doHandshake($user,$buffer);
          } else {
            // Plain socket client: no handshake, hand the data over directly.
            $user->handshake = "TCP" ;
            $this->process($user, $buffer);
          }
          continue;
        }

        if ($user->handshake === "TCP") {
          // Plain socket client: a message is only processed when it
          // contains the blank-line terminator.
          if (strpos($buffer, "\n\n") === false ) {
            continue;
          }
          $this->process($user, $buffer);
        } else {
          // WebSocket client: split the packet into frames and deframe each one.
          $this->split_packet($numBytes,$buffer, $user);
        }
      }
    }
  }
  /**
   * Register a newly accepted socket: create its user object, index both
   * the user and the socket under the user id, then fire connecting().
   */
  protected function connect($socket) {
    $class = $this->userClass;
    $user = new $class(uniqid('u'), $socket);

    $id = $user->id;
    $this->users[$id] = $user;
    $this->sockets[$id] = $socket;

    $this->connecting($user);
  }
  /**
   * Remove the user that owns $socket from the server.
   *
   * Does nothing when no user is registered for the socket.
   *
   * @param mixed    $socket        Socket handle of the connection to drop.
   * @param bool     $triggerClosed When true, closed() is invoked and the socket is
   *                                closed; when false only a close frame is written.
   * @param int|null $sockErrNo     When not null, the pending error on $socket is cleared.
   */
  protected function disconnect($socket, $triggerClosed = true, $sockErrNo = null) {
    $disconnectedUser = $this->getUserBySocket($socket);
    if ($disconnectedUser === null) {
      return;
    }

    // unset() on a missing key is a no-op, so no existence check is needed.
    unset($this->users[$disconnectedUser->id], $this->sockets[$disconnectedUser->id]);

    if (!is_null($sockErrNo)) {
      socket_clear_error($socket);
    }

    if ($triggerClosed) {
      // Since PHP 8 a socket is a Socket object and cannot be concatenated
      // into a string, so build the log label explicitly.
      $handle = $disconnectedUser->socket;
      $label = is_object($handle) ? 'Socket#' . spl_object_id($handle) : (string) $handle;
      $this->stdout("Client disconnected. ".$label);
      $this->closed($disconnectedUser);
      socket_close($disconnectedUser->socket);
    }else {
      // NOTE(review): this path sends a close frame but never closes the
      // socket, and the user is already unregistered -- confirm this is intended.
      $message = $this->frame('', $disconnectedUser, 'close');
      @socket_write($disconnectedUser->socket, $message, strlen($message));
    }
  }
  /**
   * Validate a WebSocket opening handshake and answer it.
   *
   * On success the parsed headers and the raw request are stored on the user,
   * the "101 Switching Protocols" response is written and connected() is
   * called. On failure an HTTP error response is written and the user is
   * disconnected.
   *
   * @param object $user   User object of the connecting client.
   * @param string $buffer Raw HTTP upgrade request as received.
   */
  protected function doHandshake($user, $buffer) {
    $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
    $headers = array();
    $lines = explode("\n",$buffer);
    foreach ($lines as $line) {
      // Recognise the request line first: its path may itself contain ":",
      // which must not be mistaken for a header separator.
      if (!isset($headers['get']) && preg_match('/^GET (.*) HTTP/i', $line, $reqResource)) {
        $headers['get'] = trim($reqResource[1]);
      }elseif (strpos($line,":") !== false) {
        $header = explode(":",$line,2);
        $headers[strtolower(trim($header[0]))] = trim($header[1]);
      }
    }

    // Every error response ends with an empty line so that the client sees a
    // complete HTTP response instead of waiting for more header data.
    if (isset($headers['get'])) {
      $user->requestedResource = $headers['get'];
    }else {
      $handshakeResponse = "HTTP/1.1 405 Method Not Allowed\r\n\r\n";
    }
    if (!isset($headers['host']) || !$this->checkHost($headers['host'])) {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }
    if (!isset($headers['upgrade']) || strtolower($headers['upgrade']) != 'websocket') {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }
    if (!isset($headers['connection']) || strpos(strtolower($headers['connection']), 'upgrade') === FALSE) {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }
    if (!isset($headers['sec-websocket-key'])) {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }
    if (!isset($headers['sec-websocket-version']) || $headers['sec-websocket-version'] !== '13') {
      $handshakeResponse = "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocket-Version: 13\r\n\r\n";
    }
    if ($this->headerOriginRequired
        && (!isset($headers['origin']) || !$this->checkOrigin($headers['origin']))) {
      $handshakeResponse = "HTTP/1.1 403 Forbidden\r\n\r\n";
    }
    if ($this->headerSecWebSocketProtocolRequired
        && (!isset($headers['sec-websocket-protocol']) || !$this->checkWebsocProtocol($headers['sec-websocket-protocol']))) {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }
    if ($this->headerSecWebSocketExtensionsRequired
        && (!isset($headers['sec-websocket-extensions']) || !$this->checkWebsocExtensions($headers['sec-websocket-extensions']))) {
      $handshakeResponse = "HTTP/1.1 400 Bad Request\r\n\r\n";
    }

    // Done verifying the required and the optionally required headers.
    if (isset($handshakeResponse)) {
      socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse));
      $this->disconnect($user->socket);
      return;
    }

    $user->headers = $headers;
    $user->handshake = $buffer;

    // Accept token: base64 of the raw (binary) SHA-1 of key + GUID.
    $handshakeToken = base64_encode(sha1($headers['sec-websocket-key'] . $magicGUID, true)) . "\r\n";
    $subProtocol = (isset($headers['sec-websocket-protocol'])) ? $this->processProtocol($headers['sec-websocket-protocol']) : "";
    $extensions = (isset($headers['sec-websocket-extensions'])) ? $this->processExtensions($headers['sec-websocket-extensions']) : "";
    $handshakeResponse = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $handshakeToken$subProtocol$extensions\r\n";
    socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse));
    $this->connected($user);
  }
  /**
   * Decide whether the Host header of a handshake is acceptable.
   *
   * Accepts every host by default. Override and return false for hosts you
   * do not expect, e.g. when only my-domain.com should be served but the
   * request names malicious-site.com.
   */
  protected function checkHost($hostName)
  {
    return true;
  }
  /**
   * Decide whether the Origin header of a handshake is acceptable.
   * Accepts every origin by default; override and return false to reject one.
   */
  protected function checkOrigin($origin)
  {
    return true;
  }
  /**
   * Decide whether the requested sub-protocol list is acceptable.
   * Accepts everything by default; override and return false when no
   * expected protocol is offered.
   */
  protected function checkWebsocProtocol($protocol)
  {
    return true;
  }
  /**
   * Decide whether the requested extension list is acceptable.
   * Accepts everything by default; override and return false when an
   * expected extension is missing.
   */
  protected function checkWebsocExtensions($extensions)
  {
    return true;
  }
  /**
   * Build the sub-protocol header line for the handshake response.
   *
   * Return either "Sec-WebSocket-Protocol: SelectedProtocolFromClientList\r\n"
   * or an empty string (the default). A non-empty result must end with the
   * carriage return/newline pair and must not start with one; an otherwise
   * empty result must not contain one either, because it would be taken as
   * part of the response body and trigger an error in the client.
   */
  protected function processProtocol($protocol)
  {
    return "";
  }
  /**
   * Build the extensions header line for the handshake response.
   * Return either "Sec-WebSocket-Extensions: SelectedExtensions\r\n" or an
   * empty string (the default).
   */
  protected function processExtensions($extensions)
  {
    return "";
  }
  /**
   * Look up the connected user that owns the given socket.
   *
   * @return object|null The matching user, or null when no user has this socket.
   */
  protected function getUserBySocket($socket) {
    foreach ($this->users as $candidate) {
      if ($candidate->socket != $socket) {
        continue;
      }
      return $candidate;
    }
    return null;
  }
  /**
   * Informational log hook. Output is currently switched off (the echo is
   * commented out), so the method has no visible effect.
   */
  public function stdout($message) {
    if (!$this->interactive) {
      return;
    }
    //echo "$message\n";
  }
  /**
   * Error log hook. Output is currently switched off (the echo is
   * commented out), so the method has no visible effect.
   */
  public function stderr($message) {
    if (!$this->interactive) {
      return;
    }
    //echo "$message\n";
  }
  /**
   * Wrap a payload in an unmasked WebSocket frame.
   *
   * Updates $user->sendingContinuous so that follow-up frames of a fragmented
   * message are emitted with opcode 0.
   *
   * @param string $message          Payload to send.
   * @param object $user             Recipient; its sendingContinuous flag is read and written.
   * @param string $messageType      One of 'continuous', 'text', 'binary', 'close', 'ping', 'pong'.
   *                                 Any other value is framed like 'continuous', which matches
   *                                 the bytes produced before but without an undefined-variable warning.
   * @param bool   $messageContinues True when further fragments follow (FIN bit left clear).
   * @return string The frame: two header bytes, optional extended length, payload.
   */
  protected function frame($message, $user, $messageType='text', $messageContinues=false) {
    // First header byte: opcode in the low bits.
    switch ($messageType) {
      case 'text':
        $b1 = ($user->sendingContinuous) ? 0 : 1;
        break;
      case 'binary':
        $b1 = ($user->sendingContinuous) ? 0 : 2;
        break;
      case 'close':
        $b1 = 8;
        break;
      case 'ping':
        $b1 = 9;
        break;
      case 'pong':
        $b1 = 10;
        break;
      case 'continuous':
      default:
        $b1 = 0;
        break;
    }

    if ($messageContinues) {
      $user->sendingContinuous = true;
    }else {
      $b1 += 128; // final fragment: set the FIN bit
      $user->sendingContinuous = false;
    }

    // Second header byte plus extended length, all big-endian.
    $length = strlen($message);
    if ($length < 126) {
      $b2 = $length;
      $lengthField = "";
    }elseif ($length < 65536) {
      $b2 = 126;
      $lengthField = pack('n', $length); // 16-bit length
    }else {
      $b2 = 127;
      // 64-bit length written as two 32-bit halves; on a 32-bit build the
      // upper half is always zero.
      $lengthField = (PHP_INT_SIZE > 4)
        ? pack('NN', $length >> 32, $length & 0xFFFFFFFF)
        : pack('NN', 0, $length);
    }

    return chr($b1) . chr($b2) . $lengthField . $message;
  }
  /**
   * Split a received packet into its frames and handle each frame in turn.
   *
   * Bytes buffered from an earlier partial packet are prepended first. Every
   * frame is deframed; a resulting message is passed to process(), unless
   * the user has sent a close, in which case the user is disconnected.
   *
   * @param int    $length Number of bytes in $packet.
   * @param string $packet Raw bytes received from the socket.
   * @param object $user   User the packet belongs to.
   */
  protected function split_packet($length,$packet, $user) {
    if ($user->handlingPartialPacket) {
      // Continue a packet that was only partly received before.
      $packet = $user->partialBuffer . $packet;
      $user->handlingPartialPacket = false;
      $length = strlen($packet);
    }

    $whole = $packet;
    $cursor = 0;
    while ($cursor < $length) {
      // $packet always starts at the frame currently being examined.
      $headers = $this->extractHeaders($packet);
      $frameLength = $headers['length'] + $this->calcoffset($headers);

      $frame = substr($whole, $cursor, $frameLength);
      $message = $this->deframe($frame, $user, $headers);
      if ($message !== FALSE) {
        if ($user->hasSentClose) {
          $this->disconnect($user->socket);
        } else {
          $this->process($user, $message);
        }
      }

      // Advance to the next frame.
      $cursor += $frameLength;
      $packet = substr($whole, $cursor);
    }
  }
  /**
   * Compute the size in bytes of a frame header.
   *
   * @param array $headers Parsed frame header; the 'hasmask' and 'length' entries are read.
   * @return int 2 base bytes, plus 4 when a mask is present, plus 8 for a
   *             payload length above 65535 or 2 for a length above 125.
   */
  protected function calcoffset($headers) {
    $maskBytes = $headers['hasmask'] ? 4 : 0;

    $payloadLength = $headers['length'];
    if ($payloadLength > 65535) {
      $extendedBytes = 8;
    } elseif ($payloadLength > 125) {
      $extendedBytes = 2;
    } else {
      $extendedBytes = 0;
    }

    return 2 + $maskBytes + $extendedBytes;
  }
  protected function deframe($message, &$user) {
    //echo $this->strtohex($message);
    $headers = $this->extractHeaders($message);
    $pongReply = false;
    $willClose