We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Diff against the HttpProtocol implementation, implementing 101 Switching Protocols for WebSockets:
HttpProtocol
diff --git a/src/main/php/xp/web/srv/HttpProtocol.class.php b/src/main/php/xp/web/srv/HttpProtocol.class.php index 5de7199..f97a498 100755 --- a/src/main/php/xp/web/srv/HttpProtocol.class.php +++ b/src/main/php/xp/web/srv/HttpProtocol.class.php @@ -3,7 +3,9 @@ use Throwable; use lang\ClassLoader; use peer\server\{AsyncServer, ServerProtocol}; +use util\cmd\Console; use web\{Error, InternalServerError, Request, Response, Headers, Status}; +use websocket\protocol\{Opcodes, Connection}; /** * HTTP protocol implementation @@ -14,6 +16,7 @@ class HttpProtocol implements ServerProtocol { private $application, $logging; public $server= null; private $close= false; + private $protocols= []; /** * Creates a new protocol instance @@ -24,6 +27,114 @@ class HttpProtocol implements ServerProtocol { private function __construct($application, $logging) { $this->application= $application; $this->logging= $logging; + $this->protocols= [ + 'websocket' => function($socket) { + $socket->setTimeout(600); + $socket->useNoDelay(); + + $conn= new Connection($socket, (int)$socket->getHandle(), null); + foreach ($conn->receive() as $type => $message) { + if (Opcodes::CLOSE === $type) { + $conn->close(); + break; + } + + try { + yield from $this->application->message($conn, $message) ?? 
[]; + $hint= ''; + } catch (Throwable $e) { + $hint= " \e[2m[{$e->getMessage()}]\e[0m"; + } + + // TODO: Use logging facility + Console::writeLinef( + " \e[33m[%s %d %.3fkB]\e[0m WSS %s (%d bytes)%s", + date('Y-m-d H:i:s'), + getmypid(), + memory_get_usage() / 1024, + Opcodes::nameOf($type), + strlen($message), + $hint + ); + } + yield; + }, + 'http' => function($socket) { + $input= new Input($socket); + yield from $input->consume(); + + if ($input->kind & Input::REQUEST) { + gc_enable(); + $version= $input->version(); + $request= new Request($input); + $response= new Response(new Output($socket, $version)); + $response->header('Date', Headers::date()); + $response->header('Server', 'XP'); + + // HTTP/1.1 defaults to keeping connection alive, HTTP/1.0 defaults to closing + $connection= $request->header('Connection', ''); + if ($this->close) { + $close= true; + $response->header('Connection', 'close'); + } else if ($version < '1.1') { + $close= 0 !== strncasecmp('keep-alive', $connection, 10); + $close || $response->header('Connection', 'keep-alive'); + } else { + $close= 0 === strncasecmp('close', $connection, 5); + $close && $response->header('Connection', 'close'); + } + + try { + if (Input::REQUEST === $input->kind) { + yield from $this->application->service($request, $response) ?? 
[]; + } else if ($input->kind & Input::TIMEOUT) { + $response->answer(408); + $response->send('Client timed out sending status line and request headers', 'text/plain'); + $close= true; + } else if ($input->kind & Input::EXCESSIVE) { + $response->answer(431); + $response->send('Client sent excessively long status line or request headers', 'text/plain'); + $close= true; + } + + $this->logging->log($request, $response, $response->trace); + + if (101 === $response->status()) { + $close= false; + return [$response->headers()['Upgrade'] => $request]; + } + } catch (CannotWrite $e) { + $this->logging->log($request, $response, $response->trace + ['warn' => $e]); + } catch (Error $e) { + $this->sendError($request, $response, $e); + } catch (Throwable $e) { + $this->sendError($request, $response, new InternalServerError($e)); + } finally { + $response->end(); + $close ? $socket->close() : $request->consume(); + + gc_collect_cycles(); + gc_disable(); + clearstatcache(); + \xp::gc(); + } + return; + } + + // Handle request errors and close the socket + if (!($input->kind & Input::CLOSE)) { + $status= '400 Bad Request'; + $error= 'Client sent incomplete HTTP request: "'.addcslashes($input->buffer, "\0..\37!\177..\377").'"'; + $socket->write(sprintf( + "HTTP/1.1 %s\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n%s", + $status, + strlen($error), + $error + )); + } + $socket->close(); + } + ]; } /** @@ -96,7 +207,7 @@ class HttpProtocol implements ServerProtocol { * @param peer.Socket $socket */ public function handleConnect($socket) { - // Intentionally empty + $this->protocols[(int)$socket->getHandle()]= $this->protocols['http']; } /** @@ -105,6 +216,7 @@ class HttpProtocol implements ServerProtocol { * @param peer.Socket $socket */ public function handleDisconnect($socket) { + unset($this->protocols[(int)$socket->getHandle()]); $socket->close(); } @@ -115,74 +227,14 @@ class HttpProtocol implements ServerProtocol { * @return void */ public 
function handleData($socket) { - $input= new Input($socket); - yield from $input->consume(); - - if ($input->kind & Input::REQUEST) { - gc_enable(); - $version= $input->version(); - $request= new Request($input); - $response= new Response(new Output($socket, $version)); - $response->header('Date', Headers::date()); - $response->header('Server', 'XP'); - - // HTTP/1.1 defaults to keeping connection alive, HTTP/1.0 defaults to closing - $connection= $request->header('Connection', ''); - if ($this->close) { - $close= true; - $response->header('Connection', 'close'); - } else if ($version < '1.1') { - $close= 0 !== strncasecmp('keep-alive', $connection, 10); - $close || $response->header('Connection', 'keep-alive'); - } else { - $close= 0 === strncasecmp('close', $connection, 5); - $close && $response->header('Connection', 'close'); - } + $handle= (int)$socket->getHandle(); + $handler= $this->protocols[$handle]($socket); - try { - if (Input::REQUEST === $input->kind) { - yield from $this->application->service($request, $response) ?? []; - } else if ($input->kind & Input::TIMEOUT) { - $response->answer(408); - $response->send('Client timed out sending status line and request headers', 'text/plain'); - $close= true; - } else if ($input->kind & Input::EXCESSIVE) { - $response->answer(431); - $response->send('Client sent excessively long status line or request headers', 'text/plain'); - $close= true; - } + yield from $handler; - $this->logging->log($request, $response, $response->trace); - } catch (CannotWrite $e) { - $this->logging->log($request, $response, $response->trace + ['warn' => $e]); - } catch (Error $e) { - $this->sendError($request, $response, $e); - } catch (Throwable $e) { - $this->sendError($request, $response, new InternalServerError($e)); - } finally { - $response->end(); - $close ? 
$socket->close() : $request->consume(); - - gc_collect_cycles(); - gc_disable(); - clearstatcache(); - \xp::gc(); - } - return; + if ($switch= $handler->getReturn()) { + $this->protocols[$handle]= $this->protocols[key($switch)]; } - - // Handle request errors and close the socket - if (!($input->kind & Input::CLOSE)) { - $status= '400 Bad Request'; - $error= 'Client sent incomplete HTTP request: "'.addcslashes($input->buffer, "\0..\37!\177..\377").'"'; - $socket->write(sprintf( - "HTTP/1.1 %s\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n%s", - $status, - strlen($error), - $error - )); - } - $socket->close(); } /** @@ -192,7 +244,7 @@ class HttpProtocol implements ServerProtocol { * @param lang.XPException $e */ public function handleError($socket, $e) { - // $e->printStackTrace(); + unset($this->protocols[(int)$socket->getHandle()]); $socket->close(); } } \ No newline at end of file
TODO:
The text was updated successfully, but these errors were encountered:
Successfully merging a pull request may close this issue.
Implementation
Diff against the HttpProtocol implementation, implementing 101 Switching Protocols for WebSockets:
TODO:
Usage
The text was updated successfully, but these errors were encountered: