diff --git a/composer.json b/composer.json index 8b4445d..7a54dc8 100644 --- a/composer.json +++ b/composer.json @@ -75,7 +75,7 @@ "scripts": { "test": "phpunit --no-coverage --colors=always", "test-cover": "phpunit --coverage-clover=coverage.xml", - "test-static": "psalm", + "test-static": "psalm --no-cache", "test-mutations": "infection" }, "minimum-stability": "dev", diff --git a/src/ConnectedRelayInterface.php b/src/ConnectedRelayInterface.php new file mode 100644 index 0000000..ce15cd3 --- /dev/null +++ b/src/ConnectedRelayInterface.php @@ -0,0 +1,36 @@ + $relays + * @return array-key[]|false + * @internal + * Returns either + * - an array of array keys, even if only one + * - or false if none + */ + public static function findRelayWithMessage(array $relays, int $timeoutInMicroseconds = 0): array|false + { + if (count($relays) === 0) { + return false; + } + + if ($relays[array_key_first($relays)] instanceof SocketRelay) { + $sockets = []; + $socketIdToRelayIndexMap = []; + foreach ($relays as $relayIndex => $relay) { + assert($relay instanceof SocketRelay); + + // Enforce connection + if ($relay->socket === null) { + // Important: Do not force reconnect here as it would otherwise completely ruin further handling + continue; + } + + $sockets[] = $relay->socket; + $socketIdToRelayIndexMap[spl_object_id($relay->socket)] = $relayIndex; + } + + if (count($sockets) === 0) { + return false; + } + + $writes = null; + $except = null; + $changes = socket_select($sockets, $writes, $except, 0, $timeoutInMicroseconds); + + if ($changes > 0) { + $indexes = []; + foreach ($sockets as $socket) { + $indexes[] = $socketIdToRelayIndexMap[spl_object_id($socket)] ?? throw new RPCException("Invalid socket??"); + } + + return $indexes; + } else { + return false; + } + } + + if ($relays[array_key_first($relays)] instanceof StreamRelay) { + $streams = []; + $streamNameToRelayIndexMap = []; + foreach ($relays as $relayIndex => $relay) { + assert($relay instanceof StreamRelay); + + $streams[] = $relay->in; + $streamNameToRelayIndexMap[(string)$relay->in] = $relayIndex; + } + + $writes = null; + $except = null; + $changes = stream_select($streams, $writes, $except, 0, $timeoutInMicroseconds); + + if ($changes > 0) { + $indexes = []; + foreach ($streams as $stream) { + $indexes[] = $streamNameToRelayIndexMap[(string)$stream] ?? throw new RPCException("Invalid stream??"); + } + + return $indexes; + } else { + return false; + } + } + + return false; + } + + /** + * @param array $relays + * @return array-key[]|false + * @internal + * Returns either + * - an array of array keys, even if only one + * - or false if none + */ + public static function checkConnected(array $relays): array|false + { + if (count($relays) === 0) { + return false; + } + + $keysNotConnected = []; + foreach ($relays as $key => $relay) { + if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) { + $relay->connect(); + $keysNotConnected[] = $key; + } + } + + return $keysNotConnected; + } +} diff --git a/src/RPC/AbstractRPC.php b/src/RPC/AbstractRPC.php new file mode 100644 index 0000000..ab4e593 --- /dev/null +++ b/src/RPC/AbstractRPC.php @@ -0,0 +1,90 @@ +service = $service; + + return $rpc; + } + + /** + * @psalm-pure + */ + public function withCodec(CodecInterface $codec): self + { + /** @psalm-suppress ImpureVariable */ + $rpc = clone $this; + $rpc->codec = $codec; + + return $rpc; + } + + /** + * @throws Exception\ServiceException + */ + protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed + { + // exclude method name + $body = substr((string)$frame->payload, $frame->options[1]); + + if ($frame->hasFlag(Frame::ERROR)) { + $name = $relay instanceof Stringable + ? (string)$relay + : $relay::class; + + throw new ServiceException(sprintf("Error '%s' on %s", $body, $name)); + } + + return $this->codec->decode($body, $options); + } + + /** + * @param non-empty-string $method + */ + protected function packFrame(string $method, mixed $payload): Frame + { + if ($this->service !== null) { + $method = $this->service . '.' . ucfirst($method); + } + + $body = $method . $this->codec->encode($payload); + return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex()); + } +} diff --git a/src/RPC/AsyncRPCInterface.php b/src/RPC/AsyncRPCInterface.php new file mode 100644 index 0000000..ac0959c --- /dev/null +++ b/src/RPC/AsyncRPCInterface.php @@ -0,0 +1,69 @@ + Response. + * @throws RelayException + * @throws ServiceException + * @throws RPCException + * + * @param array $seqs + * @return iterable + * + */ + public function getResponses(array $seqs, mixed $options = null): iterable; +} diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php new file mode 100644 index 0000000..e518947 --- /dev/null +++ b/src/RPC/MultiRPC.php @@ -0,0 +1,453 @@ + + */ + private static array $freeRelays = []; + + /** + * Occupied Relays is a map of seq to relay to make removal easier once a response is received. + * @var array + */ + private static array $occupiedRelays = []; + + /** + * A map of seq to relay to use for decodeResponse(). + * Technically the relay there is only needed in case of an error. + * + * @var array + */ + private static array $seqToRelayMap = []; + + /** + * Map of seq to response Frame + * Should only really need to be used in cases of high amounts of traffic + * + * @var array + */ + private static array $asyncResponseBuffer = []; + + /** + * The threshold after which the asyncResponseBuffer is flushed of all entries. + */ + private int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD; + + /** + * @param array $relays + */ + public function __construct( + array $relays, + int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD, + CodecInterface $codec = new JsonCodec() + ) { + // Check if we have at least one either existing or new relay here + if (count($relays) === 0 && count(self::$freeRelays) === 0 && count(self::$occupiedRelays) === 0) { + throw new RPCException("MultiRPC needs at least one relay. Zero provided."); + } + + if (count($relays) > 0) { + // Check if all new relays are of the same type + if (count(array_unique(array_map(static fn(RelayInterface $relay) => $relay::class, $relays))) > 1) { + throw new RPCException("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class); + } + + // Check if the existing relays (if any) and the new relays are of the same type. + if (count(self::$freeRelays) > 0) { + $existingRelay = self::$freeRelays[0]; + } elseif (count(self::$occupiedRelays) > 0) { + $existingRelay = self::$occupiedRelays[array_key_first(self::$occupiedRelays)]; + } else { + $existingRelay = null; + } + + if ($existingRelay !== null && $existingRelay::class !== $relays[0]::class) { + throw new RPCException("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class); + } + } + + // The relays (and related arrays) are static to support cloning this class. + // Basically the following problem exists: + // - If we make these arrays instance variables, then we need to recreate the relays on clone, otherwise we'd run into data issues. + // When we do that, the number of relays in existence can increase quite dramatically, resulting in balooning memory usage for socket buffers. + // - If we make these arrays static variables, then we need to make certain that they stay the same across all instances + // of this class. As a result the arrays are basically only appended on, and never deleted or modified. + // In the end that *can* mean that if someone were to repeatedly call `new MultiRPC([a bunch of relays])` that we'd + // tack all those relays into this array resulting in the same problem. + // It also means that different services can cannibalize the number of relays available to them, + // for example a Metrics service and a KV (Cache) service. + // IMHO(L3tum) a balooning memory usage that occurs unexpectly is way worse, than any of the other problems. In the end + // one can work against cannibalized relays by simply upping the number of relays at any point. + self::$freeRelays = [...self::$freeRelays, ...$relays]; + $this->asyncBufferThreshold = $asyncBufferThreshold; + parent::__construct($codec); + } + + /** + * @param non-empty-string $connection + * @param positive-int $count + */ + public static function create( + string $connection, + int $count = 50, + int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD, + CodecInterface $codec = new JsonCodec() + ): self { + assert($count > 0); + $count = $count - count(self::$freeRelays) - count(self::$occupiedRelays); + $relays = []; + + for ($i = 0; $i < $count; $i++) { + $relay = Relay::create($connection); + $relays[] = $relay; + } + + return new self($relays, $asyncBufferThreshold, $codec); + } + + /** + * Force-connects all relays. + * @throws RelayException + */ + public function preConnectRelays(): void + { + foreach (self::$freeRelays as $relay) { + if ($relay instanceof ConnectedRelayInterface) { + // Force connect + $relay->connect(); + } + } + } + + public function call(string $method, mixed $payload, mixed $options = null): mixed + { + $relayIndex = $this->ensureFreeRelayAvailable(); + $relay = self::$freeRelays[$relayIndex]; + + $relay->send($this->packFrame($method, $payload)); + + // wait for the frame confirmation + $frame = $this->getResponseFromRelay($relay, self::$seq, true); + + self::$seq++; + + return $this->decodeResponse($frame, $relay, $options); + } + + public function callIgnoreResponse(string $method, mixed $payload): void + { + $relayIndex = $this->ensureFreeRelayAvailable(); + $relay = self::$freeRelays[$relayIndex]; + + $relay->send($this->packFrame($method, $payload)); + + $seq = self::$seq; + self::$seq++; + self::$occupiedRelays[$seq] = $relay; + // Last index so no need for array_pop or stuff + unset(self::$freeRelays[$relayIndex]); + } + + public function callAsync(string $method, mixed $payload): int + { + // Flush buffer if someone doesn't call getResponse + if (count(self::$asyncResponseBuffer) > $this->asyncBufferThreshold) { + // We don't need to clean up occupiedRelays here since the buffer is solely for responses already + // fetched from relays, and those relays are put back to freeRelays in getNextFreeRelay() + self::$seqToRelayMap = array_diff_key(self::$seqToRelayMap, self::$asyncResponseBuffer); + self::$asyncResponseBuffer = []; + } + + $relayIndex = $this->ensureFreeRelayAvailable(); + $relay = self::$freeRelays[$relayIndex]; + + $relay->send($this->packFrame($method, $payload)); + + $seq = self::$seq; + self::$seq++; + self::$occupiedRelays[$seq] = $relay; + self::$seqToRelayMap[$seq] = $relay; + // Last index so no need for array_pop or stuff + unset(self::$freeRelays[$relayIndex]); + + return $seq; + } + + public function hasResponse(int $seq): bool + { + // Check if we have the response buffered previously due to congestion + if (isset(self::$asyncResponseBuffer[$seq])) { + return true; + } + + // Else check if the relay has the response in its buffer + if (self::$seqToRelayMap[$seq]->hasFrame()) { + return true; + } + + return false; + } + + public function hasResponses(array $seqs): array + { + $relays = []; + /** @var array $relayIndexToSeq */ + $relayIndexToSeq = []; + $seqsWithResponse = []; + + // The behaviour is essentially the same as self::hasResponse, just mapped to multiple $seqs aka $relays. + // In order to use MultiRelayHelper we create a map of index => seq to map it back after checking for messages. + foreach ($seqs as $seq) { + if (isset(self::$asyncResponseBuffer[$seq])) { + $seqsWithResponse[] = $seq; + } elseif (isset(self::$seqToRelayMap[$seq])) { + $relayIndexToSeq[count($relays)] = $seq; + $relays[] = self::$seqToRelayMap[$seq]; + } + } + + /** @var int[]|false $index */ + $index = MultiRelayHelper::findRelayWithMessage($relays); + + if ($index === false) { + return $seqsWithResponse; + } + + foreach ($index as $relayIndex) { + $seqsWithResponse[] = $relayIndexToSeq[$relayIndex]; + } + + return $seqsWithResponse; + } + + public function getResponse(int $seq, mixed $options = null): mixed + { + $relay = self::$seqToRelayMap[$seq] ?? throw new RPCException(self::ERR_INVALID_SEQ_NUMBER); + unset(self::$seqToRelayMap[$seq]); + + if (($frame = $this->getResponseFromBuffer($seq)) !== null) { + /** + * We can assume through @see MultiRPC::ensureFreeRelayAvailable() that a relay whose response is already + * in this buffer has also been added to freeRelays (or is otherwise occupied). + * Thus we only re-add (and do so without searching for it first) if we don't have the response yet. + */ + } else { + self::$freeRelays[] = self::$occupiedRelays[$seq]; + unset(self::$occupiedRelays[$seq]); + + $frame = $this->getResponseFromRelay($relay, $seq, true); + } + + return $this->decodeResponse($frame, $relay, $options); + } + + public function getResponses(array $seqs, mixed $options = null): iterable + { + // Quick return + if (count($seqs) === 0) { + return; + } + + // Flip the array to use the $seqs for key indexing + $seqsKeyed = []; + + foreach ($seqs as $seq) { + if (isset(self::$asyncResponseBuffer[$seq])) { + // We can use getResponse() here since it's doing basically what we want to do here anyway + yield $seq => $this->getResponse($seq, $options); + } else { + $seqsKeyed[$seq] = true; + } + } + + // Fetch all relays that are still occupied and which we need responses from + $seqsToRelays = array_intersect_key(self::$occupiedRelays, $seqsKeyed); + + // Make sure we have relays for all $seqs, otherwise something went wrong + if (count($seqsToRelays) !== count($seqsKeyed)) { + throw new RPCException(self::ERR_INVALID_SEQ_NUMBER); + } + + $timeoutInMicroseconds = 0; + while (count($seqsToRelays) > 0) { + // Do a first pass without a timeout. Maybe there's already most responses which would make a timeout unnecessary. + /** @var positive-int[]|false $seqsReceivedResponse */ + $seqsReceivedResponse = MultiRelayHelper::findRelayWithMessage($seqsToRelays, $timeoutInMicroseconds); + $timeoutInMicroseconds = 500; + + if ($seqsReceivedResponse === false) { + if ($this->checkAllOccupiedRelaysStillConnected()) { + // Check if we've lost a relay we were waiting on, if so we need to quit since something is wrong. + if (count(array_diff_key($seqsToRelays, self::$occupiedRelays)) > 0) { + throw new RPCException(self::ERR_INVALID_SEQ_NUMBER); + } + } + continue; + } + + foreach ($seqsReceivedResponse as $seq) { + // Add the previously occupied relay to freeRelays here so that we don't lose it in case of an error + $relay = $seqsToRelays[$seq]; + self::$freeRelays[] = $relay; + unset(self::$occupiedRelays[$seq]); + + // Yield the response + $frame = $this->getResponseFromRelay($relay, $seq, true); + yield $seq => $this->decodeResponse($frame, $relay, $options); + + // Unset tracking map + unset($seqsToRelays[$seq], self::$seqToRelayMap[$seq]); + } + } + } + + /** + * Returns array-key of free relay + * @throws RPCException + */ + private function ensureFreeRelayAvailable(): int + { + if (count(self::$freeRelays) > 0) { + // Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling) + /** @psalm-return int */ + return array_key_last(self::$freeRelays); + } + + if (count(self::$occupiedRelays) === 0) { + // If we have neither freeRelays nor occupiedRelays then someone either initialized this with 0 relays + // or something went terribly wrong. Either way we need to quit. + throw new RPCException("No relays available at all"); + } + + while (count(self::$freeRelays) === 0) { + /** @var positive-int[]|false $index */ + $index = MultiRelayHelper::findRelayWithMessage(self::$occupiedRelays); + + if ($index === false) { + // Check if all currently occupied relays are even still connected. Do another loop if they aren't. + if ($this->checkAllOccupiedRelaysStillConnected()) { + continue; + } else { + // Just choose the first occupiedRelay to wait on since instead we may busyloop here + // checking relay status and not giving RR the chance to actually answer (in a single core env for example). + $index = [array_key_first(self::$occupiedRelays)]; + } + } + + // Flush as many relays as we can up until a limit (arbitrarily 10?) + for ($i = 0, $max = min(10, count($index)); $i < $max; $i++) { + /** @var positive-int $seq */ + $seq = $index[$i]; + // Move relay from occupiedRelays into freeRelays before trying to get the response from it + // in case something happens, so we don't lose it. + $relay = self::$occupiedRelays[$seq]; + self::$freeRelays[] = $relay; + unset(self::$occupiedRelays[$seq]); + // Save response if in seqToRelayMap (aka a response is expected) + // only save response in case of mismatched seq = response not in seqToRelayMap + try { + $this->getResponseFromRelay($relay, $seq, !isset(self::$seqToRelayMap[$seq])); + } catch (RelayException|RPCException) { + // Intentionally left blank + } + } + } + + // Sometimes check if all occupied relays are even still connected + $this->checkAllOccupiedRelaysStillConnected(); + + // Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling) + return array_key_last(self::$freeRelays); + } + + /** + * Gets a response from the relay, blocking for it if necessary, with some error handling in regards to mismatched seq + * + * @param positive-int $expectedSeq + */ + private function getResponseFromRelay(RelayInterface $relay, int $expectedSeq, bool $onlySaveResponseInCaseOfMismatchedSeq = false): Frame + { + if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) { + throw new TransportException("Unable to read payload from the stream"); + } + + $frame = $relay->waitFrame(); + + if (count($frame->options) !== 2) { + // Expect at least a few options + throw new RPCException('Invalid RPC frame, options missing'); + } + + if ($frame->options[0] !== $expectedSeq) { + // Save response since $seq was invalid but the response may not + /** @var positive-int $responseSeq */ + $responseSeq = $frame->options[0]; + self::$asyncResponseBuffer[$responseSeq] = $frame; + + throw new RPCException('Invalid RPC frame, sequence mismatch'); + } + + if (!$onlySaveResponseInCaseOfMismatchedSeq) { + // If we want to save the response, regardless of whether the $seq was a match or not, + // we'll need to add it to the buffer. + // This is used in e.g. flushing a relay in ensureFreeRelay() + // so that we can at least *try* to get the resonse back to the user. + self::$asyncResponseBuffer[$expectedSeq] = $frame; + } + + return $frame; + } + + /** + * Tries to get a response (Frame) from the buffer and unsets the entry if it finds the response. + * + * @param positive-int $seq + * @return Frame|null + */ + private function getResponseFromBuffer(int $seq): ?Frame + { + if (($frame = self::$asyncResponseBuffer[$seq] ?? null) !== null) { + unset(self::$asyncResponseBuffer[$seq]); + } + + return $frame; + } + + private function checkAllOccupiedRelaysStillConnected(): bool + { + if (($relaysNotConnected = MultiRelayHelper::checkConnected(self::$occupiedRelays)) !== false) { + /** @var positive-int $seq */ + foreach ($relaysNotConnected as $seq) { + self::$freeRelays[] = self::$occupiedRelays[$seq]; + unset(self::$seqToRelayMap[$seq], self::$occupiedRelays[$seq]); + } + + return true; + } + + return false; + } +} diff --git a/src/RPC/RPC.php b/src/RPC/RPC.php index d26fe6d..28979dc 100644 --- a/src/RPC/RPC.php +++ b/src/RPC/RPC.php @@ -4,55 +4,21 @@ namespace Spiral\Goridge\RPC; -use Spiral\Goridge\Frame; use Spiral\Goridge\Relay; use Spiral\Goridge\RelayInterface; use Spiral\Goridge\RPC\Codec\JsonCodec; use Spiral\Goridge\RPC\Exception\RPCException; -use Spiral\Goridge\RPC\Exception\ServiceException; +use function count; -class RPC implements RPCInterface +class RPC extends AbstractRPC { - /** - * RPC calls service prefix. - * - * @var non-empty-string|null - */ - private ?string $service = null; - - /** - * @var positive-int - */ - private static int $seq = 1; public function __construct( private readonly RelayInterface $relay, - private CodecInterface $codec = new JsonCodec(), - ) { - } - - /** - * @psalm-pure - */ - public function withServicePrefix(string $service): RPCInterface - { - /** @psalm-suppress ImpureVariable */ - $rpc = clone $this; - $rpc->service = $service; - - return $rpc; - } - - /** - * @psalm-pure - */ - public function withCodec(CodecInterface $codec): RPCInterface + CodecInterface $codec = new JsonCodec(), + ) { - /** @psalm-suppress ImpureVariable */ - $rpc = clone $this; - $rpc->codec = $codec; - - return $rpc; + parent::__construct($codec); } public function call(string $method, mixed $payload, mixed $options = null): mixed @@ -62,7 +28,7 @@ public function call(string $method, mixed $payload, mixed $options = null): mix // wait for the frame confirmation $frame = $this->relay->waitFrame(); - if (\count($frame->options) !== 2) { + if (count($frame->options) !== 2) { throw new RPCException('Invalid RPC frame, options missing'); } @@ -72,7 +38,7 @@ public function call(string $method, mixed $payload, mixed $options = null): mix self::$seq++; - return $this->decodeResponse($frame, $options); + return $this->decodeResponse($frame, $this->relay, $options); } /** @@ -84,36 +50,4 @@ public static function create(string $connection, CodecInterface $codec = new Js return new self($relay, $codec); } - - /** - * @throws Exception\ServiceException - */ - private function decodeResponse(Frame $frame, mixed $options = null): mixed - { - // exclude method name - $body = \substr((string)$frame->payload, $frame->options[1]); - - if ($frame->hasFlag(Frame::ERROR)) { - $name = $this->relay instanceof \Stringable - ? (string)$this->relay - : $this->relay::class; - - throw new ServiceException(\sprintf("Error '%s' on %s", $body, $name)); - } - - return $this->codec->decode($body, $options); - } - - /** - * @param non-empty-string $method - */ - private function packFrame(string $method, mixed $payload): Frame - { - if ($this->service !== null) { - $method = $this->service . '.' . \ucfirst($method); - } - - $body = $method . $this->codec->encode($payload); - return new Frame($body, [self::$seq, \strlen($method)], $this->codec->getIndex()); - } } diff --git a/src/RPC/RPCInterface.php b/src/RPC/RPCInterface.php index 310db86..3472eb6 100644 --- a/src/RPC/RPCInterface.php +++ b/src/RPC/RPCInterface.php @@ -26,7 +26,7 @@ public function withServicePrefix(string $service): self; public function withCodec(CodecInterface $codec): self; /** - * Invoke remove RoadRunner service method using given payload (free form). + * Invoke remote RoadRunner service method using given payload (free form). * * @param non-empty-string $method * diff --git a/src/SocketRelay.php b/src/SocketRelay.php index 9e31876..7dc607d 100644 --- a/src/SocketRelay.php +++ b/src/SocketRelay.php @@ -24,7 +24,7 @@ * * @psalm-suppress DeprecatedInterface */ -class SocketRelay extends Relay implements Stringable +class SocketRelay extends Relay implements Stringable, ConnectedRelayInterface { final public const RECONNECT_RETRIES = 10; final public const RECONNECT_TIMEOUT = 100; @@ -38,7 +38,12 @@ class SocketRelay extends Relay implements Stringable /** @var PortType */ private readonly ?int $port; private readonly SocketType $type; - private ?Socket $socket = null; + /** + * @internal + * This isn't really ideal but there's no easy way since we need access to the underlying socket + * to do a socket_select across multiple SocketRelays. + */ + public ?Socket $socket = null; /** * Example: @@ -105,6 +110,12 @@ public function __toString(): string return "unix://{$this->address}"; } + public function __clone() + { + // Remove reference to socket on clone + $this->socket = null; + } + public function getAddress(): string { return $this->address; @@ -214,7 +225,6 @@ public function hasFrame(): bool * @param int<0, max> $timeout Timeout between reconnections in microseconds. * * @throws RelayException - * @throws \Error When sockets are used in unsupported environment. */ public function connect(int $retries = self::RECONNECT_RETRIES, int $timeout = self::RECONNECT_TIMEOUT): bool { diff --git a/src/StreamRelay.php b/src/StreamRelay.php index acf2094..6fca939 100644 --- a/src/StreamRelay.php +++ b/src/StreamRelay.php @@ -22,8 +22,11 @@ class StreamRelay extends Relay implements BlockingRelayInterface { /** * @var resource + * @internal + * This isn't really ideal but there's no easy way since we need access to the underlying stream + * to do a stream_select across multiple StreamRelays. */ - private $in; + public $in; /** * @var resource diff --git a/tests/Goridge/MsgPackMultiRPCTest.php b/tests/Goridge/MsgPackMultiRPCTest.php new file mode 100644 index 0000000..a66e2e1 --- /dev/null +++ b/tests/Goridge/MsgPackMultiRPCTest.php @@ -0,0 +1,40 @@ +expectException(ServiceException::class); + $this->rpc->call('Service.Process', random_bytes(256)); + } + + public function testJsonExceptionAsync(): void + { + $id = $this->rpc->callAsync('Service.Process', random_bytes(256)); + $this->expectException(ServiceException::class); + $this->rpc->getResponse($id); + } + + public function testJsonExceptionNotThrownWithIgnoreResponse(): void + { + $this->rpc->callIgnoreResponse('Service.Process', random_bytes(256)); + $this->forceFlushRpc(); + } + + protected function makeRPC(int $count = 10): void + { + parent::makeRPC($count); + $this->rpc = $this->rpc->withCodec(new MsgpackCodec()); + } +} diff --git a/tests/Goridge/MultiRPC.php b/tests/Goridge/MultiRPC.php new file mode 100644 index 0000000..29b37e5 --- /dev/null +++ b/tests/Goridge/MultiRPC.php @@ -0,0 +1,899 @@ +setValue([]); + + $relays = []; + for ($i = 0; $i < 10; $i++) { + $relays[] = $this->makeRelay(); + } + /** @var SocketRelay $relay */ + $relay = $relays[0]; + $this->rpc = new GoridgeMultiRPC($relays); + $this->expectedNumberOfRelays = 10; + + $this->assertFalse($relay->isConnected()); + + $relay->connect(); + $this->assertTrue($relay->isConnected()); + + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); + $this->assertTrue($relay->isConnected()); + + $this->rpc->preConnectRelays(); + foreach ($relays as $relay) { + $this->assertTrue($relay->isConnected()); + } + } + + public function testReconnect(): void + { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $property->setValue([]); + + /** @var SocketRelay $relay */ + $relay = $this->makeRelay(); + $this->rpc = new GoridgeMultiRPC([$relay]); + $this->expectedNumberOfRelays = 1; + + $this->assertFalse($relay->isConnected()); + + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); + $this->assertTrue($relay->isConnected()); + + $relay->close(); + $this->assertFalse($relay->isConnected()); + + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); + $this->assertTrue($relay->isConnected()); + } + + public function testPingPong(): void + { + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); + } + + public function testPingPongAsync(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testPrefixPingPong(): void + { + $this->rpc = $this->rpc->withServicePrefix('Service'); + $this->assertSame('pong', $this->rpc->call('Ping', 'ping')); + } + + public function testPrefixPingPongAsync(): void + { + $this->rpc = $this->rpc->withServicePrefix('Service'); + $id = $this->rpc->callAsync('Ping', 'ping'); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testPingNull(): void + { + $this->assertSame('', $this->rpc->call('Service.Ping', 'not-ping')); + } + + public function testPingNullAsync(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'not-ping'); + $this->assertSame('', $this->rpc->getResponse($id)); + } + + public function testNegate(): void + { + $this->assertSame(-10, $this->rpc->call('Service.Negate', 10)); + } + + public function testNegateAsync(): void + { + $id = $this->rpc->callAsync('Service.Negate', 10); + $this->assertSame(-10, $this->rpc->getResponse($id)); + } + + public function testNegateNegative(): void + { + $this->assertSame(10, $this->rpc->call('Service.Negate', -10)); + } + + public function testNegateNegativeAsync(): void + { + $id = $this->rpc->callAsync('Service.Negate', -10); + $this->assertSame(10, $this->rpc->getResponse($id)); + } + + public function testInvalidService(): void + { + $this->expectException(ServiceException::class); + $this->rpc = $this->rpc->withServicePrefix('Service2'); + $this->assertSame('pong', $this->rpc->call('Ping', 'ping')); + } + + public function testInvalidServiceAsync(): void + { + $this->rpc = $this->rpc->withServicePrefix('Service2'); + $id = $this->rpc->callAsync('Ping', 'ping'); + $this->expectException(ServiceException::class); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testInvalidMethod(): void + { + $this->expectException(ServiceException::class); + $this->rpc = $this->rpc->withServicePrefix('Service'); + $this->assertSame('pong', $this->rpc->call('Ping2', 'ping')); + } + + public function testInvalidMethodAsync(): void + { + $this->rpc = $this->rpc->withServicePrefix('Service'); + $id = $this->rpc->callAsync('Ping2', 'ping'); + $this->expectException(ServiceException::class); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testLongEcho(): void + { + $payload = base64_encode(random_bytes(65000 * 5)); + + $resp = $this->rpc->call('Service.Echo', $payload); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testLongEchoAsync(): void + { + $payload = base64_encode(random_bytes(65000 * 5)); + + $id = $this->rpc->callAsync('Service.Echo', $payload); + $resp = $this->rpc->getResponse($id); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testConvertException(): void + { + $this->expectException(ServiceException::class); + $this->expectExceptionMessage('unknown Raw payload type'); + + $payload = base64_encode(random_bytes(65000 * 5)); + + $resp = $this->rpc->withCodec(new RawCodec())->call( + 'Service.Echo', + $payload + ); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testConvertExceptionAsync(): void + { + $payload = base64_encode(random_bytes(65000 * 5)); + + $this->rpc = $this->rpc->withCodec(new RawCodec()); + $id = $this->rpc->callAsync( + 'Service.Echo', + $payload + ); + + $this->expectException(ServiceException::class); + $this->expectExceptionMessage('unknown Raw payload type'); + + $resp = $this->rpc->getResponse($id); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testRawBody(): void + { + $payload = random_bytes(100); + + $resp = $this->rpc->withCodec(new RawCodec())->call( + 'Service.EchoBinary', + $payload + ); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testRawBodyAsync(): void + { + $payload = random_bytes(100); + + $this->rpc = $this->rpc->withCodec(new RawCodec()); + $id = $this->rpc->callAsync( + 'Service.EchoBinary', + $payload + ); + $resp = $this->rpc->getResponse($id); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testLongRawBody(): void + { + $payload = random_bytes(65000 * 1000); + + $resp = $this->rpc->withCodec(new RawCodec())->call( + 'Service.EchoBinary', + $payload + ); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testLongRawBodyAsync(): void + { + $payload = random_bytes(65000 * 1000); + + $this->rpc = $this->rpc->withCodec(new RawCodec()); + $id = $this->rpc->callAsync( + 'Service.EchoBinary', + $payload + ); + $resp = $this->rpc->getResponse($id); + + $this->assertSame(strlen($payload), strlen($resp)); + $this->assertSame(md5($payload), md5($resp)); + } + + public function testPayload(): void + { + $resp = $this->rpc->call( + 'Service.Process', + [ + 'Name' => 'wolfy-j', + 'Value' => 18 + ] + ); + + $this->assertSame( + [ + 'Name' => 'WOLFY-J', + 'Value' => -18, + 'Keys' => null + ], + $resp + ); + } + + public function testPayloadAsync(): void + { + $id = $this->rpc->callAsync( + 'Service.Process', + [ + 'Name' => 'wolfy-j', + 'Value' => 18 + ] + ); + $resp = $this->rpc->getResponse($id); + + $this->assertSame( + [ + 'Name' => 'WOLFY-J', + 'Value' => -18, + 'Keys' => null + ], + $resp + ); + } + + public function testBadPayload(): void + { + $this->expectException(ServiceException::class); + $this->expectExceptionMessage('unknown Raw payload type'); + + $this->rpc->withCodec(new RawCodec())->call('Service.Process', 'raw'); + } + + public function testBadPayloadAsync(): void + { + $this->rpc = $this->rpc->withCodec(new RawCodec()); + $id = $this->rpc->callAsync('Service.Process', 'raw'); + + $this->expectException(ServiceException::class); + $this->expectExceptionMessage('unknown Raw payload type'); + $resp = $this->rpc->getResponse($id); + } + + public function testPayloadWithMap(): void + { + $resp = $this->rpc->call( + 'Service.Process', + [ + 'Name' => 'wolfy-j', + 'Value' => 18, + 'Keys' => [ + 'Key' => 'value', + 'Email' => 'domain' + ] + ] + ); + + $this->assertIsArray($resp['Keys']); + $this->assertArrayHasKey('value', $resp['Keys']); + $this->assertArrayHasKey('domain', $resp['Keys']); + + $this->assertSame('Key', $resp['Keys']['value']); + $this->assertSame('Email', $resp['Keys']['domain']); + } + + public function testPayloadWithMapAsync(): void + { + $id = $this->rpc->callAsync( + 'Service.Process', + [ + 'Name' => 'wolfy-j', + 'Value' => 18, + 'Keys' => [ + 'Key' => 'value', + 'Email' => 'domain' + ] + ] + ); + $resp = $this->rpc->getResponse($id); + + $this->assertIsArray($resp['Keys']); + $this->assertArrayHasKey('value', $resp['Keys']); + $this->assertArrayHasKey('domain', $resp['Keys']); + + $this->assertSame('Key', $resp['Keys']['value']); + $this->assertSame('Email', $resp['Keys']['domain']); + } + + public function testBrokenPayloadMap(): void + { + $id = $this->rpc->callAsync( + 'Service.Process', + [ + 'Name' => 'wolfy-j', + 'Value' => 18, + 'Keys' => 1111 + ] + ); + + $this->expectException(ServiceException::class); + $resp = $this->rpc->getResponse($id); + } + + public function testJsonException(): void + { + $this->expectException(CodecException::class); + + $this->rpc->call('Service.Process', random_bytes(256)); + } + + public function testJsonExceptionAsync(): void + { + $this->expectException(CodecException::class); + $id = $this->rpc->callAsync('Service.Process', random_bytes(256)); + } + + public function testJsonExceptionNotThrownWithIgnoreResponse(): void + { + $this->expectException(CodecException::class); + $this->rpc->callIgnoreResponse('Service.Process', random_bytes(256)); + } + + public function testSleepEcho(): void + { + $time = hrtime(true) / 1e9; + $this->assertSame('Hello', $this->rpc->call('Service.SleepEcho', 'Hello')); + // sleep is 100ms, so we check if we are further along than 100ms + $this->assertGreaterThanOrEqual($time + 0.1, hrtime(true) / 1e9); + } + + public function testSleepEchoAsync(): void + { + $time = hrtime(true) / 1e9; + $id = $this->rpc->callAsync('Service.SleepEcho', 'Hello'); + // hrtime is in nanoseconds, and at most expect 1ms delay (sleep is 100ms) + $this->assertLessThanOrEqual($time + 0.001, hrtime(true) / 1e9); + $this->assertFalse($this->rpc->hasResponse($id)); + $this->assertSame('Hello', $this->rpc->getResponse($id)); + // sleep is 100ms, so we check if we are further along than 100ms + $this->assertGreaterThanOrEqual($time + 0.1, hrtime(true) / 1e9); + } + + public function testSleepEchoIgnoreResponse(): void + { + $time = hrtime(true) / 1e9; + $this->rpc->callIgnoreResponse('Service.SleepEcho', 'Hello'); + // hrtime is in nanoseconds, and at most expect 1ms delay (sleep is 100ms) + $this->assertLessThanOrEqual($time + 0.001, hrtime(true) / 1e9); + // Wait for response + usleep(100_000); + + $this->forceFlushRpc(); + } + + public function testCannotGetSameResponseTwice(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->assertSame('pong', $this->rpc->getResponse($id)); + $this->assertFreeRelaysCorrectNumber($this->rpc); + $this->expectException(RPCException::class); + $this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testCanCallMoreTimesThanRelays(): void + { + $ids = []; + + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + } + + public function testCanCallMoreTimesThanBufferAndNotGetResponses(): void + { + $ids = []; + + // Flood to force the issue + for ($i = 0; $i < 20_000; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + $this->expectException(RPCException::class); + + // We cheat here since the order in which responses are discarded depends on when they are received + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'asyncResponseBuffer'); + $buffer = $property->getValue(); + + foreach ($ids as $id) { + if (!isset($buffer[$id])) { + $this->rpc->getResponse($id); + $this->fail("Invalid seq did not throw exception"); + } + } + } + + public function testCanCallMoreTimesThanRelaysWithIntermittentResponseHandling(): void + { + $ids = []; + + for ($i = 0; $i < 150; $i++) { + if ($i === 50) { + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + $ids = []; + } + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + } + + public function testHandleRelayDisconnect(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $occupiedRelays = $property->getValue(); + $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); + $occupiedRelays[$id]->close(); + $this->expectException(TransportException::class); + $this->rpc->getResponse($id); + } + + public function testHandleRelayDisconnectWithPressure(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $occupiedRelays = $property->getValue(); + $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); + $occupiedRelays[$id]->close(); + + $ids = []; + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + + // In this case there may be two different scenarios, which is why there are three tests basically doing the same + // In the first one, the disconnected relay was already discovered. In that case, an RPCException is thrown (unknown seq). + // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. + // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); + $discovered = !isset($property->getValue()[$id]); + + if ($discovered) { + $this->expectException(RPCException::class); + $this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER); + } else { + $this->expectException(TransportException::class); + $this->expectExceptionMessage('Unable to read payload from the stream'); + } + $this->rpc->getResponse($id); + } + + public function testHandleRelayDisconnectWithPressureForceDiscovered(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $occupiedRelays = $property->getValue(); + $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); + $occupiedRelays[$id]->close(); + + $ids = []; + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + + // In this case there may be two different scenarios, which is why there are three tests basically doing the same + // In the first one, the disconnected relay was already discovered. In that case, an RPCException is thrown (unknown seq). + // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. + // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); + $discovered = !isset($property->getValue()[$id]); + + if (!$discovered) { + $method = new ReflectionMethod(GoridgeMultiRPC::class, 'checkAllOccupiedRelaysStillConnected'); + $method->invoke($this->rpc); + } + + $this->expectException(RPCException::class); + $this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER); + $this->rpc->getResponse($id); + } + + public function testHandleRelayDisconnectWithPressureForceUndiscovered(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $occupiedProperty = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $occupiedRelays = $occupiedProperty->getValue(); + $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); + $occupiedRelays[$id]->close(); + + $ids = []; + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + + // In this case there may be two different scenarios, which is why there are three tests basically doing the same + // In the first one, the disconnected relay was already discovered. In that case, an RPCException is thrown (unknown seq). + // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. + // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. + $mapProperty = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); + $seqToRelayMap = $mapProperty->getValue(); + $discovered = !isset($seqToRelayMap[$id]); + + if ($discovered) { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $freeRelays = $property->getValue(); + $relay = array_pop($freeRelays); + $property->setValue($freeRelays); + assert($relay instanceof SocketRelay); + $relay->close(); + $seqToRelayMap[$id] = $relay; + $occupiedRelays[$id] = $relay; + $mapProperty->setValue($seqToRelayMap); + $occupiedProperty->setValue($occupiedRelays); + + + $this->expectException(RPCException::class); + $this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER); + } + + $this->expectException(TransportException::class); + $this->expectExceptionMessage('Unable to read payload from the stream'); + $this->rpc->getResponse($id); + } + + public function testHandleRelayDisconnectWithPressureGetResponses(): void + { + $ids = []; + $ids[] = $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $occupiedRelays = $property->getValue(); + $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); + $occupiedRelays[$id]->close(); + + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + $this->expectException(RPCException::class); + $this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER); + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + } + + /** + * This test checks whether relays are cloned correctly, or if they get shared between the cloned instances. + * Without cloning them explicitly they get shared and thus, when one RPC gets called, the freeRelays array + * in the other RPC stays the same, making it reuse the just-used and still occupied relay. + */ + public function testHandlesCloneCorrectly(): void + { + $this->rpc->preConnectRelays(); + + // This is to support the MsgPackMultiRPC Tests + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'codec'); + $codec = $property->getValue($this->rpc); + $clonedRpc = $this->rpc->withCodec($codec instanceof MsgpackCodec ? new JsonCodec() : new MsgpackCodec()); + + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + foreach ($property->getValue() as $relay) { + /** @var ConnectedRelayInterface $relay */ + $this->assertTrue($relay->isConnected()); + } + + $ids = []; + $clonedIds = []; + + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + + for ($i = 0; $i < 50; $i++) { + $clonedIds[] = $clonedRpc->callAsync('Service.Echo', 'Hello'); + } + // Wait 100ms for the response(s) + usleep(100 * 1000); + + // Can use wrong RPC for response (unfortunately, but there's no easy solution) + try { + $response = $this->rpc->getResponse($clonedIds[0]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'codec'); + + if ($property->getValue($this->rpc) instanceof MsgpackCodec) { + // Msgpack internally does not throw an error, only returns the encoded response because of course why + // would normal error handling be something that is important in a library. + // Locally this returned the number 34, but I'm not sure if there's some variation in that + // so we test on the expected response. + // This also notifies PHPUnit since msgpack logs a warning. + if ($response !== 'Hello') { + throw new CodecException("msgpack is a big meany"); + } + } + + $this->fail("Should've thrown an Exception due to wrong codec"); + } catch (CodecException $exception) { + $this->assertNotEmpty($exception->getMessage()); + } + + // The $seq should not be available anymore + try { + $response = $clonedRpc->getResponse($clonedIds[0]); + $this->fail("Should've thrown an exception due to wrong seq"); + } catch (RPCException $exception) { + $this->assertNotEmpty($exception->getMessage()); + } + + array_shift($clonedIds); + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + + foreach ($clonedRpc->getResponses($clonedIds) as $response) { + $this->assertSame('Hello', $response); + } + } + + public function testNeedsAtLeastOne(): void + { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $property->setValue([]); + $this->expectedNumberOfRelays = 0; + $this->expectException(RPCException::class); + $this->expectExceptionMessage("MultiRPC needs at least one relay. Zero provided."); + new GoridgeMultiRPC([]); + } + + public function testChecksIfResponseIsInRelay(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + // Wait a bit + usleep(100 * 1000); + + $this->assertTrue($this->rpc->hasResponse($id)); + } + + public function testChecksIfResponseIsInBuffer(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + // Wait a bit + usleep(100 * 1000); + $this->forceFlushRpc(); + + $this->assertTrue($this->rpc->hasResponse($id)); + } + + public function testChecksIfResponseIsNotReceivedYet(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->assertFalse($this->rpc->hasResponse($id)); + } + + public function testChecksMultipleResponses(): void + { + $ids = []; + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->forceFlushRpc(); + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + usleep(100 * 1000); + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + $responses = $this->rpc->hasResponses($ids); + $this->assertContains($ids[0], $responses); + $this->assertContains($ids[1], $responses); + $this->assertNotContains($ids[2], $responses); + } + + public function testHasResponsesReturnsEmptyArrayWhenNoResponses(): void + { + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->assertEmpty($this->rpc->hasResponses([$id])); + } + + public function testGetResponsesReturnsWhenNoRelaysAvailableToAvoidInfiniteLoop(): void + { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $property->setValue([]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $property->setValue([]); + $this->expectedNumberOfRelays = 0; + $this->expectException(RPCException::class); + $this->expectExceptionMessage("No relays available at all"); + $this->rpc->call('Service.Ping', 'ping'); + } + + public function testMultiRPCIsUsableWithOneRelay(): void + { + $this->makeRPC(1); + $this->rpc->callIgnoreResponse('Service.Ping', 'ping'); + $this->rpc->callIgnoreResponse('Service.SleepEcho', 'Hello'); + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->rpc->callIgnoreResponse('Service.Echo', 'Hello'); + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); + $this->assertSame('pong', $this->rpc->getResponse($id)); + } + + public function testThrowsWhenMixedRelaysProvided(): void + { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $property->setValue([]); + $this->expectedNumberOfRelays = 0; + $relays = [new StreamRelay(STDIN, STDOUT), $this->makeRelay()]; + $this->expectException(RPCException::class); + $this->expectExceptionMessage("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class); + new GoridgeMultiRPC($relays); + } + + public function testThrowsWhenRelaysDontMatchExistingOnes(): void + { + $relays = [new StreamRelay(STDIN, STDOUT)]; + $this->expectException(RPCException::class); + $this->expectExceptionMessage("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class); + new GoridgeMultiRPC($relays); + } + + protected function setUp(): void + { + $this->makeRPC(); + } + + protected function makeRPC(int $count = 10): void + { + // We need to manually clean the static properties between test runs. + // In an actual application this would never happen. + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $property->setValue([]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $property->setValue([]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); + $property->setValue([]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'asyncResponseBuffer'); + $property->setValue([]); + $type = self::SOCK_TYPE->value; + $address = self::SOCK_ADDR; + $port = self::SOCK_PORT; + $this->rpc = GoridgeMultiRPC::create("$type://$address:$port", $count); + $this->expectedNumberOfRelays = $count; + } + + /** + * @return RelayInterface + */ + protected function makeRelay(): RelayInterface + { + return new SocketRelay(static::SOCK_ADDR, static::SOCK_PORT, static::SOCK_TYPE); + } + + protected function tearDown(): void + { + $this->assertFreeRelaysCorrectNumber(); + } + + protected function assertFreeRelaysCorrectNumber(): void + { + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + $numberOfFreeRelays = count($property->getValue()); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $numberOfOccupiedRelays = count($property->getValue()); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); + $numberOfWaitingResponses = count($property->getValue()); + + $this->assertSame( + $this->expectedNumberOfRelays, + $numberOfFreeRelays + $numberOfOccupiedRelays, + "RPC has lost at least one relay! Waiting Responses: $numberOfWaitingResponses, Free Relays: $numberOfFreeRelays, Occupied Relays: $numberOfOccupiedRelays" + ); + } + + protected function forceFlushRpc(): void + { + // Force consuming relay by flooding requests + $ids = []; + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + foreach ($this->rpc->getResponses($ids) as $id => $response) { + $this->assertSame('pong', $response); + array_splice($ids, array_search($id, $ids, true), 1); + } + $this->assertEmpty($ids); + } +} diff --git a/tests/Goridge/MultiRelayHelperTest.php b/tests/Goridge/MultiRelayHelperTest.php new file mode 100644 index 0000000..1adeafe --- /dev/null +++ b/tests/Goridge/MultiRelayHelperTest.php @@ -0,0 +1,40 @@ +value; + $address = MultiRPC::SOCK_ADDR; + $port = MultiRPC::SOCK_PORT; + + $in = stream_socket_client("$type://$address:$port"); + $this->assertTrue(stream_set_blocking($in, true)); + $this->assertFalse(feof($in)); + $relays = [new StreamRelay($in, STDOUT), new StreamRelay($in, STDERR)]; + // No message available on STDIN, aka a read would block, so this returns false + $this->assertFalse(MultiRelayHelper::findRelayWithMessage($relays)); + fclose($in); + } + + public function testSupportsReadingFromStreamRelay(): void + { + $stream = fopen('php://temp', 'rw+'); + fwrite($stream, 'Hello'); + fseek($stream, 0); + $this->assertTrue(stream_set_blocking($stream, true)); + $this->assertFalse(feof($stream)); + $relays = [new StreamRelay($stream, STDOUT)]; + $this->assertCount(1, MultiRelayHelper::findRelayWithMessage($relays)); + fclose($stream); + } +} diff --git a/tests/Goridge/TCPMultiRPCTest.php b/tests/Goridge/TCPMultiRPCTest.php new file mode 100644 index 0000000..4c98cbe --- /dev/null +++ b/tests/Goridge/TCPMultiRPCTest.php @@ -0,0 +1,15 @@ +