Skip to content

Commit

Permalink
All existing functionality moved to Generators
Browse files Browse the repository at this point in the history
  • Loading branch information
svyrydenko committed Nov 29, 2017
1 parent 46c8809 commit 10e125f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 42 deletions.
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ require 'vendor/autoload.php';
## Simple WebSocket server

```php
use Generator;
use Ollyxar\LaravelAuth\FileAuth;
use Ollyxar\WebSockets\{
Server as WServer,
Ssl as Wsl,
Worker as Handler,
Frame as WFrame
Frame,
Worker,
Dispatcher
};

class MyHandler extends Handler
Expand All @@ -60,30 +61,32 @@ class MyHandler extends Handler

/**
* @param $client
* @return Generator
*/
protected function onConnect($client): void
protected function onConnect($client): Generator
{
$this->sendToAll(WFrame::encode(json_encode([
yield Dispatcher::make($this->sendToAll(WFrame::encode(json_encode([
'type' => 'system',
'message' => 'User {' . (int)$client . '} Connected.'
])));
]))));
}

/**
* @param $clientNumber
*/
protected function onClose($clientNumber): void
protected function onClose($clientNumber): Generator
{
$this->sendToAll(WFrame::encode(json_encode([
yield Dispatcher::make($this->sendToAll(WFrame::encode(json_encode([
'type' => 'system',
'message' => "User {$clientNumber} disconnected."
])));
]))));
}

/**
* @param string $message
* @return Generator
*/
protected function onDirectMessage(string $message): void
protected function onDirectMessage(string $message): Generator
{
$message = json_decode($message);
$userName = $message->name;
Expand All @@ -95,7 +98,7 @@ class MyHandler extends Handler
'message' => $userMessage
]));

$this->sendToAll($response);
yield Dispatcher::make($this->sendToAll($response));
}
}

Expand Down
21 changes: 13 additions & 8 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private function poll($timeout): void
}

foreach ($read as $socket) {
list(, $jobs) = $this->read[(int)$socket];
$jobs = $this->read[(int)$socket][1];
unset($this->read[(int)$socket]);

foreach ($jobs as $job) {
Expand All @@ -68,7 +68,7 @@ private function poll($timeout): void
}

foreach ($write as $socket) {
list(, $jobs) = $this->write[(int)$socket];
$jobs = $this->write[(int)$socket][1];
unset($this->write[(int)$socket]);

foreach ($jobs as $job) {
Expand Down Expand Up @@ -140,9 +140,14 @@ public function add(Generator $process): self
return $this;
}

public static function make(Generator $process) {
/**
* @param Generator $process
* @return SysCall
*/
public static function make(Generator $process): SysCall
{
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($process) {
function (Job $job, Dispatcher $dispatcher) use ($process) {
$job->value($dispatcher->add($process));
$dispatcher->enqueue($job);
}
Expand All @@ -153,10 +158,10 @@ function(Job $job, Dispatcher $dispatcher) use ($process) {
* @param $socket
* @return SysCall
*/
public static function listenRead($socket)
public static function listenRead($socket): SysCall
{
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($socket) {
function (Job $job, Dispatcher $dispatcher) use ($socket) {
$dispatcher->appendRead($socket, $job);
}
);
Expand All @@ -166,10 +171,10 @@ function(Job $job, Dispatcher $dispatcher) use ($socket) {
* @param $socket
* @return SysCall
*/
public static function listenWrite($socket)
public static function listenWrite($socket): SysCall
{
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($socket) {
function (Job $job, Dispatcher $dispatcher) use ($socket) {
$dispatcher->appendWrite($socket, $job);
}
);
Expand Down
49 changes: 29 additions & 20 deletions src/Master.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
*/
final class Master
{
/**
* @var array
*/
private $workers = [];

/**
* Unix socket connector
*/
private $connector;

/**
Expand All @@ -25,45 +32,46 @@ public function __construct($workers, $connector)

/**
* @param $client
* @param $data
* @return Generator
*/
protected function write($client, $data): Generator
protected function read($client): Generator
{
yield Dispatcher::listenWrite($client);
fwrite($client, $data);
}
yield Dispatcher::listenRead($client);

protected function listenWorker($socket): Generator
{
yield Dispatcher::listenRead($socket);
yield Dispatcher::listenWrite($socket);

$data = Frame::decode($socket);
$data = Frame::decode($client);

if (!$data['opcode']) {
return yield;
}

stream_select($read, $this->workers, $except, 0);

foreach ($this->workers as $worker) {
if ($worker !== $socket) {
dump('sending to worker # '.(int)$worker);
if ($worker !== $client) {
yield Dispatcher::listenWrite($worker);
yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode'])));
}
}

yield Dispatcher::make($this->read($client));
}

/**
* @param $client
* @param $data
* @return Generator
*/
protected function write($client, $data): Generator
{
yield Dispatcher::listenWrite($client);
fwrite($client, $data);
}

/**
* @return Generator
*/
protected function listenWorkers(): Generator
{
while (true) {
foreach ($this->workers as $worker) {
yield Dispatcher::make($this->listenWorker($worker));
}
foreach ($this->workers as $worker) {
yield Dispatcher::make($this->read($worker));
}
}

Expand All @@ -83,6 +91,7 @@ protected function listenConnector(): Generator
}

foreach ($this->workers as $worker) {
yield Dispatcher::listenWrite($worker);
yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode'])));
}
}
Expand All @@ -98,7 +107,7 @@ public function dispatch(): void
{
(new Dispatcher())
->add($this->listenConnector())
//->add($this->listenWorkers())
->add($this->listenWorkers())
->dispatch();
}
}
3 changes: 2 additions & 1 deletion src/Server.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php namespace Ollyxar\WebSockets;

use \Exception;
use Exception;

/**
* Class Server
Expand Down Expand Up @@ -146,6 +146,7 @@ public function setPassPhrase(string $passPhrase = 'abracadabra'): self
* Launching server
*
* @return void
* @throws Exception
*/
public function run(): void
{
Expand Down
3 changes: 2 additions & 1 deletion src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected function read($client): Generator
case Frame::CLOSE: {
yield Dispatcher::make($this->onClose((int)$client));
unset($this->clients[(int)$client]);
fclose($client);
break;
}
case Frame::PING: {
Expand Down Expand Up @@ -138,7 +139,7 @@ protected function accept($client): Generator
fclose($client);
} else {
$this->clients[(int)$client] = $client;
$this->onConnect($client);
yield Dispatcher::make($this->onConnect($client));

yield Dispatcher::make($this->read($client));
}
Expand Down

0 comments on commit 10e125f

Please sign in to comment.