Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket support #120

Open
2 of 5 tasks
thekid opened this issue Nov 3, 2024 · 0 comments · May be fixed by #121
Open
2 of 5 tasks

Add websocket support #120

thekid opened this issue Nov 3, 2024 · 0 comments · May be fixed by #121

Comments

@thekid
Copy link
Member

thekid commented Nov 3, 2024

Implementation

Diff against HttpProtocol implementation, implementing the 101 Switching protocols for Websockets:

diff --git a/src/main/php/xp/web/srv/HttpProtocol.class.php b/src/main/php/xp/web/srv/HttpProtocol.class.php
index 5de7199..f97a498 100755
--- a/src/main/php/xp/web/srv/HttpProtocol.class.php
+++ b/src/main/php/xp/web/srv/HttpProtocol.class.php
@@ -3,7 +3,9 @@
 use Throwable;
 use lang\ClassLoader;
 use peer\server\{AsyncServer, ServerProtocol};
+use util\cmd\Console;
 use web\{Error, InternalServerError, Request, Response, Headers, Status};
+use websocket\protocol\{Opcodes, Connection};
 
 /**
  * HTTP protocol implementation
@@ -14,6 +16,7 @@ class HttpProtocol implements ServerProtocol {
   private $application, $logging;
   public $server= null;
   private $close= false;
+  private $protocols= [];
 
   /**
    * Creates a new protocol instance
@@ -24,6 +27,114 @@ class HttpProtocol implements ServerProtocol {
   private function __construct($application, $logging) {
     $this->application= $application;
     $this->logging= $logging;
+    $this->protocols= [
+      'websocket' => function($socket) {
+        $socket->setTimeout(600);
+        $socket->useNoDelay();
+
+        $conn= new Connection($socket, (int)$socket->getHandle(), null);
+        foreach ($conn->receive() as $type => $message) {
+          if (Opcodes::CLOSE === $type) {
+            $conn->close();
+            break;
+          }
+
+          try {
+            yield from $this->application->message($conn, $message) ?? [];
+            $hint= '';
+          } catch (Throwable $e) {
+            $hint= " \e[2m[{$e->getMessage()}]\e[0m";
+          }
+
+          // TODO: Use logging facility
+          Console::writeLinef(
+            "  \e[33m[%s %d %.3fkB]\e[0m WSS %s (%d bytes)%s",
+            date('Y-m-d H:i:s'),
+            getmypid(),
+            memory_get_usage() / 1024,
+            Opcodes::nameOf($type),
+            strlen($message),
+            $hint
+          );
+        }
+        yield;
+      },
+      'http' => function($socket) {
+        $input= new Input($socket);
+        yield from $input->consume();
+
+        if ($input->kind & Input::REQUEST) {
+          gc_enable();
+          $version= $input->version();
+          $request= new Request($input);
+          $response= new Response(new Output($socket, $version));
+          $response->header('Date', Headers::date());
+          $response->header('Server', 'XP');
+
+          // HTTP/1.1 defaults to keeping connection alive, HTTP/1.0 defaults to closing
+          $connection= $request->header('Connection', '');
+          if ($this->close) {
+            $close= true;
+            $response->header('Connection', 'close');
+          } else if ($version < '1.1') {
+            $close= 0 !== strncasecmp('keep-alive', $connection, 10);
+            $close || $response->header('Connection', 'keep-alive');
+          } else {
+            $close= 0 === strncasecmp('close', $connection, 5);
+            $close && $response->header('Connection', 'close');
+          }
+
+          try {
+            if (Input::REQUEST === $input->kind) {
+              yield from $this->application->service($request, $response) ?? [];
+            } else if ($input->kind & Input::TIMEOUT) {
+              $response->answer(408);
+              $response->send('Client timed out sending status line and request headers', 'text/plain');
+              $close= true;
+            } else if ($input->kind & Input::EXCESSIVE) {
+              $response->answer(431);
+              $response->send('Client sent excessively long status line or request headers', 'text/plain');
+              $close= true;
+            }
+
+            $this->logging->log($request, $response, $response->trace);
+
+            if (101 === $response->status()) {
+              $close= false;
+              return [$response->headers()['Upgrade'] => $request];
+            }
+          } catch (CannotWrite $e) {
+            $this->logging->log($request, $response, $response->trace + ['warn' => $e]);
+          } catch (Error $e) {
+            $this->sendError($request, $response, $e);
+          } catch (Throwable $e) {
+            $this->sendError($request, $response, new InternalServerError($e));
+          } finally {
+            $response->end();
+            $close ? $socket->close() : $request->consume();
+
+            gc_collect_cycles();
+            gc_disable();
+            clearstatcache();
+            \xp::gc();
+          }
+          return;
+        }
+
+        // Handle request errors and close the socket
+        if (!($input->kind & Input::CLOSE)) {
+          $status= '400 Bad Request';
+          $error= 'Client sent incomplete HTTP request: "'.addcslashes($input->buffer, "\0..\37!\177..\377").'"';
+          $socket->write(sprintf(
+            "HTTP/1.1 %s\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n%s",
+            $status,
+            strlen($error),
+            $error
+          ));
+        }
+        $socket->close();
+      }
+    ];
   }
 
   /**
@@ -96,7 +207,7 @@ class HttpProtocol implements ServerProtocol {
    * @param  peer.Socket $socket
    */
   public function handleConnect($socket) {
-    // Intentionally empty
+    $this->protocols[(int)$socket->getHandle()]= $this->protocols['http'];
   }
 
   /**
@@ -105,6 +216,7 @@ class HttpProtocol implements ServerProtocol {
    * @param  peer.Socket $socket
    */
   public function handleDisconnect($socket) {
+    unset($this->protocols[(int)$socket->getHandle()]);
     $socket->close();
   }
 
