PHP-Websockets 上傳檔案2 優化支援php socket客戶端和websocket連線websocket伺服器 以守護程序方式執行編碼
阿新 • • 發佈:2018-11-07
WebsocketServer:
users.php
<?php class WebSocketUser { public $socket; public $id; public $headers = array(); public $handshake = false; public $handlingPartialPacket = false; public $partialBuffer = ""; public $sendingContinuous = false; public $partialMessage= ""; public $hasSentClose = false; public $clientFileName ; public $serverFileName ; public $fileHandler ; public $fileSize ; public $recLength = 0 ; function __construct($id, $socket) { $this->id = $id; $this->socket = $socket; } }
daemon.class.php
<?php /** * Created by PhpStorm. * User: Administrator * Date: 2017/4/19 * Time: 10:34 */ class daemon { public function init(){ //建立一個子程序 $pid = pcntl_fork(); if ($pid == -1){ throw new Exception('fork子程序失敗'); }elseif ($pid > 0){ //父程序退出,子程序變成孤兒程序被1號程序收養,程序脫離終端exit(0) ; } //建立一個新的會話,脫離終端控制,更改子程序為組長程序 $sid = posix_setsid(); if ($sid == -1) { throw new Exception('setsid fail'); } /** * 通過上一步,我們建立了一個新的會話組長,程序組長,且脫離了終端,但是會話組長可以申請重新開啟一個終端,為了避免 * 這種情況,我們再次建立一個子程序,並退出當前程序,這樣執行的程序就不再是會話組長。 */ $pid = pcntl_fork(); if ($pid == -1) { throw new Exception('fork子程序失敗'); } elseif ($pid > 0) { //再一次退出父程序,子程序的子程序(孫子程序)成為最終的守護程序 exit(0); } /*由於守護程序用不到標準輸入輸出,關閉標準輸入,輸出,錯誤輸出描述符 **注意:由於這裡已經脫離了終端,所以下面關閉了與終端相關的輸入,輸出以及錯誤輸出描述符, * 所以在後面的程式中凡是初始化該守護程序之後的,想以守護程序的方式執行的php檔案中出現echo等和終端互動的輸入輸出, * 則想以守護程序的方式執行的php檔案並不會再後臺執行。切記:後面的程式碼中一定不能出現echo等。 * */ global $STDERR, $STDOUT ; fclose(STDIN); fclose(STDOUT); fclose(STDERR); /*所以為了避免除顯示輸出的echo導致php錯誤的問題,我們一般建議這樣 * 加上下面那句,所有的顯示的不顯示的echo err之類都可以被忽略。也就是說你把 echo "kdsld";這句加上也沒有問題指到dev/null, *把/dev/null看作"黑洞". 它非常等價於一個只寫檔案. 所有寫入它的內容都會永遠丟失. 而嘗試從它那兒讀取內容則什麼也讀不到. 然而, /dev/null對命令列和指令碼都非常的有用. */ $STDOUT = fopen('/dev/null', "rw+"); $STDERR = fopen('/dev/null', "rw+"); //修改當前程序的工作目錄,由於子程序會繼承父程序的工作目錄,修改工作目錄以釋放對父程序工作目錄的佔用。 chdir('/'); umask(0); //清除檔案掩碼 } }
websockets.php
<?php require_once('./users.php'); abstract class WebSocketServer { protected $userClass = 'WebSocketUser'; // redefine this if you want a custom user class. The custom user class should inherit from WebSocketUser. protected $maxBufferSize; protected $master; protected $sockets = array(); protected $users = array(); protected $heldMessages = array(); protected $interactive = true; protected $headerOriginRequired = false; protected $headerSecWebSocketProtocolRequired = false; protected $headerSecWebSocketExtensionsRequired = false; function __construct($addr, $port, $bufferLength = 1024) { $this->maxBufferSize = $bufferLength * 1024 + 8; $this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die("Failed: socket_create()"); socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("Failed: socket_option()"); socket_bind($this->master, $addr, $port) or die("Failed: socket_bind()"); socket_listen($this->master,20) or die("Failed: socket_listen()"); $this->sockets['m'] = $this->master; $this->stdout("Server started\nListening on: $addr:$port\nMaster socket: ".$this->master); } abstract protected function process($user,$message); // Called immediately when the data is recieved. abstract protected function connected($user); // Called after the handshake response is sent to the client. abstract protected function closed($user); // Called after the connection is closed. protected function connecting($user) { // Override to handle a connecting user, after the instance of the User is created, but before // the handshake has completed. } protected function send($user, $message) { if ($user->handshake) { $message = $this->frame($message,$user); $result = @socket_write($user->socket, $message, strlen($message)); }else { // User has not yet performed their handshake. Store for sending later. $holdingMessage = array('user' => $user, 'message' => $message); $this->heldMessages[] = $holdingMessage; } } protected function tick() { // Override this for any process that should happen periodically. Will happen at least once // per second, but possibly more often. } protected function _tick() { // Core maintenance processes, such as retrying failed messages. foreach ($this->heldMessages as $key => $hm) { $found = false; foreach ($this->users as $currentUser) { if ($hm['user']->socket == $currentUser->socket) { $found = true; if ($currentUser->handshake) { unset($this->heldMessages[$key]); $this->send($currentUser, $hm['message']); } } } if (!$found) { // If they're no longer in the list of connected users, drop the message. unset($this->heldMessages[$key]); } } } /** * Main processing loop */ public function run() { while(true) { if (empty($this->sockets)) { $this->sockets['m'] = $this->master; } $read = $this->sockets; $write = $except = null; $this->_tick(); $this->tick(); @socket_select($read,$write,$except,1); foreach ($read as $socket) { if ($socket == $this->master) { $client = socket_accept($socket); if ($client < 0) { $this->stderr("Failed: socket_accept()"); continue; }else { $this->connect($client); $this->stdout("Client connected. " . $client); } }else { $numBytes = @socket_recv($socket, $buffer, $this->maxBufferSize, 0); if ($numBytes === false) { $sockErrNo = socket_last_error($socket); switch ($sockErrNo) { case 102: // ENETRESET -- Network dropped connection because of reset case 103: // ECONNABORTED -- Software caused connection abort case 104: // ECONNRESET -- Connection reset by peer case 108: // ESHUTDOWN -- Cannot send after transport endpoint shutdown -- probably more of an error on our part, if we're trying to write after the socket is closed. Probably not a critical error, though. case 110: // ETIMEDOUT -- Connection timed out case 111: // ECONNREFUSED -- Connection refused -- We shouldn't see this one, since we're listening... Still not a critical error. case 112: // EHOSTDOWN -- Host is down -- Again, we shouldn't see this, and again, not critical because it's just one connection and we still want to listen to/for others. case 113: // EHOSTUNREACH -- No route to host case 121: // EREMOTEIO -- Rempte I/O error -- Their hard drive just blew up. case 125: // ECANCELED -- Operation canceled $this->stderr("Unusual disconnect on socket " . $socket); $this->disconnect($socket, true, $sockErrNo); // disconnect before clearing error, in case someone with their own implementation wants to check for error conditions on the socket. break; default: $this->stderr('Socket error: ' . socket_strerror($sockErrNo)); } }elseif ($numBytes == 0) { $this->disconnect($socket); $this->stderr("Client disconnected. TCP connection lost: " . $socket); }else { $user = $this->getUserBySocket($socket); if (!$user->handshake) { $tmp = str_replace("\r", '', $buffer); if (strpos($tmp, "\n\n") === false ) { continue; // If the client has not finished sending the header, then wait before sending our upgrade response. } $ws_tcp = strpos($buffer,"Upgrade:websocket") || strpos($buffer,"Sec-WebSocket-Key") ; if($ws_tcp){ //如果客戶端使用的是websocket $this->doHandshake($user,$buffer); }else{ //當客戶端使用的是socket $user->handshake = "TCP" ; //echo $buffer."\n" ; $this->process($user, $buffer); } }else { if($user->handshake == "TCP"){//如果客戶端是socket發過來的訊息 if (strpos($buffer, "\n\n") === false ) { continue; // 檢查是否成功完全接收客戶端是否傳送訊息. } $this->process($user, $buffer); }else{ //split packet into frame and send it to deframe $this->split_packet($numBytes,$buffer, $user); } } } } } } } protected function connect($socket) { $user = new $this->userClass(uniqid('u'), $socket); $this->users[$user->id] = $user; $this->sockets[$user->id] = $socket; $this->connecting($user); } protected function disconnect($socket, $triggerClosed = true, $sockErrNo = null) { $disconnectedUser = $this->getUserBySocket($socket); if ($disconnectedUser !== null) { unset($this->users[$disconnectedUser->id]); if (array_key_exists($disconnectedUser->id, $this->sockets)) { unset($this->sockets[$disconnectedUser->id]); } if (!is_null($sockErrNo)) { socket_clear_error($socket); } if ($triggerClosed) { $this->stdout("Client disconnected. ".$disconnectedUser->socket); $this->closed($disconnectedUser); socket_close($disconnectedUser->socket); }else { $message = $this->frame('', $disconnectedUser, 'close'); @socket_write($disconnectedUser->socket, $message, strlen($message)); } } } protected function doHandshake($user, $buffer) { $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; $headers = array(); $lines = explode("\n",$buffer); foreach ($lines as $line) { if (strpos($line,":") !== false) { $header = explode(":",$line,2); $headers[strtolower(trim($header[0]))] = trim($header[1]); }elseif (stripos($line,"get ") !== false) { preg_match("/GET (.*) HTTP/i", $buffer, $reqResource); $headers['get'] = trim($reqResource[1]); } } if (isset($headers['get'])) { $user->requestedResource = $headers['get']; }else { // todo: fail the connection $handshakeResponse = "HTTP/1.1 405 Method Not Allowed\r\n\r\n"; } if (!isset($headers['host']) || !$this->checkHost($headers['host'])) { $handshakeResponse = "HTTP/1.1 400 Bad Request"; } if (!isset($headers['upgrade']) || strtolower($headers['upgrade']) != 'websocket') { $handshakeResponse = "HTTP/1.1 400 Bad Request"; } if (!isset($headers['connection']) || strpos(strtolower($headers['connection']), 'upgrade') === FALSE) { $handshakeResponse = "HTTP/1.1 400 Bad Request"; } if (!isset($headers['sec-websocket-key'])) { $handshakeResponse = "HTTP/1.1 400 Bad Request"; }else { } if (!isset($headers['sec-websocket-version']) || strtolower($headers['sec-websocket-version']) != 13) { $handshakeResponse = "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocketVersion: 13"; } if (($this->headerOriginRequired && !isset($headers['origin']) ) || ($this->headerOriginRequired && !$this->checkOrigin($headers['origin']))) { $handshakeResponse = "HTTP/1.1 403 Forbidden"; } if (($this->headerSecWebSocketProtocolRequired && !isset($headers['sec-websocket-protocol'])) || ($this->headerSecWebSocketProtocolRequired && !$this->checkWebsocProtocol($headers['sec-websocket-protocol']))) { $handshakeResponse = "HTTP/1.1 400 Bad Request"; } if (($this->headerSecWebSocketExtensionsRequired && !isset($headers['sec-websocket-extensions'])) || ($this->headerSecWebSocketExtensionsRequired && !$this->checkWebsocExtensions($headers['sec-websocket-extensions']))) { $handshakeResponse = "HTTP/1.1 400 Bad Request"; } // Done verifying the _required_ headers and optionally required headers. if (isset($handshakeResponse)) { socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse)); $this->disconnect($user->socket); return; } $user->headers = $headers; $user->handshake = $buffer; $webSocketKeyHash = sha1($headers['sec-websocket-key'] . $magicGUID); $rawToken = ""; for ($i = 0; $i < 20; $i++) { $rawToken .= chr(hexdec(substr($webSocketKeyHash,$i*2, 2))); } $handshakeToken = base64_encode($rawToken) . "\r\n"; $subProtocol = (isset($headers['sec-websocket-protocol'])) ? $this->processProtocol($headers['sec-websocket-protocol']) : ""; $extensions = (isset($headers['sec-websocket-extensions'])) ? $this->processExtensions($headers['sec-websocket-extensions']) : ""; $handshakeResponse = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $handshakeToken$subProtocol$extensions\r\n"; socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse)); $this->connected($user); } protected function checkHost($hostName) { return true; // Override and return false if the host is not one that you would expect. // Ex: You only want to accept hosts from the my-domain.com domain, // but you receive a host from malicious-site.com instead. } protected function checkOrigin($origin) { return true; // Override and return false if the origin is not one that you would expect. } protected function checkWebsocProtocol($protocol) { return true; // Override and return false if a protocol is not found that you would expect. } protected function checkWebsocExtensions($extensions) { return true; // Override and return false if an extension is not found that you would expect. } protected function processProtocol($protocol) { return ""; // return either "Sec-WebSocket-Protocol: SelectedProtocolFromClientList\r\n" or return an empty string. // The carriage return/newline combo must appear at the end of a non-empty string, and must not // appear at the beginning of the string nor in an otherwise empty string, or it will be considered part of // the response body, which will trigger an error in the client as it will not be formatted correctly. } protected function processExtensions($extensions) { return ""; // return either "Sec-WebSocket-Extensions: SelectedExtensions\r\n" or return an empty string. } protected function getUserBySocket($socket) { foreach ($this->users as $user) { if ($user->socket == $socket) { return $user; } } return null; } public function stdout($message) { if ($this->interactive) { //echo "$message\n"; } } public function stderr($message) { if ($this->interactive) { //echo "$message\n"; } } protected function frame($message, $user, $messageType='text', $messageContinues=false) { switch ($messageType) { case 'continuous': $b1 = 0; break; case 'text': $b1 = ($user->sendingContinuous) ? 0 : 1; break; case 'binary': $b1 = ($user->sendingContinuous) ? 0 : 2; break; case 'close': $b1 = 8; break; case 'ping': $b1 = 9; break; case 'pong': $b1 = 10; break; } if ($messageContinues) { $user->sendingContinuous = true; }else { $b1 += 128; $user->sendingContinuous = false; } $length = strlen($message); $lengthField = ""; if ($length < 126) { $b2 = $length; }elseif ($length < 65536) { $b2 = 126; $hexLength = dechex($length); //$this->stdout("Hex Length: $hexLength"); if (strlen($hexLength)%2 == 1) { $hexLength = '0' . $hexLength; } $n = strlen($hexLength) - 2; for ($i = $n; $i >= 0; $i=$i-2) { $lengthField = chr(hexdec(substr($hexLength, $i, 2))) . $lengthField; } while (strlen($lengthField) < 2) { $lengthField = chr(0) . $lengthField; } }else { $b2 = 127; $hexLength = dechex($length); if (strlen($hexLength)%2 == 1) { $hexLength = '0' . $hexLength; } $n = strlen($hexLength) - 2; for ($i = $n; $i >= 0; $i=$i-2) { $lengthField = chr(hexdec(substr($hexLength, $i, 2))) . $lengthField; } while (strlen($lengthField) < 8) { $lengthField = chr(0) . $lengthField; } } return chr($b1) . chr($b2) . $lengthField . $message; } //check packet if he have more than one frame and process each frame individually protected function split_packet($length,$packet, $user) { //add PartialPacket and calculate the new $length if ($user->handlingPartialPacket) { $packet = $user->partialBuffer . $packet; $user->handlingPartialPacket = false; $length=strlen($packet); } $fullpacket=$packet; $frame_pos=0; $frame_id=1; while($frame_pos<$length) { $headers = $this->extractHeaders($packet); $headers_size = $this->calcoffset($headers); $framesize=$headers['length']+$headers_size; //split frame from packet and process it $frame=substr($fullpacket,$frame_pos,$framesize); if (($message = $this->deframe($frame, $user,$headers)) !== FALSE) { if ($user->hasSentClose) { $this->disconnect($user->socket); } else { // if ((preg_match('//u', $message)) || ($headers['opcode']==2)) { //$this->stdout("Text msg encoded UTF-8 or Binary msg\n".$message); $this->process($user, $message); /*} else { $this->stderr("not UTF-8\n"); }*/ } } //get the new position also modify packet data $frame_pos+=$framesize; $packet=substr($fullpacket,$frame_pos); $frame_id++; } } protected function calcoffset($headers) { $offset = 2; if ($headers['hasmask']) { $offset += 4; } if ($headers['length'] > 65535) { $offset += 8; } elseif ($headers['length'] > 125) { $offset += 2; } return $offset; } protected function deframe($message, &$user) { //echo $this->strtohex($message); $headers = $this->extractHeaders($message); $pongReply = false; $willClose