Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC Implementation using multiple relays to enable async communication #25

Merged
merged 29 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
db96c17
feat: Implement "Async" RPC
L3tum Feb 5, 2024
dac5700
fix: Issues uncovered by Psalm
L3tum Feb 5, 2024
eb82ce5
feat: Add getResponses as well as fix a number of logic or Psalm errors
L3tum Feb 5, 2024
9a285df
feat: Add an unholy amount of tests
L3tum Feb 5, 2024
c6c7e0c
feat: Optimize getResponse to remove array_search and fix potential o…
L3tum Feb 5, 2024
a196dc7
chore: Add test for response buffer handling
L3tum Feb 5, 2024
68798fa
fix: Accidentally saved too many responses
L3tum Feb 6, 2024
c5affab
fix: Reorder methods to make sure we do not lose a relay
L3tum Feb 6, 2024
198d879
fix: Wrong order for $seq
L3tum Feb 6, 2024
10bc6df
fix: Up response buffer maximum
L3tum Feb 6, 2024
48f0efc
fix: Add Error Handling to MultiRPC::getResponses()
L3tum Feb 6, 2024
f7cb0d5
feat: Simplify MultiRPC and MultiRelayHelper, fixes some issues resul…
L3tum Feb 7, 2024
330bf9f
fix: Actually call tests
L3tum Feb 7, 2024
e35ef15
fix: Model array_key_last output as docblock
L3tum Feb 7, 2024
66fbc8a
fix: Issues introduced by simplification of relay handling
L3tum Feb 7, 2024
560008f
fix: Gracefully handle socket disconnect without blocking (too much) …
L3tum Feb 7, 2024
39966bc
fix: Handle cloning of MultiRPC
L3tum Feb 8, 2024
d7b8ea2
fix: Handle cloning of MultiRPC with SocketRelay
L3tum Feb 8, 2024
45ef16a
fix: Typo in Testclass Name
L3tum Feb 12, 2024
4a46e78
fix: Add comments documenting ensureFreeRelayAvailable and getRespons…
L3tum Feb 12, 2024
2cb78fb
fix: Simplify getResponses() a bit
L3tum Feb 12, 2024
825953a
fix: Add configurable buffer threshold and change exception message
L3tum Feb 12, 2024
ca811f7
feat: Refactor code around specialty handling of SocketRelay and add …
L3tum Feb 13, 2024
aadfe5b
fix: Make exception more descriptive
L3tum Feb 13, 2024
789a088
fix: Missing extends statement in interface
L3tum Feb 13, 2024
d4eb166
fix: Enforce __clone impl and test streams with data on them
L3tum Feb 13, 2024
34b26f5
fix: Remove @throws Error annotation
L3tum Feb 13, 2024
a7ed4fb
fix: Cloning MultiRPC could result in a lot of wasted memory due to a…
L3tum Feb 23, 2024
4c621e4
fix: StreamRelay in MultiRPC & Tests
L3tum Feb 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"scripts": {
"test": "phpunit --no-coverage --colors=always",
"test-cover": "phpunit --coverage-clover=coverage.xml",
"test-static": "psalm",
"test-static": "psalm --no-cache",
"test-mutations": "infection"
},
"minimum-stability": "dev",
Expand Down
36 changes: 36 additions & 0 deletions src/ConnectedRelayInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace Spiral\Goridge;

use Spiral\Goridge\Exception\RelayException;

/**
* This interface describes a Relay that explictily establishes a connection.
* That connection can also be re-established on the fly (in comparison to StreamRelay, which relies on the existence of the streams).
* The object is also clonable, i.e. supports cloning without data errors due to shared state.
*/
interface ConnectedRelayInterface extends RelayInterface
{
/**
* Returns true if the underlying connection is already established
*/
public function isConnected(): bool;

/**
* Establishes the underlying connection and returns true on success, false on failure, or throws an exception in case of an error.
*
* @throws RelayException
*/
public function connect(): bool;

/**
* Closes the underlying connection.
*/
public function close(): void;

/**
* Enforce implementation of __clone magic method
* @psalm-return void
*/
public function __clone();
}
115 changes: 115 additions & 0 deletions src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge;

use Spiral\Goridge\RPC\Exception\RPCException;
use function socket_select;

