Skip to content

Commit

Permalink
fix: commitAsync() instead of dropping calls to make sure RPC flushes…
Browse files Browse the repository at this point in the history
… buffers instead of "error"ing
  • Loading branch information
L3tum committed Feb 6, 2024
1 parent b0adca5 commit a641d8d
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/AsyncCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use DateInterval;
use RoadRunner\KV\DTO\V1\Response;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;
Expand Down Expand Up @@ -47,6 +48,7 @@ public function __construct(
* {@inheritDoc}
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteAsync(string $key): bool
{
Expand All @@ -62,14 +64,15 @@ public function deleteAsync(string $key): bool
* @psalm-param iterable<string> $keys
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteMultipleAsync(iterable $keys): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->callsInFlight = [];
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync('kv.Delete', $this->requestKeys($keys));
Expand All @@ -83,6 +86,7 @@ public function deleteMultipleAsync(iterable $keys): bool
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool
{
Expand All @@ -96,14 +100,15 @@ public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl =
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->callsInFlight = [];
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync(
Expand All @@ -116,15 +121,14 @@ public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl =

/**
* @throws KeyValueException
* @throws RPCException
*/
public function commitAsync(): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

try {
foreach ($this->callsInFlight as $seq) {
$this->rpc->getResponse($seq, Response::class);
}
$this->rpc->getResponses($this->callsInFlight, Response::class);
} catch (ServiceException $e) {
$message = str_replace(["\t", "\n"], ' ', $e->getMessage());

Expand Down

0 comments on commit a641d8d

Please sign in to comment.