Skip to content

Commit

Permalink
Non-blocking IO (Generators approach)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexslipknot committed Nov 29, 2017
1 parent 7013693 commit 46c8809
Show file tree
Hide file tree
Showing 5 changed files with 477 additions and 84 deletions.
199 changes: 199 additions & 0 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
<?php namespace Ollyxar\WebSockets;

use Generator;
use SplQueue;

/**
* Class Dispatcher
* @package Ollyxar\WebSockets
*/
final class Dispatcher
{
/**
* @var SplQueue
*/
private $queue;

/**
* Sockets ready to read
*
* @var array
*/
private $read = [];

/**
* Sockets ready to write
*
* @var array
*/
private $write = [];

/**
* Dispatcher constructor.
*/
public function __construct()
{
$this->queue = new SplQueue();
}

/**
* Poll connections
*
* @param $timeout
* @return void
*/
private function poll($timeout): void
{
$read = $write = [];

foreach ($this->read as [$socket]) {
$read[] = $socket;
}

foreach ($this->write as [$socket]) {
$write[] = $socket;
}

if (!@stream_select($read, $write, $except, $timeout)) {
return;
}

foreach ($read as $socket) {
list(, $jobs) = $this->read[(int)$socket];
unset($this->read[(int)$socket]);

foreach ($jobs as $job) {
$this->enqueue($job);
}
}

foreach ($write as $socket) {
list(, $jobs) = $this->write[(int)$socket];
unset($this->write[(int)$socket]);

foreach ($jobs as $job) {
$this->enqueue($job);
}
}
}

/**
* @return Generator
*/
private function pollProcess(): Generator
{
while (true) {
if ($this->queue->isEmpty()) {
$this->poll(null);
} else {
$this->poll(0);
}
yield;
}
}

/**
* @param Job $job
* @return void
*/
public function enqueue(Job $job): void
{
$this->queue->enqueue($job);
}

/**
* @param $socket
* @param Job $job
* @return void
*/
public function appendRead($socket, Job $job): void
{
if (isset($this->read[(int)$socket])) {
$this->read[(int)$socket][1][] = $job;
} else {
$this->read[(int)$socket] = [$socket, [$job]];
}
}

/**
* @param $socket
* @param Job $job
* @return void
*/
public function appendWrite($socket, Job $job): void
{
if (isset($this->write[(int)$socket])) {
$this->write[(int)$socket][1][] = $job;
} else {
$this->write[(int)$socket] = [$socket, [$job]];
}
}

/**
* @param Generator $process
* @return Dispatcher
*/
public function add(Generator $process): self
{
$this->enqueue(new Job($process));

return $this;
}

public static function make(Generator $process) {
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($process) {
$job->value($dispatcher->add($process));
$dispatcher->enqueue($job);
}
);
}

/**
* @param $socket
* @return SysCall
*/
public static function listenRead($socket)
{
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($socket) {
$dispatcher->appendRead($socket, $job);
}
);
}

/**
* @param $socket
* @return SysCall
*/
public static function listenWrite($socket)
{
return new SysCall(
function(Job $job, Dispatcher $dispatcher) use ($socket) {
$dispatcher->appendWrite($socket, $job);
}
);
}

/**
* @return void
*/
public function dispatch(): void
{
$this->add($this->pollProcess());

while (!$this->queue->isEmpty()) {
$job = $this->queue->dequeue();
$result = $job->run();

if ($result instanceof SysCall) {
$result($job, $this);
continue;
}

if (!$job->finished()) {
$this->enqueue($job);
}
}
}
}
67 changes: 67 additions & 0 deletions src/Job.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php namespace Ollyxar\WebSockets;

use Generator;

/**
* Class Job
* @package Ollyxar\WebSockets
*/
final class Job
{
/**
* @var Generator
*/
protected $process;

/**
* @var mixed
*/
protected $sendValue = null;

/**
* @var bool
*/
protected $init = true;

/**
* Job constructor.
* @param Generator $process
*/
public function __construct(Generator $process)
{
$this->process = $process;
}

/**
* @param mixed $sendValue
*/
public function value($sendValue)
{
$this->sendValue = $sendValue;
}

/**
* @return mixed
*/
public function run()
{
if ($this->init) {
$this->init = false;

return $this->process->current();
} else {
$result = $this->process->send($this->sendValue);
$this->sendValue = null;

return $result;
}
}

/**
* @return bool
*/
public function finished(): bool
{
return !$this->process->valid();
}
}
82 changes: 65 additions & 17 deletions src/Master.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
<?php namespace Ollyxar\WebSockets;

use Generator;

/**
* Class Master
* @package Ollyxar\WebSockets
*/
class Master
final class Master
{
private $workers = [];
private $connector;
Expand All @@ -22,35 +24,81 @@ public function __construct($workers, $connector)
}

/**
* Dispatch messaging
*
* @return void
* @param $client
* @param $data
* @return Generator
*/
public function dispatch(): void
protected function write($client, $data): Generator
{
while (true) {
$read = $this->workers;
$read[] = $this->connector;
yield Dispatcher::listenWrite($client);
fwrite($client, $data);
}

@stream_select($read, $write, $except, null);
protected function listenWorker($socket): Generator
{
yield Dispatcher::listenRead($socket);
yield Dispatcher::listenWrite($socket);

foreach ($read as $client) {
if ($client === $this->connector) {
$client = @stream_socket_accept($client);
}
$data = Frame::decode($socket);

if (!$data['opcode']) {
return yield;
}

stream_select($read, $this->workers, $except, 0);

foreach ($this->workers as $worker) {
if ($worker !== $socket) {
dump('sending to worker # '.(int)$worker);
yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode'])));
}
}
}

/**
* @return Generator
*/
protected function listenWorkers(): Generator
{
while (true) {
foreach ($this->workers as $worker) {
yield Dispatcher::make($this->listenWorker($worker));
}
}
}

/**
* @return Generator
*/
protected function listenConnector(): Generator
{
while (true) {
yield Dispatcher::listenRead($this->connector);

$data = Frame::decode($client);
if ($socket = @stream_socket_accept($this->connector)) {
$data = Frame::decode($socket);

if (!$data['opcode']) {
continue;
}

foreach ($this->workers as $worker) {
if ($worker !== $client) {
@fwrite($worker, Frame::encode($data['payload'], $data['opcode']));
}
yield Dispatcher::make($this->write($worker, Frame::encode($data['payload'], $data['opcode'])));
}
}
}
}

/**
* Dispatch messaging
*
* @return void
*/
public function dispatch(): void
{
(new Dispatcher())
->add($this->listenConnector())
//->add($this->listenWorkers())
->dispatch();
}
}
Loading

0 comments on commit 46c8809

Please sign in to comment.