Skip to content

Commit

Permalink
Merge pull request #10 from ollyxar/exp-load-balancing
Browse files Browse the repository at this point in the history
Load balancer (semaphore locker)
  • Loading branch information
alexslipknot authored Aug 2, 2018
2 parents 6d5836c + 9b1d9d2 commit 61fbaf1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
}
],
"require": {
"php": ">=7.1"
"php": "^7.1",
"ext-posix": "^7.1",
"ext-pcntl": "^7.1"
},
"require-dev": {
"phpunit/phpunit": "^6.0"
Expand All @@ -36,4 +38,4 @@
"WebSockets\\Tests\\": "tests/"
}
}
}
}
33 changes: 24 additions & 9 deletions src/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
abstract class Handler
{
protected $dispatcher;
protected $locker = 'semaphore.lock';
protected $server;
protected $master;
protected $pid = 0;
Expand All @@ -21,6 +23,7 @@ abstract class Handler
*/
public function __construct($server, $master)
{
$this->dispatcher = new Dispatcher();
$this->server = $server;
$this->master = $master;
$this->pid = posix_getpid();
Expand Down Expand Up @@ -62,6 +65,7 @@ private function handshake($socket): Generator

foreach ($lines as $line) {
$line = rtrim($line);

if (preg_match('/\A(\S+): (.*)\z/', $line, $matches)) {
$headers[$matches[1]] = $matches[2];
}
Expand All @@ -73,7 +77,6 @@ private function handshake($socket): Generator

$secKey = $headers['Sec-WebSocket-Key'];
$secAccept = base64_encode(pack('H*', sha1($secKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));

$response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
"Upgrade: websocket\r\n" .
"Connection: Upgrade\r\n" .
Expand All @@ -96,16 +99,15 @@ private function handshake($socket): Generator
protected function read($client): Generator
{
yield Dispatcher::listenRead($client);

$data = Frame::decode($client);

switch ($data['opcode']) {
case Frame::CLOSE:
Logger::log('worker', $this->pid, 'close', (int)$client);
yield Dispatcher::async($this->onClose((int)$client));
yield Dispatcher::listenRemove((int)$client);
$this->dispatcher->removeClient((int)$client);
unset($this->clients[(int)$client]);
fclose($client);
yield Dispatcher::async($this->onClose((int)$client));
break;
case Frame::PING:
Logger::log('worker', $this->pid, 'ping', (int)$client);
Expand Down Expand Up @@ -150,9 +152,21 @@ protected function listerMaster(): Generator
Logger::log('worker', $this->pid, 'received text from master:', $data['payload']);
yield Dispatcher::async($this->onMasterMessage($data['payload']));
}

yield Dispatcher::async($this->listerMaster());
}

/**
* @return Generator
*/
protected function acceptSocket(): Generator
{
if ($client = @stream_socket_accept($this->server, 0, $peerName)) {
Logger::log('worker', $this->pid, 'socket accepted');
yield Dispatcher::async($this->handshake($client));
}
}

/**
* Main socket listener
*
Expand All @@ -161,12 +175,14 @@ protected function listerMaster(): Generator
protected function listenSocket(): Generator
{
yield Dispatcher::listenRead($this->server);
$handle = fopen($this->locker, 'a');

if ($client = @stream_socket_accept($this->server)) {
Logger::log('worker', $this->pid, 'socket accepted');
yield Dispatcher::async($this->handshake($client));
if ($handle && flock($handle, LOCK_EX)) {
yield Dispatcher::async($this->acceptSocket());
flock($handle, LOCK_UN);
}

fclose($handle);
yield Dispatcher::async($this->listenSocket());
}

Expand Down Expand Up @@ -199,7 +215,6 @@ private function afterHandshake(array $headers, $socket): Generator
} else {
Logger::log('worker', $this->pid, 'connection accepted for', (int)$socket);
$this->clients[(int)$socket] = $socket;

yield Dispatcher::async($this->onConnect($socket));
yield Dispatcher::async($this->read($socket));
}
Expand Down Expand Up @@ -251,7 +266,7 @@ protected function onMasterMessage(string $message): Generator
*/
final public function handle(): void
{
(new Dispatcher())
$this->dispatcher
->add($this->listerMaster())
->add($this->listenSocket())
->dispatch();
Expand Down

0 comments on commit 61fbaf1

Please sign in to comment.