From f91903a5fda8f5dfe6f401da7c665c6a2cac289c Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 19 May 2024 10:50:40 +0200 Subject: [PATCH] Make app work --- src/App.php | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/src/App.php b/src/App.php index 8b89910..80d3298 100644 --- a/src/App.php +++ b/src/App.php @@ -4,6 +4,9 @@ namespace Mammatus\Queue; +use Interop\Queue\Consumer; +use Interop\Queue\Context; +use Mammatus\LifeCycleEvents\Shutdown; use Mammatus\Queue\Contracts\Worker; use Mammatus\Queue\Contracts\Worker as WorkerContract; use Mammatus\Queue\Generated\AbstractList_; @@ -12,6 +15,7 @@ use React\EventLoop\Loop; use RuntimeException; use Throwable; +use WyriHaximus\Broadcast\Contracts\Listener; use WyriHaximus\PSR3\ContextLogger\ContextLogger; use function React\Async\async; @@ -19,14 +23,22 @@ use function React\Promise\all; use function WyriHaximus\React\futurePromise; -final class App extends AbstractList_ +final class App extends AbstractList_ implements Listener { + private bool $running = true; + public function __construct( private readonly ContainerInterface $container, + private readonly Context $context, private readonly LoggerInterface $logger, ) { } + public function stop(Shutdown $event): void + { + $this->running = false; + } + public function run(string $className): int { $promises = []; @@ -35,10 +47,7 @@ public function run(string $className): int continue; } - for ($i = 0; $i < $worker->concurrency; $i++) { - $this->logger->info('Starting consumer ' . $i . ' of ' . $worker->concurrency . ' for ' . $worker->class); - $promises[] = async(fn () => $this->consume($worker))(); - } + $promises[] = async(fn () => $this->setupConsumer($worker))(); } await(all($promises)); @@ -46,11 +55,25 @@ public function run(string $className): int return 0; } - private function consume(\Mammatus\Queue\Worker $worker): void + private function setupConsumer(\Mammatus\Queue\Worker $worker): int { $consumer = $this->context->createConsumer(new Queue($worker->queue)); $workerInstance = $this->container->get($worker->class); assert($workerInstance instanceof WorkerContract); + + $promises = []; + for ($i = 0; $i < $worker->concurrency; $i++) { + $this->logger->info('Starting consumer ' . $i . ' of ' . $worker->concurrency . ' for ' . $worker->class); + $promises[] = async(fn () => $this->consume($consumer, $workerInstance))(); + } + + await(all($promises)); + + return 0; + } + + private function consume(Consumer $consumer, WorkerContract $worker): void + { while ($this->running) { $message = $consumer->receiveNoWait(); if ($message === null) { @@ -59,7 +82,7 @@ private function consume(\Mammatus\Queue\Worker $worker): void } try { - $workerInstance->perform($message); + $worker->perform($message); $consumer->acknowledge($message); } catch (Throwable) { $consumer->reject($message);