Skip to content

Commit

Permalink
implement message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Mar 14, 2024
1 parent 6611fd5 commit 5626e4d
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 64 deletions.
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,39 @@ $client->ping(); // true
## Publish Subscribe

```php
// queue usage example
$queue = $client->subscribe('test_subject');

$client->publish('test_subject', 'hello');
$client->publish('test_subject', 'world');

// optional message fetch
// if there are no updates null will be returned
$message1 = $queue->fetch();
echo $message1->getPayload(); // hello

// locks untill message is fetched from subject
// to limit lock timeout, pass optional timeout value
$message2 = $queue->next();
echo $message2->getPayload(); // world

$client->publish('test_subject', 'hello');
$client->publish('test_subject', 'batching');

// batch message fetching, limit argument is optional
$messages = $queue->fetchAll(10);
echo count($messages);

// fetch all messages that are published to the subject client connection
// queue will stop message fetching when another subscription receives a message
// in advance you can time limit batch fetching
$queue->setLimit(1); // limit to 1 second
$messages = $queue->fetchAll();

// reset subscription
$client->unsubscribe($queue);

// callback hell example
$client->subscribe('hello', function ($message) {
var_dump('got message', $message); // tester
});
Expand Down Expand Up @@ -192,6 +225,46 @@ $goodbyer
// $goodbyer->interrupt();
});

// consumer can be used via queue interface
$queue = $goodbyer->getQueue();
while ($message = $queue->next()) {
if (rand(1, 10) % 2 == 0) {
mail($address, "See you later");
$message->ack();
} else {
// ack with 1 second timeout
$message->nack(1);
}
// stop processing
if (rand(1, 10) % 2 == 10) {
// don't forget to unsubscribe
$client->unsubscribe($queue);
break;
}
}

// use fetchAll method to batch process messages
// let's set batch size to 50
$queue = $goodbyer->setBatching(50)->create()->getQueue();

// fetching 100 messages provides 2 stream requests
// limit message fetching to 1 second
// it means no more that 100 messages would be fetched
$messages = $queue->setLimit(1)->fetchAll(100);

$recipients = [];
foreach ($messages as $message) {
$recipients[] = (string) $message->payload;
}

mail_to_all($recipients, "See you later");

// ack all messages
foreach ($messages as $message) {
$message->ack();
}


// you also can create ephemeral consumer
// the only thing that ephemeral consumer is created as soon as object is created
// you have to create full consumer configuration first
Expand Down
36 changes: 26 additions & 10 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,21 @@ public function request(string $name, mixed $payload, Closure $handler): self
return $this;
}

public function subscribe(string $name, Closure $handler): self
public function subscribe(string $name, ?Closure $handler = null): self|Queue
{
return $this->doSubscribe($name, null, $handler);
}

public function subscribeQueue(string $name, string $group, Closure $handler)
public function subscribeQueue(string $name, string $group, ?Closure $handler = null): self|Queue
{
return $this->doSubscribe($name, $group, $handler);
}

public function unsubscribe(string $name): self
public function unsubscribe(string|Queue $name): self
{
if ($name instanceof Queue) {
$name = $name->subject;
}
foreach ($this->subscriptions as $i => $subscription) {
if ($subscription['name'] == $name) {
unset($this->subscriptions[$i]);
Expand Down Expand Up @@ -162,7 +165,7 @@ public function setLogger(?LoggerInterface $logger): self
return $this;
}

public function process(null|int|float $timeout = 0, bool $reply = true)
public function process(null|int|float $timeout = 0, bool $reply = true): mixed
{
$message = $this->connection->getMessage($timeout);

Expand All @@ -173,20 +176,30 @@ public function process(null|int|float $timeout = 0, bool $reply = true)
}
throw new LogicException("No handler for message $message->sid");
}
$result = $this->handlers[$message->sid]($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$this->publish($message->replyTo, $result);
$handler = $this->handlers[$message->sid];
if ($handler instanceof Queue) {
$handler->handle($message);
return $handler;
} else {
$result = $handler($message->payload, $message->replyTo);
if ($reply && $message->replyTo) {
$message->reply($result);
}
return $result;
}
return $result;
} else {
return $message;
}
}

private function doSubscribe(string $subject, ?string $group, Closure $handler): self
private function doSubscribe(string $subject, ?string $group, ?Closure $handler = null): self|Queue
{
$sid = bin2hex(random_bytes(4));
$this->handlers[$sid] = $handler;
if ($handler == null) {
$this->handlers[$sid] = new Queue($this, $subject);
} else {
$this->handlers[$sid] = $handler;
}

$this->connection->sendMessage(new Subscribe([
'sid' => $sid,
Expand All @@ -199,6 +212,9 @@ private function doSubscribe(string $subject, ?string $group, Closure $handler):
'sid' => $sid,
];

if ($handler == null) {
return $this->handlers[$sid];
}
return $this;
}

Expand Down
100 changes: 51 additions & 49 deletions src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

namespace Basis\Nats\Consumer;

use Basis\Nats\Client;
use Basis\Nats\Queue;
use Basis\Nats\Message\Payload;
use Basis\Nats\Message\Publish;
use Closure;
use Basis\Nats\Client;
use Throwable;

class Consumer
{
Expand Down Expand Up @@ -96,9 +99,11 @@ public function getIterations(): int
return $this->iterations;
}

public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack = true): int
public function getQueue(): Queue
{
$requestSubject = '$JS.API.CONSUMER.MSG.NEXT.' . $this->getStream() . '.' . $this->getName();
$queueSubject = 'handler.' . bin2hex(random_bytes(4));
$queue = $this->client->subscribe($queueSubject);

$args = [
'batch' => $this->getBatching(),
];
Expand All @@ -111,62 +116,59 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack
$args['no_wait'] = true;
}

$handlerSubject = 'handler.' . bin2hex(random_bytes(4));
$launcher = new Publish([
'payload' => Payload::parse($args),
'replyTo' => $queue->subject,
'subject' => '$JS.API.CONSUMER.MSG.NEXT.' . $this->getStream() . '.' . $this->getName(),
]);

$runtime = new Runtime();

$this->create();

$this->client->subscribe($handlerSubject, function ($message, $replyTo) use ($handler, $runtime) {
if (!($message instanceof Payload)) {
return;
}

$kv_operation = $message->getHeader('KV-Operation');
$queue->setLauncher($launcher);
return $queue;
}

// Consuming deleted or purged messages must not stop processing messages as more
// messages might arrive after this.
if (!$message->isEmpty() || $kv_operation === 'DEL' || $kv_operation === 'PURGE') {
$runtime->empty = false;
$runtime->processed++;
public function handle(Closure $messageHandler, Closure $emptyHandler = null, bool $ack = true): int
{
$queue = $this->create()->getQueue();
$iterations = $this->getIterations();
$processed = 0;

if (!$message->isEmpty()) {
$handler($message, $replyTo);
while (!$this->interrupt && $iterations--) {
$messages = $queue->fetchAll($this->getBatching());
foreach ($messages as $message) {
$processed++;
$payload = $message->payload;
if ($payload->isEmpty()) {
if ($emptyHandler && !in_array($payload->getHeader('KV-Operation'), ['DEL', 'PURGE'])) {
$emptyHandler($payload, $message->replyTo);
}
continue;
}
}
});

$iteration = $this->getIterations();
while ($iteration--) {
$this->client->publish($requestSubject, $args, $handlerSubject);

foreach (range(1, $this->batch) as $_) {
$runtime->empty = true;
// expires request means that we should receive answer from stream
// consumer timeout can be more that client connection timeout
$this->client->process($this->expires ? PHP_INT_MAX : null, $ack);

if ($runtime->empty) {
if ($emptyHandler) {
$emptyHandler();
try {
$messageHandler($payload, $message->replyTo);
if ($ack) {
$message->ack();
}
break;
} catch (Throwable $e) {
if ($ack) {
$message->nack();
}
throw $e;
}
if ($this->interrupt) {
$this->interrupt = false;
break 2;
}
}

if ($this->interrupt) {
$this->interrupt = false;
break;
}

if ($iteration && $runtime->empty && !$expires) {
usleep((int) floor($this->getDelay() * 1_000_000));
if (!count($messages) && $emptyHandler) {
$emptyHandler();
if ($iterations) {
usleep((int) floor($this->getDelay() * 1_000_000));
}
}
}

$this->client->unsubscribe($handlerSubject);

return $runtime->processed;
$this->client->unsubscribe($queue);
return $processed;
}

public function info()
Expand Down
19 changes: 19 additions & 0 deletions src/Message/Ack.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Basis\Nats\Message;

class Ack extends Prototype
{
public string $subject;
public string $command = '+ACK';

public ?Payload $payload = null;

public function render(): string
{
$payload = ($this->payload ?: Payload::parse(''))->render();
return "PUB $this->subject $this->command $payload";
}
}
Loading

0 comments on commit 5626e4d

Please sign in to comment.