From 10e125f79cf1a3cb95ecb327f1482802cbcc3f13 Mon Sep 17 00:00:00 2001 From: svyrydenko Date: Wed, 29 Nov 2017 18:06:25 +0200 Subject: [PATCH] All existing functionality moved to Generators --- README.md | 27 +++++++++++++------------ src/Dispatcher.php | 21 ++++++++++++-------- src/Master.php | 49 +++++++++++++++++++++++++++------------------- src/Server.php | 3 ++- src/Worker.php | 3 ++- 5 files changed, 61 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 6916ca8..34dab0d 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,12 @@ require 'vendor/autoload.php'; ## Simple WebSocket server ```php +use Generator; +use Ollyxar\LaravelAuth\FileAuth; use Ollyxar\WebSockets\{ - Server as WServer, - Ssl as Wsl, - Worker as Handler, - Frame as WFrame + Frame, + Worker, + Dispatcher }; class MyHandler extends Handler @@ -60,30 +61,32 @@ class MyHandler extends Handler /** * @param $client + * @return Generator */ - protected function onConnect($client): void + protected function onConnect($client): Generator { - $this->sendToAll(WFrame::encode(json_encode([ + yield Dispatcher::make($this->sendToAll(WFrame::encode(json_encode([ 'type' => 'system', 'message' => 'User {' . (int)$client . '} Connected.' - ]))); + ])))); } /** * @param $clientNumber */ - protected function onClose($clientNumber): void + protected function onClose($clientNumber): Generator { - $this->sendToAll(WFrame::encode(json_encode([ + yield Dispatcher::make($this->sendToAll(WFrame::encode(json_encode([ 'type' => 'system', 'message' => "User {$clientNumber} disconnected." - ]))); + ])))); } /** * @param string $message + * @return Generator */ - protected function onDirectMessage(string $message): void + protected function onDirectMessage(string $message): Generator { $message = json_decode($message); $userName = $message->name; @@ -95,7 +98,7 @@ class MyHandler extends Handler 'message' => $userMessage ])); - $this->sendToAll($response); + yield Dispatcher::make($this->sendToAll($response)); } } diff --git a/src/Dispatcher.php b/src/Dispatcher.php index ec48be2..84e6838 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -59,7 +59,7 @@ private function poll($timeout): void } foreach ($read as $socket) { - list(, $jobs) = $this->read[(int)$socket]; + $jobs = $this->read[(int)$socket][1]; unset($this->read[(int)$socket]); foreach ($jobs as $job) { @@ -68,7 +68,7 @@ private function poll($timeout): void } foreach ($write as $socket) { - list(, $jobs) = $this->write[(int)$socket]; + $jobs = $this->write[(int)$socket][1]; unset($this->write[(int)$socket]); foreach ($jobs as $job) { @@ -140,9 +140,14 @@ public function add(Generator $process): self return $this; } - public static function make(Generator $process) { + /** + * @param Generator $process + * @return SysCall + */ + public static function make(Generator $process): SysCall + { return new SysCall( - function(Job $job, Dispatcher $dispatcher) use ($process) { + function (Job $job, Dispatcher $dispatcher) use ($process) { $job->value($dispatcher->add($process)); $dispatcher->enqueue($job); } @@ -153,10 +158,10 @@ function(Job $job, Dispatcher $dispatcher) use ($process) { * @param $socket * @return SysCall */ - public static function listenRead($socket) + public static function listenRead($socket): SysCall { return new SysCall( - function(Job $job, Dispatcher $dispatcher) use ($socket) { + function (Job $job, Dispatcher $dispatcher) use ($socket) { $dispatcher->appendRead($socket, $job); } ); @@ -166,10 +171,10 @@ function(Job $job, Dispatcher $dispatcher) use ($socket) { * @param $socket * @return SysCall */ - public static function listenWrite($socket) + public static function listenWrite($socket): SysCall { return new SysCall( - function(Job $job, Dispatcher $dispatcher) use ($socket) { + function (Job $job, Dispatcher $dispatcher) use ($socket) { $dispatcher->appendWrite($socket, $job); } ); diff --git a/src/Master.php b/src/Master.php index 18a6c19..a996d24 100644 --- a/src/Master.php +++ b/src/Master.php @@ -8,7 +8,14 @@ */ final class Master { + /** + * @var array + */ private $workers = []; + + /** + * Unix socket connector + */ private $connector; /** @@ -25,34 +32,37 @@ public function __construct($workers, $connector) /** * @param $client - * @param $data * @return Generator */ - protected function write($client, $data): Generator + protected function read($client): Generator { - yield Dispatcher::listenWrite($client); - fwrite($client, $data); - } + yield Dispatcher::listenRead($client); - protected function listenWorker($socket): Generator - { - yield Dispatcher::listenRead($socket); - yield Dispatcher::listenWrite($socket); - - $data = Frame::decode($socket); + $data = Frame::decode($client); if (!$data['opcode']) { return yield; } - stream_select($read, $this->workers, $except, 0); - foreach ($this->workers as $worker) { - if ($worker !== $socket) { - dump('sending to worker # '.(int)$worker); + if ($worker !== $client) { + yield Dispatcher::listenWrite($worker); yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode']))); } } + + yield Dispatcher::make($this->read($client)); + } + + /** + * @param $client + * @param $data + * @return Generator + */ + protected function write($client, $data): Generator + { + yield Dispatcher::listenWrite($client); + fwrite($client, $data); } /** @@ -60,10 +70,8 @@ protected function listenWorker($socket): Generator */ protected function listenWorkers(): Generator { - while (true) { - foreach ($this->workers as $worker) { - yield Dispatcher::make($this->listenWorker($worker)); - } + foreach ($this->workers as $worker) { + yield Dispatcher::make($this->read($worker)); } } @@ -83,6 +91,7 @@ protected function listenConnector(): Generator } foreach ($this->workers as $worker) { + yield Dispatcher::listenWrite($worker); yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode']))); } } @@ -98,7 +107,7 @@ public function dispatch(): void { (new Dispatcher()) ->add($this->listenConnector()) - //->add($this->listenWorkers()) + ->add($this->listenWorkers()) ->dispatch(); } } diff --git a/src/Server.php b/src/Server.php index 969bfda..f09859e 100644 --- a/src/Server.php +++ b/src/Server.php @@ -1,6 +1,6 @@ onClose((int)$client)); unset($this->clients[(int)$client]); + fclose($client); break; } case Frame::PING: { @@ -138,7 +139,7 @@ protected function accept($client): Generator fclose($client); } else { $this->clients[(int)$client] = $client; - $this->onConnect($client); + yield Dispatcher::make($this->onConnect($client)); yield Dispatcher::make($this->read($client)); }