diff --git a/src/Dispatcher.php b/src/Dispatcher.php new file mode 100644 index 0000000..ec48be2 --- /dev/null +++ b/src/Dispatcher.php @@ -0,0 +1,199 @@ +queue = new SplQueue(); + } + + /** + * Poll connections + * + * @param $timeout + * @return void + */ + private function poll($timeout): void + { + $read = $write = []; + + foreach ($this->read as [$socket]) { + $read[] = $socket; + } + + foreach ($this->write as [$socket]) { + $write[] = $socket; + } + + if (!@stream_select($read, $write, $except, $timeout)) { + return; + } + + foreach ($read as $socket) { + list(, $jobs) = $this->read[(int)$socket]; + unset($this->read[(int)$socket]); + + foreach ($jobs as $job) { + $this->enqueue($job); + } + } + + foreach ($write as $socket) { + list(, $jobs) = $this->write[(int)$socket]; + unset($this->write[(int)$socket]); + + foreach ($jobs as $job) { + $this->enqueue($job); + } + } + } + + /** + * @return Generator + */ + private function pollProcess(): Generator + { + while (true) { + if ($this->queue->isEmpty()) { + $this->poll(null); + } else { + $this->poll(0); + } + yield; + } + } + + /** + * @param Job $job + * @return void + */ + public function enqueue(Job $job): void + { + $this->queue->enqueue($job); + } + + /** + * @param $socket + * @param Job $job + * @return void + */ + public function appendRead($socket, Job $job): void + { + if (isset($this->read[(int)$socket])) { + $this->read[(int)$socket][1][] = $job; + } else { + $this->read[(int)$socket] = [$socket, [$job]]; + } + } + + /** + * @param $socket + * @param Job $job + * @return void + */ + public function appendWrite($socket, Job $job): void + { + if (isset($this->write[(int)$socket])) { + $this->write[(int)$socket][1][] = $job; + } else { + $this->write[(int)$socket] = [$socket, [$job]]; + } + } + + /** + * @param Generator $process + * @return Dispatcher + */ + public function add(Generator $process): self + { + $this->enqueue(new Job($process)); + + return $this; + } + + public static function make(Generator $process) { + return new SysCall( + function(Job $job, Dispatcher $dispatcher) use ($process) { + $job->value($dispatcher->add($process)); + $dispatcher->enqueue($job); + } + ); + } + + /** + * @param $socket + * @return SysCall + */ + public static function listenRead($socket) + { + return new SysCall( + function(Job $job, Dispatcher $dispatcher) use ($socket) { + $dispatcher->appendRead($socket, $job); + } + ); + } + + /** + * @param $socket + * @return SysCall + */ + public static function listenWrite($socket) + { + return new SysCall( + function(Job $job, Dispatcher $dispatcher) use ($socket) { + $dispatcher->appendWrite($socket, $job); + } + ); + } + + /** + * @return void + */ + public function dispatch(): void + { + $this->add($this->pollProcess()); + + while (!$this->queue->isEmpty()) { + $job = $this->queue->dequeue(); + $result = $job->run(); + + if ($result instanceof SysCall) { + $result($job, $this); + continue; + } + + if (!$job->finished()) { + $this->enqueue($job); + } + } + } +} \ No newline at end of file diff --git a/src/Job.php b/src/Job.php new file mode 100644 index 0000000..f403c72 --- /dev/null +++ b/src/Job.php @@ -0,0 +1,67 @@ +process = $process; + } + + /** + * @param mixed $sendValue + */ + public function value($sendValue) + { + $this->sendValue = $sendValue; + } + + /** + * @return mixed + */ + public function run() + { + if ($this->init) { + $this->init = false; + + return $this->process->current(); + } else { + $result = $this->process->send($this->sendValue); + $this->sendValue = null; + + return $result; + } + } + + /** + * @return bool + */ + public function finished(): bool + { + return !$this->process->valid(); + } +} \ No newline at end of file diff --git a/src/Master.php b/src/Master.php index 1d444ab..18a6c19 100644 --- a/src/Master.php +++ b/src/Master.php @@ -1,10 +1,12 @@ workers; - $read[] = $this->connector; + yield Dispatcher::listenWrite($client); + fwrite($client, $data); + } - @stream_select($read, $write, $except, null); + protected function listenWorker($socket): Generator + { + yield Dispatcher::listenRead($socket); + yield Dispatcher::listenWrite($socket); - foreach ($read as $client) { - if ($client === $this->connector) { - $client = @stream_socket_accept($client); - } + $data = Frame::decode($socket); + + if (!$data['opcode']) { + return yield; + } + + stream_select($read, $this->workers, $except, 0); + + foreach ($this->workers as $worker) { + if ($worker !== $socket) { + dump('sending to worker # '.(int)$worker); + yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode']))); + } + } + } + + /** + * @return Generator + */ + protected function listenWorkers(): Generator + { + while (true) { + foreach ($this->workers as $worker) { + yield Dispatcher::make($this->listenWorker($worker)); + } + } + } + + /** + * @return Generator + */ + protected function listenConnector(): Generator + { + while (true) { + yield Dispatcher::listenRead($this->connector); - $data = Frame::decode($client); + if ($socket = @stream_socket_accept($this->connector)) { + $data = Frame::decode($socket); if (!$data['opcode']) { continue; } foreach ($this->workers as $worker) { - if ($worker !== $client) { - @fwrite($worker, Frame::encode($data['payload'], $data['opcode'])); - } + yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode']))); } } } } + + /** + * Dispatch messaging + * + * @return void + */ + public function dispatch(): void + { + (new Dispatcher()) + ->add($this->listenConnector()) + //->add($this->listenWorkers()) + ->dispatch(); + } } diff --git a/src/SysCall.php b/src/SysCall.php new file mode 100644 index 0000000..ea6fb40 --- /dev/null +++ b/src/SysCall.php @@ -0,0 +1,32 @@ +callback = $callback; + } + + /** + * @param Job $job + * @param Dispatcher $dispatcher + * @return mixed + */ + public function __invoke(Job $job, Dispatcher $dispatcher) + { + return call_user_func($this->callback, $job, $dispatcher); + } +} \ No newline at end of file diff --git a/src/Worker.php b/src/Worker.php index 1d47aa7..8985f2b 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -1,6 +1,7 @@ clients) && stream_select($read, $this->clients, $except, 0)) { - foreach ($this->clients as $client) { - @fwrite($client, $msg); - } + foreach ($this->clients as $client) { + yield Dispatcher::make($this->write($client, $msg)); } if ($global) { - fwrite($this->master, $msg); + yield Dispatcher::make($this->write($this->master, $msg)); } } @@ -84,6 +83,98 @@ private function handshake($socket): bool } } + /** + * @param $client + * @return Generator + */ + protected function read($client): Generator + { + yield Dispatcher::listenRead($client); + yield Dispatcher::listenWrite($client); + + $data = Frame::decode($client); + + switch ($data['opcode']) { + case Frame::CLOSE: { + yield Dispatcher::make($this->onClose((int)$client)); + unset($this->clients[(int)$client]); + break; + } + case Frame::PING: { + yield Dispatcher::make($this->write($client, Frame::encode('', Frame::PONG))); + break; + } + case Frame::TEXT: { + yield Dispatcher::make($this->onDirectMessage($data['payload'], (int)$client)); + break; + } + } + + yield Dispatcher::make($this->read($client)); + } + + /** + * @param $client + * @param $data + * @return Generator + */ + protected function write($client, $data): Generator + { + yield Dispatcher::listenWrite($client); + @fwrite($client, $data); + } + + /** + * @param $client + * @return Generator + */ + protected function accept($client): Generator + { + yield Dispatcher::listenRead($client); + yield Dispatcher::listenWrite($client); + + if (!$this->handshake($client)) { + unset($this->clients[(int)$client]); + fclose($client); + } else { + $this->clients[(int)$client] = $client; + $this->onConnect($client); + + yield Dispatcher::make($this->read($client)); + } + } + + /** + * @return Generator + */ + protected function listerMaster(): Generator + { + while (true) { + yield Dispatcher::listenRead($this->master); + $data = Frame::decode($this->master); + + if ($data['opcode'] == Frame::TEXT) { + yield Dispatcher::make($this->onFilteredMessage($data['payload'])); + } + } + } + + /** + * Main socket listener + * + * @return Generator + */ + protected function listenSocket(): Generator + { + while (true) { + yield Dispatcher::listenRead($this->server); + + if ($client = @stream_socket_accept($this->server)) { + yield Dispatcher::make($this->accept($client)); + } + } + } + /** * Process headers after handshake success * @@ -100,17 +191,17 @@ protected function afterHandshake(array $headers, $socket): bool * Called when user successfully connected * * @param $client - * @return void + * @return Generator */ - abstract protected function onConnect($client): void; + abstract protected function onConnect($client): Generator; /** * Called when user disconnected gracefully * * @param $clientNumber - * @return void + * @return Generator */ - abstract protected function onClose($clientNumber): void; + abstract protected function onClose($clientNumber): Generator; /** * This method called when user directly (from the browser) send a message @@ -119,11 +210,11 @@ abstract protected function onClose($clientNumber): void; * * @param string $message * @param int $socketId - * @return void + * @return Generator */ - protected function onDirectMessage(string $message, int $socketId): void + protected function onDirectMessage(string $message, int $socketId): Generator { - $this->sendToAll(Frame::encode($message)); + yield Dispatcher::make($this->sendToAll(Frame::encode($message))); } /** @@ -131,11 +222,11 @@ protected function onDirectMessage(string $message, int $socketId): void * from Unix connector * * @param string $message - * @return void + * @return Generator */ - protected function onFilteredMessage(string $message): void + protected function onFilteredMessage(string $message): Generator { - $this->sendToAll(Frame::encode($message), false); + yield Dispatcher::make($this->sendToAll(Frame::encode($message), false)); } /** @@ -143,55 +234,11 @@ protected function onFilteredMessage(string $message): void * * @return void */ - public function handle(): void + final public function handle(): void { - while (true) { - $read = $this->clients; - $read[] = $this->server; - $read[] = $this->master; - - @stream_select($read, $write, $except, null); - - if (in_array($this->server, $read)) { - if ($client = @stream_socket_accept($this->server)) { - $this->clients[(int)$client] = $client; - if (!$this->handshake($client)) { - unset($this->clients[(int)$client]); - fclose($client); - } else { - $this->onConnect($client); - } - } - - unset($read[array_search($this->server, $read)]); - } - - if (in_array($this->master, $read)) { - $data = Frame::decode($this->master); - - if ($data['opcode'] == Frame::TEXT) { - $this->onFilteredMessage($data['payload']); - } - - unset($read[array_search($this->master, $read)]); - } - - foreach ($read as $changedSocket) { - $data = Frame::decode($changedSocket); - - if ($data['opcode'] == Frame::CLOSE) { - $this->onClose((int)$changedSocket); - unset($this->clients[(int)$changedSocket]); - } - - if ($data['opcode'] == Frame::PING) { - @fwrite($changedSocket, Frame::encode('', Frame::PONG)); - } - - if ($data['opcode'] == Frame::TEXT) { - $this->onDirectMessage($data['payload'], (int)$changedSocket); - } - } - } + (new Dispatcher()) + ->add($this->listerMaster()) + ->add($this->listenSocket()) + ->dispatch(); } }