diff --git a/composer.json b/composer.json index 8c0b2ce..3dda89a 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,9 @@ } ], "require": { - "php": ">=7.1" + "php": "^7.1", + "ext-posix": "^7.1", + "ext-pcntl": "^7.1" }, "require-dev": { "phpunit/phpunit": "^6.0" @@ -36,4 +38,4 @@ "WebSockets\\Tests\\": "tests/" } } -} \ No newline at end of file +} diff --git a/src/Handler.php b/src/Handler.php index 488690e..da54c45 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -8,6 +8,8 @@ */ abstract class Handler { + protected $dispatcher; + protected $locker = 'semaphore.lock'; protected $server; protected $master; protected $pid = 0; @@ -21,6 +23,7 @@ abstract class Handler */ public function __construct($server, $master) { + $this->dispatcher = new Dispatcher(); $this->server = $server; $this->master = $master; $this->pid = posix_getpid(); @@ -62,6 +65,7 @@ private function handshake($socket): Generator foreach ($lines as $line) { $line = rtrim($line); + if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) { $headers[$matches[1]] = $matches[2]; } @@ -73,7 +77,6 @@ private function handshake($socket): Generator $secKey = $headers['Sec-WebSocket-Key']; $secAccept = base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))); - $response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: websocket\r\n" . "Connection: Upgrade\r\n" . @@ -96,16 +99,15 @@ private function handshake($socket): Generator protected function read($client): Generator { yield Dispatcher::listenRead($client); - $data = Frame::decode($client); switch ($data['opcode']) { case Frame::CLOSE: Logger::log('worker', $this->pid, 'close', (int)$client); - yield Dispatcher::async($this->onClose((int)$client)); - yield Dispatcher::listenRemove((int)$client); + $this->dispatcher->removeClient((int)$client); unset($this->clients[(int)$client]); fclose($client); + yield Dispatcher::async($this->onClose((int)$client)); break; case Frame::PING: Logger::log('worker', $this->pid, 'ping', (int)$client); @@ -150,9 +152,21 @@ protected function listerMaster(): Generator Logger::log('worker', $this->pid, 'received text from master:', $data['payload']); yield Dispatcher::async($this->onMasterMessage($data['payload'])); } + yield Dispatcher::async($this->listerMaster()); } + /** + * @return Generator + */ + protected function acceptSocket(): Generator + { + if ($client = @stream_socket_accept($this->server, 0, $peerName)) { + Logger::log('worker', $this->pid, 'socket accepted'); + yield Dispatcher::async($this->handshake($client)); + } + } + /** * Main socket listener * @@ -161,12 +175,14 @@ protected function listerMaster(): Generator protected function listenSocket(): Generator { yield Dispatcher::listenRead($this->server); + $handle = fopen($this->locker, 'a'); - if ($client = @stream_socket_accept($this->server)) { - Logger::log('worker', $this->pid, 'socket accepted'); - yield Dispatcher::async($this->handshake($client)); + if ($handle && flock($handle, LOCK_EX)) { + yield Dispatcher::async($this->acceptSocket()); + flock($handle, LOCK_UN); } + fclose($handle); yield Dispatcher::async($this->listenSocket()); } @@ -199,7 +215,6 @@ private function afterHandshake(array $headers, $socket): Generator } else { Logger::log('worker', $this->pid, 'connection accepted for', (int)$socket); $this->clients[(int)$socket] = $socket; - yield Dispatcher::async($this->onConnect($socket)); yield Dispatcher::async($this->read($socket)); } @@ -251,7 +266,7 @@ protected function onMasterMessage(string $message): Generator */ final public function handle(): void { - (new Dispatcher()) + $this->dispatcher ->add($this->listerMaster()) ->add($this->listenSocket()) ->dispatch();