class MultiRelayHelper
{
/**
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function findRelayWithMessage(array $relays, int $timeoutInMicroseconds = 0): array|false
{
if (count($relays) === 0) {
return false;

Check warning on line 23 in src/MultiRelayHelper.php

View check run for this annotation

Codecov / codecov/patch

src/MultiRelayHelper.php#L23

Added line #L23 was not covered by tests
}

if ($relays[array_key_first($relays)] instanceof SocketRelay) {
$sockets = [];
$socketIdToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
assert($relay instanceof SocketRelay);

// Enforce connection
if ($relay->socket === null) {
// Important: Do not force reconnect here as it would otherwise completely ruin further handling
continue;
}

$sockets[] = $relay->socket;
$socketIdToRelayIndexMap[spl_object_id($relay->socket)] = $relayIndex;
}

if (count($sockets) === 0) {
return false;

Check warning on line 43 in src/MultiRelayHelper.php

View check run for this annotation

Codecov / codecov/patch

src/MultiRelayHelper.php#L43

Added line #L43 was not covered by tests
}

$writes = null;
$except = null;
$changes = socket_select($sockets, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($sockets as $socket) {
$indexes[] = $socketIdToRelayIndexMap[spl_object_id($socket)] ?? throw new RPCException("Invalid socket??");
}

return $indexes;
} else {
return false;
}
}

if ($relays[array_key_first($relays)] instanceof StreamRelay) {
$streams = [];
$streamNameToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
assert($relay instanceof StreamRelay);

$streams[] = $relay->in;
$streamNameToRelayIndexMap[(string)$relay->in] = $relayIndex;
}

$writes = null;
$except = null;
$changes = stream_select($streams, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($streams as $stream) {
$indexes[] = $streamNameToRelayIndexMap[(string)$stream] ?? throw new RPCException("Invalid stream??");
}

return $indexes;
} else {
return false;
}
}

return false;

Check warning on line 88 in src/MultiRelayHelper.php

View check run for this annotation

Codecov / codecov/patch

src/MultiRelayHelper.php#L88

Added line #L88 was not covered by tests
}

/**
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function checkConnected(array $relays): array|false
{
if (count($relays) === 0) {
return false;
}

$keysNotConnected = [];
foreach ($relays as $key => $relay) {
if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) {
$relay->connect();
$keysNotConnected[] = $key;
}
}

return $keysNotConnected;
}
}
90 changes: 90 additions & 0 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Stringable;
use function sprintf;
use function strlen;
use function substr;
use function ucfirst;

abstract class AbstractRPC implements RPCInterface
{
/**
* RPC calls service prefix.
*
* @var non-empty-string|null
*/
protected ?string $service = null;

/**
* @var positive-int
*/
protected static int $seq = 1;

public function __construct(
protected CodecInterface $codec
) {
}

/**
* @psalm-pure
*/
public function withServicePrefix(string $service): self
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->service = $service;

return $rpc;
}

/**
* @psalm-pure
*/
public function withCodec(CodecInterface $codec): self
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->codec = $codec;

return $rpc;
}

/**
* @throws Exception\ServiceException
*/
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
{
// exclude method name
$body = substr((string)$frame->payload, $frame->options[1]);

if ($frame->hasFlag(Frame::ERROR)) {
$name = $relay instanceof Stringable
? (string)$relay
: $relay::class;

Check warning on line 70 in src/RPC/AbstractRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/AbstractRPC.php#L70

Added line #L70 was not covered by tests

throw new ServiceException(sprintf("Error '%s' on %s", $body, $name));
}

return $this->codec->decode($body, $options);
}

/**
* @param non-empty-string $method
*/
protected function packFrame(string $method, mixed $payload): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex());
}
}
69 changes: 69 additions & 0 deletions src/RPC/AsyncRPCInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Exception\GoridgeException;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;

interface AsyncRPCInterface extends RPCInterface
{
/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly and ignore the response.
*
* @param non-empty-string $method
*
* @throws GoridgeException
*/
public function callIgnoreResponse(string $method, mixed $payload): void;

/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly but accept a response.
*
* @param non-empty-string $method
* @return positive-int An "ID" to check whether a response has been received and to fetch said response.
*
* @throws GoridgeException
*/
public function callAsync(string $method, mixed $payload): int;

/**
* Check whether a response has been received using the "ID" obtained through @see AsyncRPCInterface::callAsync() .
*
* @param positive-int $seq
* @return bool
*/
public function hasResponse(int $seq): bool;

/**
* Checks the "ID"s obtained through @see AsyncRPCInterface::callAsync() if they've got a response yet.
* Returns an array of "ID"s that do.
*
* @param positive-int[] $seqs
* @return positive-int[]
*/
public function hasResponses(array $seqs): array;

/**
* Fetch the response for the "ID" obtained through @see AsyncRPCInterface::callAsync() .
* @param positive-int $seq
* @throws RPCException
* @throws ServiceException
* @throws RelayException
*/
public function getResponse(int $seq, mixed $options = null): mixed;

/**
* Fetches the responses for the "ID"s obtained through @see AsyncRPCInterface::callAsync()
* and returns a map of "ID" => Response.
* @throws RelayException
* @throws ServiceException
* @throws RPCException
*
* @param array<array-key, positive-int> $seqs
* @return iterable<positive-int, mixed>
*
*/
public function getResponses(array $seqs, mixed $options = null): iterable;
}
Loading
Loading