Skip to content

Commit

Permalink
Merge pull request #30 from roadrunner-php/ping-pong-stream
Browse files Browse the repository at this point in the history
Implement Ping-Pong strategy for worker with blocking relay
  • Loading branch information
roxblnfk authored Oct 3, 2023
2 parents 97f966a + 075ddc6 commit 5c22fab
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 7 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"ext-json": "*",
"ext-sockets": "*",
"psr/log": "^2.0|^3.0",
"spiral/goridge": "^4.0",
"spiral/goridge": "^4.1.0",
"spiral/roadrunner": "^2023.1",
"composer-runtime-api": "^2.0"
},
Expand Down
15 changes: 15 additions & 0 deletions src/Message/Command/Pong.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\Message\Command;

use Spiral\RoadRunner\Message\SkipMessage;
use Spiral\RoadRunner\Payload;

/**
* @psalm-immutable
*/
final class Pong extends Payload implements SkipMessage
{
}
5 changes: 5 additions & 0 deletions src/PayloadFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Spiral\Goridge\Frame;
use Spiral\RoadRunner\Exception\RoadRunnerException;
use Spiral\RoadRunner\Message\Command\GetProcessId;
use Spiral\RoadRunner\Message\Command\Pong;
use Spiral\RoadRunner\Message\Command\StreamStop;
use Spiral\RoadRunner\Message\Command\WorkerStop;

Expand All @@ -26,6 +27,10 @@ public static function fromFrame(Frame $frame): Payload
return new StreamStop($payload);
}

if (($frame->byte10 & Frame::BYTE10_PONG) !== 0) {
return new Pong($payload);
}

return new Payload(
\substr($payload, $frame->options[0]),
\substr($payload, 0, $frame->options[0]),
Expand Down
10 changes: 10 additions & 0 deletions src/StreamWorkerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner;

interface StreamWorkerInterface extends WorkerInterface
{
public function withStreamMode(): static;
}
75 changes: 69 additions & 6 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Spiral\RoadRunner;

use Psr\Log\LoggerInterface;
use Spiral\Goridge\BlockingRelayInterface;
use Spiral\Goridge\Exception\GoridgeException;
use Spiral\Goridge\Exception\TransportException;
use Spiral\Goridge\Frame;
Expand All @@ -13,6 +14,7 @@
use Spiral\RoadRunner\Exception\RoadRunnerException;
use Spiral\RoadRunner\Internal\StdoutHandler;
use Spiral\RoadRunner\Message\Command\GetProcessId;
use Spiral\RoadRunner\Message\Command\Pong;
use Spiral\RoadRunner\Message\Command\WorkerStop;
use Spiral\RoadRunner\Message\SkipMessage;

Expand All @@ -27,13 +29,19 @@
* }
* </code>
*/
class Worker implements WorkerInterface
class Worker implements StreamWorkerInterface
{
private const JSON_ENCODE_FLAGS = \JSON_THROW_ON_ERROR | \JSON_PRESERVE_ZERO_FRACTION;

/** @var array<int, Payload> */
private array $payloads = [];

private bool $streamMode = false;
/** @var int<0, max> Count of frames sent in stream mode */
private int $framesSent = 0;
private bool $shouldPing = false;
private bool $waitingPong = false;

public function __construct(
private readonly RelayInterface $relay,
bool $interceptSideEffects = true,
Expand Down Expand Up @@ -63,18 +71,33 @@ public function waitPayload(): ?Payload
case $payload::class === Payload::class:
return $payload;
case $payload instanceof WorkerStop:
$this->waitingPong = false;
return null;
case $payload::class === GetProcessId::class:
$this->sendProcessId();
// no break
continue 2;
case $payload instanceof Pong:
$this->waitingPong = false;
continue 2;
case $payload instanceof SkipMessage:
continue 2;
}
}
}

public function withStreamMode(): static
{
$clone = clone $this;
$clone->streamMode = true;
$clone->framesSent = 0;
$clone->shouldPing = false;
$clone->waitingPong = false;
return $clone;
}

public function respond(Payload $payload): void
{
$this->streamMode and ++$this->framesSent;
$this->send($payload->body, $payload->header, $payload->eos);
}

Expand Down Expand Up @@ -133,7 +156,7 @@ private function findPayload(string $class = null): ?int
}

$payload = $this->pullPayload();
if ($payload === null) {
if ($payload === null || $payload instanceof Pong) {
break;
}

Expand All @@ -151,20 +174,40 @@ private function findPayload(string $class = null): ?int
*/
private function pullPayload(): ?Payload
{
if (!$this->waitingPong && $this->relay instanceof BlockingRelayInterface) {
if (!$this->streamMode) {
return null;
}

$this->haveToPing();
return null;
}

if (!$this->relay->hasFrame()) {
return null;
}

$frame = $this->relay->waitFrame();
return PayloadFactory::fromFrame($frame);
$payload = PayloadFactory::fromFrame($frame);

if ($payload instanceof Pong) {
$this->waitingPong = false;
return null;
}

return $payload;
}

private function send(string $body = '', string $header = '', bool $eos = true): void
{
$frame = new Frame($header . $body, [\strlen($header)]);

if (!$eos) {
$frame->byte10 = Frame::BYTE10_STREAM;
$frame->byte10 |= Frame::BYTE10_STREAM;
}

if ($this->shouldPing) {
$frame->byte10 |= Frame::BYTE10_PING;
}

$this->sendFrame($frame);
Expand All @@ -173,6 +216,12 @@ private function send(string $body = '', string $header = '', bool $eos = true):
private function sendFrame(Frame $frame): void
{
try {
if ($this->streamMode && ($frame->byte10 & Frame::BYTE10_STREAM) && $this->shouldPing) {
$frame->byte10 |= Frame::BYTE10_PING;
$this->shouldPing = false;
$this->waitingPong = true;
}

$this->relay->send($frame);
} catch (GoridgeException $e) {
throw new TransportException($e->getMessage(), $e->getCode(), $e);
Expand Down Expand Up @@ -208,8 +257,11 @@ public static function createFromEnvironment(
bool $interceptSideEffects = true,
LoggerInterface $logger = new Logger(),
): self {
$address = $env->getRelayAddress();
\assert($address !== '', 'Relay address must be specified in environment');

return new self(
relay: Relay::create($env->getRelayAddress()),
relay: Relay::create($address),
interceptSideEffects: $interceptSideEffects,
logger: $logger
);
Expand All @@ -221,4 +273,15 @@ private function sendProcessId(): static
$this->sendFrame($frame);
return $this;
}

private function haveToPing(): void
{
if ($this->waitingPong || $this->framesSent === 0) {
return;
}

if ($this->framesSent % 5 === 0) {
$this->shouldPing = true;
}
}
}

0 comments on commit 5c22fab

Please sign in to comment.