From c2df1cc49fe377d0df1688865ad0f09dddccf17d Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sun, 5 Jan 2025 16:26:00 +0100 Subject: [PATCH] Introduce Producer --- bin/mammatus-queue | 2 +- composer.json | 5 +- composer.lock | 130 +++++++++++++----- etc/generated_templates/AbstractList.php.twig | 1 - etc/generated_templates/WorkQueueMap.php.twig | 28 ++++ etc/qa/phpcs.xml | 2 +- etc/qa/phpstan.neon | 5 + src/App.php | 54 ++++---- src/Composer/Plugin.php | 10 ++ src/Consumer.php | 37 +++-- src/Encoder/InvalidJSON.php | 11 ++ src/Encoder/InvalidJSONArray.php | 11 ++ src/Encoder/JSON.php | 40 ++++++ src/Generated/AbstractList.php | 15 +- src/Generated/WorkQueueMap.php | 28 ++++ src/Message.php | 13 ++ src/Producer.php | 44 ++++++ tests/AppTest.php | 32 +++-- tests/ConsumerFactory.php | 3 +- tests/ConsumerTest.php | 3 + 20 files changed, 370 insertions(+), 104 deletions(-) create mode 100644 etc/generated_templates/WorkQueueMap.php.twig create mode 100644 src/Encoder/InvalidJSON.php create mode 100644 src/Encoder/InvalidJSONArray.php create mode 100644 src/Encoder/JSON.php create mode 100644 src/Generated/WorkQueueMap.php create mode 100644 src/Message.php create mode 100644 src/Producer.php diff --git a/bin/mammatus-queue b/bin/mammatus-queue index 5dc35da..47ab9c7 100644 --- a/bin/mammatus-queue +++ b/bin/mammatus-queue @@ -15,5 +15,5 @@ use Mammatus\ContainerFactory; /** * Create and run that one cron job */ - exit((static fn (string $className): int => ContainerFactory::create()->get(App::class)->run($className))($className)); + exit((static fn (string $className): int => ContainerFactory::create()->get(App::class)->run($className)->value)($className)); })($argv[1]); diff --git a/composer.json b/composer.json index 2b39ebd..8aadf28 100644 --- a/composer.json +++ b/composer.json @@ -8,13 +8,13 @@ "ext-json": "^8.2", "composer-plugin-api": "^2.0", "eventsauce/object-hydrator": "^1.6.1", - "mammatus/app": "dev-master", + "mammatus/app": "dev-introduce-run-tooling", "mammatus/kubernetes-attributes": "^1", "mammatus/kubernetes-contracts": "^1", "mammatus/kubernetes-events": "^1", "mammatus/life-cycle-events": "^2", "mammatus/queue-attributes": "dev-main", - "mammatus/queue-contracts": "dev-main", + "mammatus/queue-contracts": "dev-introduce-producer", "psr/container": "^1.1.2", "psr/event-dispatcher": "^1.0", "psr/log": "^2", @@ -27,6 +27,7 @@ "wyrihaximus/broadcast-contracts": "^1.3", "wyrihaximus/generative-composer-plugin-tooling": "^1", "wyrihaximus/monolog-factory": "^2", + "wyrihaximus/psr-3-callable-throwable-logger": "^2.3", "wyrihaximus/psr-3-context-logger": "^2.0", "wyrihaximus/simple-twig": "^2.2.1", "wyrihaximus/ticking-promise": "^3.1" diff --git a/composer.lock b/composer.lock index 2241883..2ef2257 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "c47dd2ae897f349acc3227a60f4542d6", + "content-hash": "f2bc4a1683bef845c6cca20cff081b05", "packages": [ { "name": "bramus/ansi-php", @@ -104,16 +104,16 @@ }, { "name": "composer/ca-bundle", - "version": "1.5.4", + "version": "1.5.5", "source": { "type": "git", "url": "https://github.com/composer/ca-bundle.git", - "reference": "bc0593537a463e55cadf45fd938d23b75095b7e1" + "reference": "08c50d5ec4c6ced7d0271d2862dec8c1033283e6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/composer/ca-bundle/zipball/bc0593537a463e55cadf45fd938d23b75095b7e1", - "reference": "bc0593537a463e55cadf45fd938d23b75095b7e1", + "url": "https://api.github.com/repos/composer/ca-bundle/zipball/08c50d5ec4c6ced7d0271d2862dec8c1033283e6", + "reference": "08c50d5ec4c6ced7d0271d2862dec8c1033283e6", "shasum": "" }, "require": { @@ -160,7 +160,7 @@ "support": { "irc": "irc://irc.freenode.org/composer", "issues": "https://github.com/composer/ca-bundle/issues", - "source": "https://github.com/composer/ca-bundle/tree/1.5.4" + "source": "https://github.com/composer/ca-bundle/tree/1.5.5" }, "funding": [ { @@ -176,7 +176,7 @@ "type": "tidelift" } ], - "time": "2024-11-27T15:35:25+00:00" + "time": "2025-01-08T16:17:16+00:00" }, { "name": "composer/class-map-generator", @@ -1259,16 +1259,16 @@ }, { "name": "mammatus/app", - "version": "dev-master", + "version": "dev-introduce-run-tooling", "source": { "type": "git", "url": "https://github.com/MammatusPHP/app.git", - "reference": "2726084f6d021a6389ba741ca9a8058e0e927af0" + "reference": "bb959e34473ccbb7ce917d0d77fc79b358627ac5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/MammatusPHP/app/zipball/2726084f6d021a6389ba741ca9a8058e0e927af0", - "reference": "2726084f6d021a6389ba741ca9a8058e0e927af0", + "url": "https://api.github.com/repos/MammatusPHP/app/zipball/bb959e34473ccbb7ce917d0d77fc79b358627ac5", + "reference": "bb959e34473ccbb7ce917d0d77fc79b358627ac5", "shasum": "" }, "require": { @@ -1283,7 +1283,9 @@ "psr/container": "^1", "psr/event-dispatcher": "^1", "psr/log": "^2", + "react/async": "^4.3", "react/event-loop": "^1.5", + "react/promise": "^3.2", "thecodingmachine/safe": "^2", "wyrihaximus/broadcast": "^2.3.1", "wyrihaximus/broadcast-contracts": "^1.3", @@ -1299,7 +1301,6 @@ "mammatus/test-app": "dev-master", "wyrihaximus/test-utilities": "^5.6.0" }, - "default-branch": true, "bin": [ "bin/mammatus" ], @@ -1349,7 +1350,7 @@ "description": "🚂 Main entry point for all MammatusPHP applications", "support": { "issues": "https://github.com/MammatusPHP/app/issues", - "source": "https://github.com/MammatusPHP/app/tree/master" + "source": "https://github.com/MammatusPHP/app/tree/introduce-run-tooling" }, "funding": [ { @@ -1357,7 +1358,7 @@ "type": "github" } ], - "time": "2024-04-27T08:09:13+00:00" + "time": "2025-01-14T19:41:16+00:00" }, { "name": "mammatus/kubernetes-attributes", @@ -1603,16 +1604,16 @@ }, { "name": "mammatus/queue-contracts", - "version": "dev-main", + "version": "dev-introduce-producer", "source": { "type": "git", "url": "https://github.com/MammatusPHP/queue-contracts.git", - "reference": "46d11da5cdbc17d74969a3a356459a65c0d595c0" + "reference": "0c73f137a0ef5deb769715436397f6abb9ca69e1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/MammatusPHP/queue-contracts/zipball/46d11da5cdbc17d74969a3a356459a65c0d595c0", - "reference": "46d11da5cdbc17d74969a3a356459a65c0d595c0", + "url": "https://api.github.com/repos/MammatusPHP/queue-contracts/zipball/0c73f137a0ef5deb769715436397f6abb9ca69e1", + "reference": "0c73f137a0ef5deb769715436397f6abb9ca69e1", "shasum": "" }, "require": { @@ -1622,7 +1623,6 @@ "require-dev": { "wyrihaximus/test-utilities": "^5.6" }, - "default-branch": true, "type": "library", "autoload": { "psr-4": { @@ -1636,7 +1636,7 @@ "description": "📜 Queue contracts", "support": { "issues": "https://github.com/MammatusPHP/queue-contracts/issues", - "source": "https://github.com/MammatusPHP/queue-contracts/tree/main" + "source": "https://github.com/MammatusPHP/queue-contracts/tree/introduce-producer" }, "funding": [ { @@ -1644,7 +1644,7 @@ "type": "github" } ], - "time": "2025-01-03T13:58:24+00:00" + "time": "2025-01-10T21:51:40+00:00" }, { "name": "mindplay/composer-locator", @@ -4921,6 +4921,58 @@ ], "time": "2023-08-10T06:52:26+00:00" }, + { + "name": "wyrihaximus/psr-3-callable-throwable-logger", + "version": "2.3.1", + "source": { + "type": "git", + "url": "https://github.com/WyriHaximus/php-psr-3-callable-throwable-logger.git", + "reference": "3e58cabf93c88ebf4231fa2a32859ec8617883f8" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/WyriHaximus/php-psr-3-callable-throwable-logger/zipball/3e58cabf93c88ebf4231fa2a32859ec8617883f8", + "reference": "3e58cabf93c88ebf4231fa2a32859ec8617883f8", + "shasum": "" + }, + "require": { + "php": "^8 || ^7.4.7", + "psr/log": "^3 || ^2 || ^1", + "thecodingmachine/safe": "^2 || ^1.0 || ^0.1.16" + }, + "require-dev": { + "wyrihaximus/psr-3-utilities": "^1.0", + "wyrihaximus/test-utilities": "^5 || ^3.7.3" + }, + "type": "library", + "autoload": { + "psr-4": { + "WyriHaximus\\PSR3\\CallableThrowableLogger\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com" + } + ], + "description": "callable throwable decorator around a PSR-3 logger (to be used with react/promise and reactivex/rxphp)", + "support": { + "issues": "https://github.com/WyriHaximus/php-psr-3-callable-throwable-logger/issues", + "source": "https://github.com/WyriHaximus/php-psr-3-callable-throwable-logger/tree/2.3.1" + }, + "funding": [ + { + "url": "https://github.com/WyriHaximus", + "type": "github" + } + ], + "time": "2023-01-14T19:24:38+00:00" + }, { "name": "wyrihaximus/psr-3-context-logger", "version": "2.0.0", @@ -6645,16 +6697,16 @@ }, { "name": "ergebnis/phpstan-rules", - "version": "2.5.0", + "version": "2.5.2", "source": { "type": "git", "url": "https://github.com/ergebnis/phpstan-rules.git", - "reference": "ffbc24d0d7e7b0d2b45f524753c20d83a83f66de" + "reference": "2754afbaf4f31ec82aab1cc0e2fdd68130a974c8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ergebnis/phpstan-rules/zipball/ffbc24d0d7e7b0d2b45f524753c20d83a83f66de", - "reference": "ffbc24d0d7e7b0d2b45f524753c20d83a83f66de", + "url": "https://api.github.com/repos/ergebnis/phpstan-rules/zipball/2754afbaf4f31ec82aab1cc0e2fdd68130a974c8", + "reference": "2754afbaf4f31ec82aab1cc0e2fdd68130a974c8", "shasum": "" }, "require": { @@ -6664,16 +6716,18 @@ }, "require-dev": { "doctrine/orm": "^2.20.0 || ^3.3.0", - "ergebnis/composer-normalize": "^2.44.0", + "ergebnis/composer-normalize": "^2.45.0", "ergebnis/license": "^2.6.0", "ergebnis/php-cs-fixer-config": "^6.39.0", "ergebnis/phpunit-slow-test-detector": "^2.17.0", "nette/di": "^3.1.10", "nikic/php-parser": "^4.19.4", + "phpstan/extension-installer": "^1.4.3", "phpstan/phpstan-deprecation-rules": "^1.2.1", "phpstan/phpstan-strict-rules": "^1.6.1", "phpunit/phpunit": "^9.6.21", "psr/container": "^2.0.2", + "symfony/finder": "^5.4.45", "symfony/process": "^5.4.47" }, "type": "phpstan-extension", @@ -6711,7 +6765,7 @@ "security": "https://github.com/ergebnis/phpstan-rules/blob/main/.github/SECURITY.md", "source": "https://github.com/ergebnis/phpstan-rules" }, - "time": "2024-12-01T16:44:41+00:00" + "time": "2025-01-08T09:28:54+00:00" }, { "name": "ergebnis/phpunit-slow-test-detector", @@ -9772,16 +9826,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.14", + "version": "1.12.15", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "e73868f809e68fff33be961ad4946e2e43ec9e38" + "reference": "c91d4e8bc056f46cf653656e6f71004b254574d1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e73868f809e68fff33be961ad4946e2e43ec9e38", - "reference": "e73868f809e68fff33be961ad4946e2e43ec9e38", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/c91d4e8bc056f46cf653656e6f71004b254574d1", + "reference": "c91d4e8bc056f46cf653656e6f71004b254574d1", "shasum": "" }, "require": { @@ -9826,7 +9880,7 @@ "type": "github" } ], - "time": "2024-12-31T07:26:13+00:00" + "time": "2025-01-05T16:40:22+00:00" }, { "name": "phpstan/phpstan-deprecation-rules", @@ -10349,16 +10403,16 @@ }, { "name": "phpunit/phpunit", - "version": "10.5.40", + "version": "10.5.41", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "e6ddda95af52f69c1e0c7b4f977cccb58048798c" + "reference": "e76586fa3d49714f230221734b44892e384109d7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/e6ddda95af52f69c1e0c7b4f977cccb58048798c", - "reference": "e6ddda95af52f69c1e0c7b4f977cccb58048798c", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/e76586fa3d49714f230221734b44892e384109d7", + "reference": "e76586fa3d49714f230221734b44892e384109d7", "shasum": "" }, "require": { @@ -10430,7 +10484,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.40" + "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.41" }, "funding": [ { @@ -10446,7 +10500,7 @@ "type": "tidelift" } ], - "time": "2024-12-21T05:49:06+00:00" + "time": "2025-01-13T09:33:05+00:00" }, { "name": "psalm/plugin-mockery", diff --git a/etc/generated_templates/AbstractList.php.twig b/etc/generated_templates/AbstractList.php.twig index cdecdd5..11ad5dc 100644 --- a/etc/generated_templates/AbstractList.php.twig +++ b/etc/generated_templates/AbstractList.php.twig @@ -6,7 +6,6 @@ namespace Mammatus\Queue\Generated; use Mammatus\Queue\Worker; -// phpcs:disable /** * Autogenerated file, do not edit. Changes will be overwritten on the next composer (install|update) */ diff --git a/etc/generated_templates/WorkQueueMap.php.twig b/etc/generated_templates/WorkQueueMap.php.twig new file mode 100644 index 0000000..d3a15ce --- /dev/null +++ b/etc/generated_templates/WorkQueueMap.php.twig @@ -0,0 +1,28 @@ + '{{ worker.consumer.queue }}', +{% endfor %} + ]; + + final protected function lookUp(Work $work): string + { + if (array_key_exists($work::class, self::MAP)) { + return self::MAP[$work::class]; + } + + throw new \RuntimeException('Unknown work: ' . $work::class); + } +} diff --git a/etc/qa/phpcs.xml b/etc/qa/phpcs.xml index c677e35..9d59eea 100644 --- a/etc/qa/phpcs.xml +++ b/etc/qa/phpcs.xml @@ -9,7 +9,7 @@ ../../src ../../tests - src/Generated/* + src/Generated/Hydrator.php diff --git a/etc/qa/phpstan.neon b/etc/qa/phpstan.neon index d78dbf4..f849039 100644 --- a/etc/qa/phpstan.neon +++ b/etc/qa/phpstan.neon @@ -10,11 +10,16 @@ parameters: - message: '#Variable method call on Mammatus\\Queue\\Contracts\\Worker.#' path: ../../src/Consumer.php + - + message: '#In method \"Mammatus\\Queue\\Consumer::consume\", caught \"Throwable\" must be rethrown.#' + path: ../../src/Consumer.php ergebnis: noExtends: classesAllowedToBeExtended: - Composer\IO\NullIO - Mammatus\Queue\Generated\AbstractList + - Mammatus\Queue\Generated\WorkQueueMap + - RuntimeException includes: - ../../vendor/wyrihaximus/async-test-utilities/rules.neon diff --git a/src/App.php b/src/App.php index c812060..c4085c8 100644 --- a/src/App.php +++ b/src/App.php @@ -4,15 +4,14 @@ namespace Mammatus\Queue; +use Mammatus\ExitCode; use Mammatus\LifeCycleEvents\Shutdown; use Mammatus\Queue\Generated\AbstractList; +use Mammatus\Run; use Psr\Log\LoggerInterface; -use React\EventLoop\Loop; use Throwable; use WyriHaximus\Broadcast\Contracts\Listener; -use WyriHaximus\PSR3\ContextLogger\ContextLogger; -use function React\Async\async; use function React\Async\await; use function React\Promise\all; @@ -20,6 +19,7 @@ final class App extends AbstractList implements Listener { public function __construct( private readonly Consumer $consumer, + private readonly Run $run, private readonly LoggerInterface $logger, ) { } @@ -29,37 +29,33 @@ public function stop(Shutdown $event): void $this->consumer->close(); } - public function run(string $className): int + public function run(string $className): ExitCode { - $exitCode = 2; - async(function (string $className): int { - $logger = new ContextLogger($this->logger, ['worker' => $className]); - try { - $promises = []; - foreach ($this->workers() as $worker) { - if ($worker->class !== $className) { - continue; + return $this->run->execute( + /** @phpstan-ignore-next-line */ + function (string $className): ExitCode { + try { + $promises = []; + foreach ($this->workers() as $worker) { + if ($worker->class !== $className) { + continue; + } + + $promises[] = $this->consumer->setupConsumer($worker); } - $promises[] = $this->consumer->setupConsumer($worker); - } - - await(all($promises)); - - $exitCode = 0; - } catch (Throwable $throwable) { /** @phpstan-ignore-line */ - $logger->error('Worker errored: ' . $throwable->getMessage(), ['exception' => $throwable]); + await(all($promises)); - $exitCode = 1; - } + $exitCode = ExitCode::Success; + } catch (Throwable $throwable) { /** @phpstan-ignore-line */ + $this->logger->error('Worker errored: ' . $throwable->getMessage(), ['exception' => $throwable]); - return $exitCode; - })($className)->then(static function (int $resultingExitCode) use (&$exitCode): void { - $exitCode = $resultingExitCode; - }); - - Loop::run(); + $exitCode = ExitCode::Failure; + } - return $exitCode; + return $exitCode; + }, + $className, + ); } } diff --git a/src/Composer/Plugin.php b/src/Composer/Plugin.php index 62edefa..bf45987 100644 --- a/src/Composer/Plugin.php +++ b/src/Composer/Plugin.php @@ -61,6 +61,16 @@ public function compile(string $rootPath, ItemContract ...$items): void file_put_contents($installPathList, $classContentsList); /** @phpstan-ignore-line */ chmod($installPathList, 0664); /** @phpstan-ignore-line */ + $classContentsList = SimpleTwig::render( + file_get_contents( /** @phpstan-ignore-line */ + $rootPath . '/etc/generated_templates/WorkQueueMap.php.twig', + ), + ['workers' => $items], + ); + $installPathList = $rootPath . '/src/Generated/WorkQueueMap.php'; + file_put_contents($installPathList, $classContentsList); /** @phpstan-ignore-line */ + chmod($installPathList, 0664); /** @phpstan-ignore-line */ + $dtos = []; foreach ($items as $item) { if (! ($item instanceof Item)) { diff --git a/src/Consumer.php b/src/Consumer.php index f7d71c3..d9e5466 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -5,6 +5,7 @@ namespace Mammatus\Queue; use Interop\Queue as QueueInterop; +use Mammatus\Queue\Contracts\Encoder; use Mammatus\Queue\Contracts\Worker as WorkerContract; use Mammatus\Queue\Generated\AbstractList; use Mammatus\Queue\Generated\Hydrator; @@ -14,16 +15,16 @@ use RuntimeException; use Throwable; use WyriHaximus\Broadcast\Contracts\Listener; +use WyriHaximus\PSR3\CallableThrowableLogger\CallableThrowableLogger; +use WyriHaximus\PSR3\ContextLogger\ContextLogger; -use function is_array; -use function json_decode; use function React\Async\async; use function React\Async\await; use function React\Promise\all; use function React\Promise\Timer\sleep; use function WyriHaximus\React\futurePromise; -final class Consumer extends AbstractList implements Listener +final class Consumer implements Listener { private bool $running = true; @@ -31,6 +32,7 @@ public function __construct( private readonly ContainerInterface $container, private readonly QueueInterop\Context $context, private readonly Hydrator $hydrator, + private readonly Encoder $encoder, private readonly LoggerInterface $logger, ) { } @@ -43,6 +45,10 @@ public function close(): void /** @return PromiseInterface */ public function setupConsumer(Worker $worker): PromiseInterface { + $this->logger->debug('Setting up logger for ' . $worker->class); + $logger = new ContextLogger($this->logger, ['worker' => $worker->class, 'method' => $worker->method]); + + $this->logger->debug('Getting worker instance for ' . $worker->class); $workerInstance = $this->container->get($worker->class); if (! ($workerInstance instanceof WorkerContract)) { throw new RuntimeException('Worker instance must be instance of ' . WorkerContract::class); @@ -51,15 +57,16 @@ public function setupConsumer(Worker $worker): PromiseInterface $promises = [ sleep(0.1), ]; + $this->logger->debug('Starting ' . $worker->concurrency . ' workers for ' . $worker->class); for ($i = 0; $i < $worker->concurrency; $i++) { $this->logger->info('Starting consumer ' . $i . ' of ' . $worker->concurrency . ' for ' . $worker->class); - $promises[] = async(fn () => $this->consume($worker, $workerInstance))(); + $promises[] = async(fn () => $this->consume($worker, $workerInstance, new ContextLogger($logger, ['fiber' => $i])))(); } return all($promises); } - private function consume(Worker $worker, WorkerContract $workerInstance): void + private function consume(Worker $worker, WorkerContract $workerInstance, LoggerInterface $logger): void { $consumer = $this->context->createConsumer(new Queue($worker->queue)); while ($this->running) { @@ -69,20 +76,24 @@ private function consume(Worker $worker, WorkerContract $workerInstance): void continue; } + $logger = new ContextLogger($logger, ['dtoClass' => $worker->dtoClass]); + try { - $json = json_decode($message->getBody(), true); - if (! is_array($json)) { - throw new RuntimeException('Message is not valid JSON'); - } + $this->logger->debug('Hydrating message'); + $dto = $this->hydrator->hydrateObject( + $worker->dtoClass, + $this->encoder->decode($message->getBody()), + ); - $dto = $this->hydrator->hydrateObject($worker->dtoClass, $json); + $this->logger->debug('Invoking worker'); $workerInstance->{$worker->method}($dto); + + $this->logger->debug('Acknowledging message'); $consumer->acknowledge($message); } catch (Throwable $error) { + $this->logger->debug('Rejecting message'); $consumer->reject($message); - $this->close(); - - throw $error; + CallableThrowableLogger::create($logger)($error); } } } diff --git a/src/Encoder/InvalidJSON.php b/src/Encoder/InvalidJSON.php new file mode 100644 index 0000000..216f571 --- /dev/null +++ b/src/Encoder/InvalidJSON.php @@ -0,0 +1,11 @@ + $json */ + $json = json_decode($payload, true); + /** @phpstan-ignore-next-line */ + if (! is_array($json)) { + throw new InvalidJSON('Message is not valid JSON: ' . json_last_error_msg()); + } + + return $json; + } +} diff --git a/src/Generated/AbstractList.php b/src/Generated/AbstractList.php index 1d4340b..1d72403 100644 --- a/src/Generated/AbstractList.php +++ b/src/Generated/AbstractList.php @@ -4,17 +4,18 @@ namespace Mammatus\Queue\Generated; +use Mammatus\Queue\BuildIn\EmptyMessage; +use Mammatus\Queue\BuildIn\Noop; use Mammatus\Queue\Worker; -// phpcs:disable +use function json_decode; + /** * Autogenerated file, do not edit. Changes will be overwritten on the next composer (install|update) */ abstract class AbstractList { - /** - * @return iterable - */ + /** @return iterable */ final protected function workers(): iterable { /** @see \Mammatus\Queue\BuildIn\Noop */ @@ -22,10 +23,10 @@ final protected function workers(): iterable 'internal', 'noop', 1, - \Mammatus\Queue\BuildIn\Noop::class, + Noop::class, 'perform', - \Mammatus\Queue\BuildIn\EmptyMessage::class, - \json_decode('[]', true), /** @phpstan-ignore-line */ + EmptyMessage::class, + json_decode('[]', true), /** @phpstan-ignore-line */ ); } } diff --git a/src/Generated/WorkQueueMap.php b/src/Generated/WorkQueueMap.php new file mode 100644 index 0000000..c43cacb --- /dev/null +++ b/src/Generated/WorkQueueMap.php @@ -0,0 +1,28 @@ + 'noop']; + + final protected function lookUp(Work $work): string + { + if (array_key_exists($work::class, self::MAP)) { + return self::MAP[$work::class]; + } + + throw new RuntimeException('Unknown work: ' . $work::class); + } +} diff --git a/src/Message.php b/src/Message.php new file mode 100644 index 0000000..3fd159d --- /dev/null +++ b/src/Message.php @@ -0,0 +1,13 @@ + $array */ + $array = $this->hydrator->serializeObject($work); + /** @phpstan-ignore-next-line */ + if (! is_array($array)) { + throw new RuntimeException('Message is translated into an array but ' . gettype($array) . ' instead'); + } + + $message = new Message(); + $message->setBody($this->encoder->encode($array)); + $message->setHeaders([]); + $this->producer->send( + new Queue($this->lookUp($work)), + $message, + ); + } +} diff --git a/tests/AppTest.php b/tests/AppTest.php index b94201b..1b9af5c 100644 --- a/tests/AppTest.php +++ b/tests/AppTest.php @@ -4,9 +4,11 @@ namespace Mammatus\Tests\Queue; +use Mammatus\ExitCode; use Mammatus\Queue\App; use Mammatus\Queue\BuildIn\Noop; use Mammatus\Queue\Contracts\Worker as WorkerContract; +use Mammatus\Run; use React\EventLoop\Loop; use RuntimeException; use WyriHaximus\AsyncTestUtilities\AsyncTestCase; @@ -27,11 +29,15 @@ public function runHappy(): void $container->expects('get')->with(Noop::class)->once()->andReturn(new Noop()); + $logger->expects('debug')->with('Loop execution starting')->atLeast()->once(); + $logger->expects('debug')->with('Loop execution running')->atLeast()->once(); $logger->expects('info')->with('Starting consumer 0 of 1 for ' . Noop::class)->atLeast()->once(); + $logger->expects('debug')->with('Loop execution ended')->atLeast()->once(); + $logger->expects('debug')->with('Execution completed with exit code: Success')->atLeast()->once(); - $exitCode = (new App($consumer, $logger))->run(Noop::class); + $exitCode = (new App($consumer, new Run($logger), $logger))->run(Noop::class); - self::assertSame(0, $exitCode); + self::assertSame(ExitCode::Success, $exitCode); } /** @test */ @@ -45,11 +51,15 @@ public function runAngry(): void $exception = new RuntimeException('Ik ben boos!'); $container->expects('get')->with(Noop::class)->once()->andReturn(new Angry($exception)); + $logger->expects('debug')->with('Loop execution starting')->atLeast()->once(); + $logger->expects('debug')->with('Loop execution running')->atLeast()->once(); $logger->expects('info')->with('Starting consumer 0 of 1 for ' . Noop::class)->atLeast()->once(); + $logger->expects('debug')->with('Loop execution ended')->atLeast()->once(); + $logger->expects('debug')->with('Execution completed with exit code: Success')->atLeast()->once(); - $exitCode = (new App($consumer, $logger))->run(Noop::class); + $exitCode = (new App($consumer, new Run($logger), $logger))->run(Noop::class); - self::assertSame(0, $exitCode); + self::assertSame(ExitCode::Success, $exitCode); } /** @test */ @@ -59,11 +69,9 @@ public function notAnWorker(): void $container->expects('get')->with(Noop::class)->atLeast()->once()->andReturn(new Sad()); - $logger->expects('log')->withArgs(static function (string $type, string $error, array $context): bool { - if ($type !== 'error') { - return false; - } - + $logger->expects('debug')->with('Loop execution starting')->atLeast()->once(); + $logger->expects('debug')->with('Loop execution running')->atLeast()->once(); + $logger->expects('error')->withArgs(static function (string $error, array $context): bool { if ($error !== 'Worker errored: Worker instance must be instance of ' . WorkerContract::class) { return false; } @@ -71,9 +79,11 @@ public function notAnWorker(): void return array_key_exists('exception', $context) && $context['exception']->getMessage() === 'Worker instance must be instance of ' . WorkerContract::class; })->atLeast()->once(); $logger->expects('info')->with('Starting consumer 0 of 1 for ' . Sad::class)->never(); + $logger->expects('debug')->with('Loop execution ended')->atLeast()->once(); + $logger->expects('debug')->with('Execution completed with exit code: Failure')->atLeast()->once(); - $exitCode = (new App($consumer, $logger))->run(Noop::class); + $exitCode = (new App($consumer, new Run($logger), $logger))->run(Noop::class); - self::assertSame(1, $exitCode); + self::assertSame(ExitCode::Failure, $exitCode); } } diff --git a/tests/ConsumerFactory.php b/tests/ConsumerFactory.php index 857db56..873aeeb 100644 --- a/tests/ConsumerFactory.php +++ b/tests/ConsumerFactory.php @@ -7,6 +7,7 @@ use Interop\Queue as QueueInterop; use Interop\Queue\Queue; use Mammatus\Queue\Consumer; +use Mammatus\Queue\Encoder\JSON; use Mammatus\Queue\Generated\Hydrator; use Mockery; use Psr\Container\ContainerInterface; @@ -32,7 +33,7 @@ public static function create(bool $createConsumerExpected): array return true; })->between($createConsumerExpected ? 1 : 0, PHP_INT_MAX)->andReturn($consumerInternal); - $consumer = new Consumer($container, $context, new Hydrator(), $logger); + $consumer = new Consumer($container, $context, new Hydrator(), new JSON(), $logger); return [ $consumer, diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php index dd52f3e..7fc2560 100644 --- a/tests/ConsumerTest.php +++ b/tests/ConsumerTest.php @@ -23,6 +23,8 @@ public function consumeHappy(): void { [$consumer, $container, $context, $internalConsumer, $logger] = ConsumerFactory::create(ConsumerFactory::CREATE_CONSUMER_EXPECTED); $container->expects('get')->with(Noop::class)->once()->andReturn(new Noop()); +// $logger->expects('log')->with('debug', 'Loop execution starting')->atLeast()->once(); + $logger->expects('log')->with('debug', 'Loop execution starting')->never(); $logger->expects('info')->with('Starting consumer 0 of 1 for ' . Noop::class)->atLeast()->once(); $message = new Message(); @@ -52,6 +54,7 @@ public function invalidJson(): void [$consumer, $container, $context, $internalConsumer, $logger] = ConsumerFactory::create(ConsumerFactory::CREATE_CONSUMER_EXPECTED); $container->expects('get')->with(Noop::class)->once()->andReturn(new Noop()); + $logger->expects('log')->with('debug', 'Loop execution starting')->never(); $logger->expects('info')->with('Starting consumer 0 of 1 for ' . Noop::class)->atLeast()->once(); $message = new Message();