From e61d4669dd4145bf8562b21a77ce9d1d26a0c47a Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 31 Jul 2018 22:39:49 +0300 Subject: [PATCH 1/2] Simple load balancer (only for 2 forks) Preventing delays on client socket connect --- composer.json | 6 ++++-- src/Handler.php | 44 +++++++++++++++++++++++++++++++++++++------- src/Server.php | 2 +- 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/composer.json b/composer.json index 8c0b2ce..3dda89a 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,9 @@ } ], "require": { - "php": ">=7.1" + "php": "^7.1", + "ext-posix": "^7.1", + "ext-pcntl": "^7.1" }, "require-dev": { "phpunit/phpunit": "^6.0" @@ -36,4 +38,4 @@ "WebSockets\\Tests\\": "tests/" } } -} \ No newline at end of file +} diff --git a/src/Handler.php b/src/Handler.php index 488690e..d218e9c 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -8,6 +8,9 @@ */ abstract class Handler { + protected $dispatcher; + protected $balancer = 'semaphore.balancer'; + protected $useBalancer; protected $server; protected $master; protected $pid = 0; @@ -18,12 +21,20 @@ abstract class Handler * * @param $server * @param $master + * @param bool $useBalancer */ - public function __construct($server, $master) + public function __construct($server, $master, bool $useBalancer = true) { + $this->dispatcher = new Dispatcher(); $this->server = $server; $this->master = $master; $this->pid = posix_getpid(); + + if ($useBalancer) { + $this->useBalancer = $useBalancer; + file_exists($this->balancer) or touch($this->balancer); + chmod($this->balancer, 0777); + } } /** @@ -102,10 +113,10 @@ protected function read($client): Generator switch ($data['opcode']) { case Frame::CLOSE: Logger::log('worker', $this->pid, 'close', (int)$client); - yield Dispatcher::async($this->onClose((int)$client)); - yield Dispatcher::listenRemove((int)$client); + $this->dispatcher->removeClient((int)$client); unset($this->clients[(int)$client]); fclose($client); + yield Dispatcher::async($this->onClose((int)$client)); break; case Frame::PING: Logger::log('worker', $this->pid, 'ping', (int)$client); @@ -153,6 +164,22 @@ protected function listerMaster(): Generator yield Dispatcher::async($this->listerMaster()); } + /** + * @return Generator + */ + protected function acceptSocket(): Generator + { + if ($client = @stream_socket_accept($this->server, -1)) { + Logger::log('worker', $this->pid, 'socket accepted'); + + if ($this->useBalancer) { + file_put_contents($this->balancer, $this->pid); + } + + yield Dispatcher::async($this->handshake($client)); + } + } + /** * Main socket listener * @@ -162,9 +189,12 @@ protected function listenSocket(): Generator { yield Dispatcher::listenRead($this->server); - if ($client = @stream_socket_accept($this->server)) { - Logger::log('worker', $this->pid, 'socket accepted'); - yield Dispatcher::async($this->handshake($client)); + if ($this->useBalancer) { + if (file_get_contents($this->balancer) != $this->pid) { + yield Dispatcher::async($this->acceptSocket()); + } + } else { + yield Dispatcher::async($this->acceptSocket()); } yield Dispatcher::async($this->listenSocket()); @@ -251,7 +281,7 @@ protected function onMasterMessage(string $message): Generator */ final public function handle(): void { - (new Dispatcher()) + $this->dispatcher ->add($this->listerMaster()) ->add($this->listenSocket()) ->dispatch(); diff --git a/src/Server.php b/src/Server.php index 0460ada..1a74e54 100644 --- a/src/Server.php +++ b/src/Server.php @@ -218,7 +218,7 @@ public function run(): void fclose($this->unixConnector); } - (new $this->handler($this->socket, $master))->handle(); + (new $this->handler($this->socket, $master, $this->workerCount > 1))->handle(); } } } \ No newline at end of file From 9b1d9d2218ee35852a535ab281dc240e3cc61786 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 2 Aug 2018 22:07:42 +0300 Subject: [PATCH 2/2] Implemented locker for concurrent socket acceptance (multiple workers) Preventing delays on client socket connect --- src/Handler.php | 33 +++++++++------------------------ src/Server.php | 2 +- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/src/Handler.php b/src/Handler.php index d218e9c..da54c45 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -9,8 +9,7 @@ abstract class Handler { protected $dispatcher; - protected $balancer = 'semaphore.balancer'; - protected $useBalancer; + protected $locker = 'semaphore.lock'; protected $server; protected $master; protected $pid = 0; @@ -21,20 +20,13 @@ abstract class Handler * * @param $server * @param $master - * @param bool $useBalancer */ - public function __construct($server, $master, bool $useBalancer = true) + public function __construct($server, $master) { $this->dispatcher = new Dispatcher(); $this->server = $server; $this->master = $master; $this->pid = posix_getpid(); - - if ($useBalancer) { - $this->useBalancer = $useBalancer; - file_exists($this->balancer) or touch($this->balancer); - chmod($this->balancer, 0777); - } } /** @@ -73,6 +65,7 @@ private function handshake($socket): Generator foreach ($lines as $line) { $line = rtrim($line); + if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) { $headers[$matches[1]] = $matches[2]; } @@ -84,7 +77,6 @@ private function handshake($socket): Generator $secKey = $headers['Sec-WebSocket-Key']; $secAccept = base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))); - $response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" . "Upgrade: websocket\r\n" . "Connection: Upgrade\r\n" . @@ -107,7 +99,6 @@ private function handshake($socket): Generator protected function read($client): Generator { yield Dispatcher::listenRead($client); - $data = Frame::decode($client); switch ($data['opcode']) { @@ -161,6 +152,7 @@ protected function listerMaster(): Generator Logger::log('worker', $this->pid, 'received text from master:', $data['payload']); yield Dispatcher::async($this->onMasterMessage($data['payload'])); } + yield Dispatcher::async($this->listerMaster()); } @@ -169,13 +161,8 @@ protected function listerMaster(): Generator */ protected function acceptSocket(): Generator { - if ($client = @stream_socket_accept($this->server, -1)) { + if ($client = @stream_socket_accept($this->server, 0, $peerName)) { Logger::log('worker', $this->pid, 'socket accepted'); - - if ($this->useBalancer) { - file_put_contents($this->balancer, $this->pid); - } - yield Dispatcher::async($this->handshake($client)); } } @@ -188,15 +175,14 @@ protected function acceptSocket(): Generator protected function listenSocket(): Generator { yield Dispatcher::listenRead($this->server); + $handle = fopen($this->locker, 'a'); - if ($this->useBalancer) { - if (file_get_contents($this->balancer) != $this->pid) { - yield Dispatcher::async($this->acceptSocket()); - } - } else { + if ($handle && flock($handle, LOCK_EX)) { yield Dispatcher::async($this->acceptSocket()); + flock($handle, LOCK_UN); } + fclose($handle); yield Dispatcher::async($this->listenSocket()); } @@ -229,7 +215,6 @@ private function afterHandshake(array $headers, $socket): Generator } else { Logger::log('worker', $this->pid, 'connection accepted for', (int)$socket); $this->clients[(int)$socket] = $socket; - yield Dispatcher::async($this->onConnect($socket)); yield Dispatcher::async($this->read($socket)); } diff --git a/src/Server.php b/src/Server.php index 1a74e54..0460ada 100644 --- a/src/Server.php +++ b/src/Server.php @@ -218,7 +218,7 @@ public function run(): void fclose($this->unixConnector); } - (new $this->handler($this->socket, $master, $this->workerCount > 1))->handle(); + (new $this->handler($this->socket, $master))->handle(); } } } \ No newline at end of file