@@ -115,74 +227,14 @@ class HttpProtocol implements ServerProtocol {
    * @return void
    */
   public function handleData($socket) {
-    $input= new Input($socket);
-    yield from $input->consume();
-
-    if ($input->kind & Input::REQUEST) {
-      gc_enable();
-      $version= $input->version();
-      $request= new Request($input);
-      $response= new Response(new Output($socket, $version));
-      $response->header('Date', Headers::date());
-      $response->header('Server', 'XP');
-
-      // HTTP/1.1 defaults to keeping connection alive, HTTP/1.0 defaults to closing
-      $connection= $request->header('Connection', '');
-      if ($this->close) {
-        $close= true;
-        $response->header('Connection', 'close');
-      } else if ($version < '1.1') {
-        $close= 0 !== strncasecmp('keep-alive', $connection, 10);
-        $close || $response->header('Connection', 'keep-alive');
-      } else {
-        $close= 0 === strncasecmp('close', $connection, 5);
-        $close && $response->header('Connection', 'close');
-      }
+    $handle= (int)$socket->getHandle();
+    $handler= $this->protocols[$handle]($socket);
 
-      try {
-        if (Input::REQUEST === $input->kind) {
-          yield from $this->application->service($request, $response) ?? [];
-        } else if ($input->kind & Input::TIMEOUT) {
-          $response->answer(408);
-          $response->send('Client timed out sending status line and request headers', 'text/plain');
-          $close= true;
-        } else if ($input->kind & Input::EXCESSIVE) {
-          $response->answer(431);
-          $response->send('Client sent excessively long status line or request headers', 'text/plain');
-          $close= true;
-        }
+    yield from $handler;
 
-        $this->logging->log($request, $response, $response->trace);
-      } catch (CannotWrite $e) {
-        $this->logging->log($request, $response, $response->trace + ['warn' => $e]);
-      } catch (Error $e) {
-        $this->sendError($request, $response, $e);
-      } catch (Throwable $e) {
-        $this->sendError($request, $response, new InternalServerError($e));
-      } finally {
-        $response->end();
-        $close ? $socket->close() : $request->consume();
-
-        gc_collect_cycles();
-        gc_disable();
-        clearstatcache();
-        \xp::gc();
-      }
-      return;
+    if ($switch= $handler->getReturn()) {
+      $this->protocols[$handle]= $this->protocols[key($switch)];
     }
-
-    // Handle request errors and close the socket
-    if (!($input->kind & Input::CLOSE)) {
-      $status= '400 Bad Request';
-      $error= 'Client sent incomplete HTTP request: "'.addcslashes($input->buffer, "\0..\37!\177..\377").'"';
-      $socket->write(sprintf(
-        "HTTP/1.1 %s\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n%s",
-        $status,
-        strlen($error),
-        $error
-      ));
-    }
-    $socket->close();
   }
 
   /**
@@ -192,7 +244,7 @@ class HttpProtocol implements ServerProtocol {
    * @param  lang.XPException $e
    */
   public function handleError($socket, $e) {
-    // $e->printStackTrace();
+    unset($this->protocols[(int)$socket->getHandle()]);
     $socket->close();
   }
 }
\ No newline at end of file

TODO:

  • Proof of concept
  • Extensive testing
  • Figure out how to schedule keep-alive PINGs
  • Extract protocol handlers
  • Pass path and headers to websocket connections

Usage

@thekid thekid linked a pull request Jan 19, 2025 that will close this issue
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant