diff --git a/src/Handler.php b/src/Handler.php index da54c45..502f91b 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -56,7 +56,7 @@ protected function broadcast(string $msg, bool $global = true): Generator * @param $socket * @return Generator */ - private function handshake($socket): Generator + protected function handshake($socket): Generator { yield Dispatcher::listenRead($socket); Logger::log('worker', $this->pid, 'handshake for ', (int)$socket); @@ -75,6 +75,16 @@ private function handshake($socket): Generator return yield; } + if (!$this->validateClient($headers, $socket)) { + Logger::log('worker', $this->pid, 'handshake for ' . (int)$socket . ' aborted'); + unset($this->clients[(int)$socket]); + $this->dispatcher->removeClient((int)$socket); + yield Dispatcher::async($this->write($socket, "HTTP:/1.1 403 Forbidden\r\n")); + fclose($socket); + + return yield; + } + $secKey = $headers['Sec-WebSocket-Key']; $secAccept = base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))); $response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . @@ -86,7 +96,7 @@ private function handshake($socket): Generator try { yield Dispatcher::async($this->write($socket, $response)); - yield Dispatcher::async($this->afterHandshake($headers, $socket)); + yield Dispatcher::async($this->afterHandshake($socket)); } catch (\Throwable $e) { return yield; } @@ -201,23 +211,15 @@ protected function validateClient(array $headers, $socket): bool /** * Process headers after handshake success * - * @param array $headers * @param $socket * @return Generator */ - private function afterHandshake(array $headers, $socket): Generator + protected function afterHandshake($socket): Generator { - if (!$this->validateClient($headers, $socket)) { - Logger::log('worker', $this->pid, 'handshake for ' . (int)$socket . ' aborted'); - unset($this->clients[(int)$socket]); - yield Dispatcher::listenRemove((int)$socket); - fclose($socket); - } else { - Logger::log('worker', $this->pid, 'connection accepted for', (int)$socket); - $this->clients[(int)$socket] = $socket; - yield Dispatcher::async($this->onConnect($socket)); - yield Dispatcher::async($this->read($socket)); - } + Logger::log('worker', $this->pid, 'connection accepted for', (int)$socket); + $this->clients[(int)$socket] = $socket; + yield Dispatcher::async($this->onConnect($socket)); + yield Dispatcher::async($this->read($socket)); } /** diff --git a/src/Server.php b/src/Server.php index 0460ada..1218876 100644 --- a/src/Server.php +++ b/src/Server.php @@ -21,6 +21,7 @@ class Server protected $cert; protected $passPhrase; protected $workerCount = 4; + protected $workerPids = []; protected $handler; public static $connector = '/var/run/wsc.sock'; @@ -34,7 +35,7 @@ class Server * @param bool $useConnector * @param bool $exchangeWorkersData */ - public function __construct(string $host, int $port, int $workerCount = 4, $useSSL = false, $useConnector = true, $exchangeWorkersData = true) + public function __construct(string $host, int $port, int $workerCount = 4, bool $useSSL = false, bool $useConnector = true, bool $exchangeWorkersData = true) { $this->host = $host; $this->port = $port; @@ -131,24 +132,31 @@ protected function spawn(): array return [$pid, $master, $workers]; } + /** + * @param $signal + */ + protected function terminate($signal) + { + Logger::log('master', posix_getpid(), 'Stopping workers...'); + + foreach ($this->workerPids as $pid => $worker) { + posix_kill($pid, $signal); + } + + Logger::log('master', posix_getpid(), 'Server stopped.'); + exit(0); + } + /** * Process system signals * - * @param $workers * @return void */ - protected function handleSignals(&$workers): void + protected function handleSignals(): void { foreach ([SIGTERM, SIGQUIT, SIGABRT, SIGINT] as $signal) { - pcntl_signal($signal, function ($signal) use ($workers) { - Logger::log('master', posix_getpid(), 'Stopping workers...'); - - foreach ($workers as $pid => $worker) { - posix_kill($pid, $signal); - } - - Logger::log('master', posix_getpid(), 'Server stopped.'); - exit(0); + pcntl_signal($signal, function ($signal) { + $this->terminate($signal); }); } } @@ -209,8 +217,8 @@ public function run(): void if ($pid) { fclose($this->socket); - - $this->handleSignals($workers); + $this->workerPids = $workers; + $this->handleSignals(); (new Master($workers, $this->unixConnector, $this->exchangeWorkersData))->dispatch(); } else {