Skip to content

Commit

Permalink
feat: Fetching all stored key value pairs in bucket.
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoffer Lindahl authored and nekufa committed Jan 17, 2024
1 parent 2eaf2fd commit 452786c
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Basis\Nats\Consumer;

use Basis\Nats\Message\Payload;
use Closure;
use Basis\Nats\Client;

Expand Down Expand Up @@ -115,10 +116,21 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack
$this->create();

$this->client->subscribe($handlerSubject, function ($message, $replyTo) use ($handler, $runtime) {
if (!$message->isEmpty()) {
if (!($message instanceof Payload)) {
return;
}

$kv_operation = $message->getHeader('KV-Operation');

// Consuming deleted or purged messages must not stop processing messages as more
// messages might arrive after this.
if (!$message->isEmpty() || $kv_operation === 'DEL' || $kv_operation === 'PURGE') {
$runtime->empty = false;
$runtime->processed++;
$handler($message, $replyTo);

if (!$message->isEmpty()) {
$handler($message, $replyTo);
}
}
});

Expand Down
34 changes: 34 additions & 0 deletions src/KeyValue/Bucket.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Basis\Nats\KeyValue;

use Basis\Nats\Client;
use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Stream\Stream;
use Basis\Nats\Message\Payload;

Expand Down Expand Up @@ -47,6 +48,39 @@ public function getEntry(string $key): ?Entry
return new Entry($this->name, $key, $value, $revision);
}

/**
* @return Entry[]
*/
public function getAll(): array
{
$entries = [];

$stream = $this->getStream();
if (!$stream->exists()) {
return $entries;
}

$stream_name = $stream->getName();
$configuration = new ConsumerConfiguration($stream_name);
$consumer = $stream->createEphemeralConsumer($configuration);
$subject_prefix_length = 1 + strlen(sprintf('$KV.%s', $this->name));

$consumer->handle(function (Payload $payload) use (&$entries, $subject_prefix_length) {
if ($payload->subject === null) {
return;
}

$key = substr($payload->subject, $subject_prefix_length);
$entries[] = new Entry('', $key, $payload->body, 0);
}, function () use ($consumer) {
$consumer->interrupt();
});

$consumer->delete();

return $entries;
}

public function getStatus(): Status
{
return new Status($this->name, $this->getStream()->info());
Expand Down
63 changes: 63 additions & 0 deletions tests/Functional/KeyValue/BucketTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Tests\Functional\KeyValue;

use Basis\Nats\KeyValue\Entry;
use Tests\FunctionalTestCase;

class BucketTest extends FunctionalTestCase
Expand Down Expand Up @@ -53,4 +54,66 @@ public function testBasics()

$this->assertCount(1, json_decode($bucket->get('service_handlers')));
}

public function testGetAll()
{
$bucket = $this->createClient()
->getApi()
->getBucket('test_bucket');

$this->assertSame(0, $bucket->getStatus()->values);

$kv_pairs = [
'KEY1' => 'value1',
'KEY2' => 'value2',
'KEY3' => 'value3',
];

foreach ($kv_pairs as $key => $value) {
$bucket->put($key, $value);
}

$this->assertSame(count($kv_pairs), $bucket->getStatus()->values);
$actual_entries = $this->entriesAsAssocArray($bucket->getAll());
$this->assertEquals($kv_pairs, $actual_entries);
}

public function testGetAllAfterPurge()
{
$bucket = $this->createClient()
->getApi()
->getBucket('test_bucket');

$this->assertSame(0, $bucket->getStatus()->values);

$bucket->put('KEY1', 'value1');
$bucket->purge('KEY1');

$kv_pairs = [
'KEY2' => 'value2',
'KEY3' => 'value3',
];

foreach ($kv_pairs as $key => $value) {
$bucket->put($key, $value);
}

$actual_entries = $this->entriesAsAssocArray($bucket->getAll());
$this->assertEquals($kv_pairs, $actual_entries);
}

/**
* @param Entry[] $entries
* @return array<string, string>
*/
private function entriesAsAssocArray(array $entries): array
{
$assoc = [];

foreach ($entries as $entry) {
$assoc[$entry->key] = $entry->value;
}

return $assoc;
}
}

0 comments on commit 452786c

Please sign in to comment.