From fe82f676cadb88982b0416d28058a7cdc9f47827 Mon Sep 17 00:00:00 2001 From: Sheldon Reiff Date: Mon, 17 Oct 2022 15:14:43 -0400 Subject: [PATCH] Add failed jobs and requeue/delete commands --- composer.json | 1 + .../20221007202459_CreateFailedJobs.php | 58 +++++ docs/en/index.rst | 66 ++++++ phpunit.xml.dist | 7 + src/Command/PurgeFailedCommand.php | 128 +++++++++++ src/Command/RequeueCommand.php | 161 ++++++++++++++ src/Command/WorkerCommand.php | 7 +- src/Consumption/LimitAttemptsExtension.php | 15 +- src/Job/Message.php | 2 +- src/Listener/FailedJobsListener.php | 101 +++++++++ src/Model/Entity/FailedJob.php | 52 +++++ src/Model/Table/FailedJobsTable.php | 96 +++++++++ src/Plugin.php | 6 +- src/Queue/Processor.php | 3 +- src/QueueManager.php | 5 + tests/Fixture/FailedJobsFixture.php | 85 ++++++++ .../Command/PurgeFailedCommandTest.php | 120 +++++++++++ tests/TestCase/Command/RequeueCommandTest.php | 204 ++++++++++++++++++ .../LimitAttemptsExtensionTest.php | 20 ++ tests/TestCase/DebugLogTrait.php | 2 +- .../Listener/FailedJobsListenerTest.php | 95 ++++++++ tests/TestCase/Queue/ProcessorTest.php | 2 +- tests/bootstrap.php | 20 ++ .../src/Job/MaxAttemptsIsThreeJob.php | 3 +- 24 files changed, 1251 insertions(+), 8 deletions(-) create mode 100644 config/Migrations/20221007202459_CreateFailedJobs.php create mode 100644 src/Command/PurgeFailedCommand.php create mode 100644 src/Command/RequeueCommand.php create mode 100644 src/Listener/FailedJobsListener.php create mode 100644 src/Model/Entity/FailedJob.php create mode 100644 src/Model/Table/FailedJobsTable.php create mode 100644 tests/Fixture/FailedJobsFixture.php create mode 100644 tests/TestCase/Command/PurgeFailedCommandTest.php create mode 100644 tests/TestCase/Command/RequeueCommandTest.php create mode 100644 tests/TestCase/Listener/FailedJobsListenerTest.php diff --git a/composer.json b/composer.json index 0702200..0c5156f 100644 --- a/composer.json +++ b/composer.json @@ -20,6 +20,7 @@ "require": { "php": ">=7.2.0", "cakephp/cakephp": "^4.1", + "cakephp/migrations": "^3.1", "enqueue/simple-client": "^0.10", "psr/log": "^1.1 || ^2.0" }, diff --git a/config/Migrations/20221007202459_CreateFailedJobs.php b/config/Migrations/20221007202459_CreateFailedJobs.php new file mode 100644 index 0000000..a8ae8dc --- /dev/null +++ b/config/Migrations/20221007202459_CreateFailedJobs.php @@ -0,0 +1,58 @@ +table('queue_failed_jobs'); + $table->addColumn('class', 'string', [ + 'length' => 255, + 'null' => false, + 'default' => null, + ]) + ->addColumn('method', 'string', [ + 'length' => 255, + 'null' => false, + 'default' => null, + ]) + ->addColumn('data', 'text', [ + 'null' => false, + 'default' => null, + ]) + ->addColumn('config', 'string', [ + 'length' => 255, + 'null' => false, + 'default' => null, + ]) + ->addColumn('priority', 'string', [ + 'length' => 255, + 'null' => true, + 'default' => null, + ]) + ->addColumn('queue', 'string', [ + 'length' => 255, + 'null' => false, + 'default' => null, + ]) + ->addColumn('exception', 'text', [ + 'null' => true, + 'default' => null, + ]) + ->addColumn('created', 'datetime', [ + 'default' => null, + 'limit' => null, + 'null' => true, + ]) + ->create(); + } +} diff --git a/docs/en/index.rst b/docs/en/index.rst index 0387c25..bfb41b8 100644 --- a/docs/en/index.rst +++ b/docs/en/index.rst @@ -55,6 +55,9 @@ The following configuration should be present in the config array of your **conf // The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000 'receiveTimeout' => 10000, + + // Whether to store failed jobs in the queue_failed_jobs table. default: false + 'storeFailedJobs' => true, ] ], // ... @@ -62,6 +65,13 @@ The following configuration should be present in the config array of your **conf The ``Queue`` config key can contain one or more queue configurations. Each of these is used for interacting with a different queuing backend. +If ``storeFailedJobs`` is set to ``true``, make sure to run the plugin migrations to create the ``queue_failed_jobs`` table. + +.. code-block:: bash + + bin/cake migrations migrate --plugin Cake/Queue + + Usage ===== @@ -266,6 +276,62 @@ This shell can take a few different options: - Max Runtime - Runtime: Time since the worker started, the worker will finish when Runtime is over Max Runtime value +Failed Jobs +=========== + +By default, jobs that throw an exception are requeued indefinitely. However, if +``maxAttempts`` is configured on the job class or via a command line argument, a +job will be considered failed if a ``Processor::REQUEUE`` response is received +after processing (typically due to an exception being thrown) and there are no +remaining attempts. The job will then be rejected and added to the +``queue_failed_jobs`` table and can be requeued manually. + +Your chosen transport may offer a dead-letter queue feature. While Failed Jobs +has a similar purpose, it specifically captures jobs that return a +``Processor::REQUEUE`` response and does not handle other failure cases. It is +agnostic of transport and only supports database persistence. + +The following options passed when originally queueing the job will be preserved: +``config``, ``queue``, and ``priority``. + +Requeue Failed Jobs +------------------- + +Push jobs back onto the queue and remove them from the ``queue_failed_jobs`` +table. If a job fails to requeue it is not guaranteed that the job was not run. + +.. code-block:: bash + + bin/cake queue requeue + +Optional filters: + +- ``--id``: Requeue job by the ID of the ``FailedJob`` +- ``--class``: Requeue jobs by the job class +- ``--queue``: Requeue jobs by the queue the job was received on +- ``--config``: Requeue jobs by the config used to queue the job + +If no filters are provided then all failed jobs will be requeued. + +Purge Failed Jobs +------------------ + +Delete jobs from the ``queue_failed_jobs`` table. + +.. code-block:: bash + + bin/cake queue purge_failed + +Optional filters: + +- ``--id``: Purge job by the ID of the ``FailedJob`` +- ``--class``: Purge jobs by the job class +- ``--queue``: Purge jobs by the queue the job was received on +- ``--config``: Purge jobs by the config used to queue the job + +If no filters are provided then all failed jobs will be purged. + + Worker Events ============= diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 5ab59dd..3481e82 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -15,4 +15,11 @@ tests/TestCase + + + + + + + diff --git a/src/Command/PurgeFailedCommand.php b/src/Command/PurgeFailedCommand.php new file mode 100644 index 0000000..a6b7602 --- /dev/null +++ b/src/Command/PurgeFailedCommand.php @@ -0,0 +1,128 @@ +setDescription('Delete failed jobs.'); + + $parser->addArgument('ids', [ + 'required' => false, + 'help' => 'Delete jobs by the FailedJob ID (comma-separated).', + ]); + $parser->addOption('class', [ + 'help' => 'Delete jobs by the job class.', + ]); + $parser->addOption('queue', [ + 'help' => 'Delete jobs by the queue the job was received on.', + ]); + $parser->addOption('config', [ + 'help' => 'Delete jobs by the config used to queue the job.', + ]); + $parser->addOption('force', [ + 'help' => 'Automatically assume yes in response to confirmation prompt.', + 'short' => 'f', + 'boolean' => true, + ]); + + return $parser; + } + + /** + * @param \Cake\Console\Arguments $args Arguments + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @return void + */ + public function execute(Arguments $args, ConsoleIo $io) + { + /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */ + $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs'); + + $jobsToDelete = $failedJobsTable->find(); + + if ($args->hasArgument('ids')) { + $idsArg = $args->getArgument('ids'); + + if ($idsArg !== null) { + $ids = explode(',', $idsArg); + + $jobsToDelete->whereInList($failedJobsTable->aliasField('id'), $ids); + } + } + + if ($args->hasOption('class')) { + $jobsToDelete->where(['class' => $args->getOption('class')]); + } + + if ($args->hasOption('queue')) { + $jobsToDelete->where(['queue' => $args->getOption('queue')]); + } + + if ($args->hasOption('config')) { + $jobsToDelete->where(['config' => $args->getOption('config')]); + } + + $deletingCount = $jobsToDelete->count(); + + if (!$deletingCount) { + $io->out('0 jobs found.'); + + return; + } + + if (!$args->getOption('force')) { + $confirmed = $io->askChoice("Delete {$deletingCount} jobs?", ['y', 'n'], 'n'); + + if ($confirmed !== 'y') { + return; + } + } + + $io->out("Deleting {$deletingCount} jobs."); + + $failedJobsTable->deleteManyOrFail($jobsToDelete); + + $io->success("{$deletingCount} jobs deleted."); + } +} diff --git a/src/Command/RequeueCommand.php b/src/Command/RequeueCommand.php new file mode 100644 index 0000000..ea76d94 --- /dev/null +++ b/src/Command/RequeueCommand.php @@ -0,0 +1,161 @@ +setDescription('Requeue failed jobs.'); + + $parser->addArgument('ids', [ + 'required' => false, + 'help' => 'Requeue jobs by the FailedJob ID (comma-separated).', + ]); + $parser->addOption('class', [ + 'help' => 'Requeue jobs by the job class.', + ]); + $parser->addOption('queue', [ + 'help' => 'Requeue jobs by the queue the job was received on.', + ]); + $parser->addOption('config', [ + 'help' => 'Requeue jobs by the config used to queue the job.', + ]); + $parser->addOption('force', [ + 'help' => 'Automatically assume yes in response to confirmation prompt.', + 'short' => 'f', + 'boolean' => true, + ]); + + return $parser; + } + + /** + * @param \Cake\Console\Arguments $args Arguments + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @return void + */ + public function execute(Arguments $args, ConsoleIo $io) + { + /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */ + $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs'); + + $jobsToRequeue = $failedJobsTable->find(); + + if ($args->hasArgument('ids')) { + $idsArg = $args->getArgument('ids'); + + if ($idsArg !== null) { + $ids = explode(',', $idsArg); + + $jobsToRequeue->whereInList('id', $ids); + } + } + + if ($args->hasOption('class')) { + $jobsToRequeue->where(['class' => $args->getOption('class')]); + } + + if ($args->hasOption('queue')) { + $jobsToRequeue->where(['queue' => $args->getOption('queue')]); + } + + if ($args->hasOption('config')) { + $jobsToRequeue->where(['config' => $args->getOption('config')]); + } + + $requeueingCount = $jobsToRequeue->count(); + + if (!$requeueingCount) { + $io->out('0 jobs found.'); + + return; + } + + if (!$args->getOption('force')) { + $confirmed = $io->askChoice("Requeue {$requeueingCount} jobs?", ['y', 'n'], 'n'); + + if ($confirmed !== 'y') { + return; + } + } + + $io->out("Requeueing {$requeueingCount} jobs."); + + $succeededCount = 0; + $failedCount = 0; + + foreach ($jobsToRequeue as $failedJob) { + $io->verbose("Requeueing FailedJob with ID {$failedJob->id}."); + try { + QueueManager::push( + [$failedJob->class, $failedJob->method], + $failedJob->decoded_data, + [ + 'config' => $failedJob->config, + 'priority' => $failedJob->priority, + 'queue' => $failedJob->queue, + ] + ); + + $failedJobsTable->deleteOrFail($failedJob); + + $succeededCount++; + } catch (Exception $e) { + $io->err("Exception occurred while requeueing FailedJob with ID {$failedJob->id}"); + $io->err((string)$e); + + $failedCount++; + } + } + + if ($failedCount) { + $io->err("Failed to requeue {$failedCount} jobs."); + } + + if ($succeededCount) { + $io->success("{$succeededCount} jobs requeued."); + } + } +} diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 5e95860..ed335ff 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -24,6 +24,7 @@ use Cake\Log\Log; use Cake\Queue\Consumption\LimitAttemptsExtension; use Cake\Queue\Consumption\LimitConsumedMessagesExtension; +use Cake\Queue\Listener\FailedJobsListener; use Cake\Queue\Queue\Processor; use Cake\Queue\QueueManager; use DateTime; @@ -109,9 +110,13 @@ public function getOptionParser(): ConsoleOptionParser */ protected function getQueueExtension(Arguments $args, LoggerInterface $logger): ExtensionInterface { + $limitAttempsExtension = new LimitAttemptsExtension((int)$args->getOption('max-attempts') ?: null); + + $limitAttempsExtension->getEventManager()->on(new FailedJobsListener()); + $extensions = [ new LoggerExtension($logger), - new LimitAttemptsExtension((int)$args->getOption('max-attempts') ?: null), + $limitAttempsExtension, ]; if (!is_null($args->getOption('max-jobs'))) { diff --git a/src/Consumption/LimitAttemptsExtension.php b/src/Consumption/LimitAttemptsExtension.php index bc85281..c7d19f9 100644 --- a/src/Consumption/LimitAttemptsExtension.php +++ b/src/Consumption/LimitAttemptsExtension.php @@ -3,6 +3,7 @@ namespace Cake\Queue\Consumption; +use Cake\Event\EventDispatcherTrait; use Cake\Queue\Job\Message; use Enqueue\Consumption\Context\MessageResult; use Enqueue\Consumption\MessageResultExtensionInterface; @@ -11,6 +12,8 @@ class LimitAttemptsExtension implements MessageResultExtensionInterface { + use EventDispatcherTrait; + /** * The property key used to set the number of times a message was attempted. * @@ -41,7 +44,7 @@ public function __construct(?int $maxAttempts = null) */ public function onResult(MessageResult $context): void { - if ($context->getResult() !== Processor::REQUEUE) { + if ($context->getResult() != Processor::REQUEUE) { return; } @@ -58,10 +61,20 @@ public function onResult(MessageResult $context): void $attemptNumber = $message->getProperty(self::ATTEMPTS_PROPERTY, 0) + 1; if ($attemptNumber >= $maxAttempts) { + $originalResult = $context->getResult(); + $context->changeResult( Result::reject(sprintf('The maximum number of %d allowed attempts was reached.', $maxAttempts)) ); + $exception = $originalResult instanceof Result ? $originalResult->getReason() : null; + + $this->dispatchEvent( + 'Consumption.LimitAttemptsExtension.failed', + ['exception' => $exception, 'logger' => $context->getLogger()], + $jobMessage + ); + return; } diff --git a/src/Job/Message.php b/src/Job/Message.php index 200dcf6..de73f81 100644 --- a/src/Job/Message.php +++ b/src/Job/Message.php @@ -106,7 +106,7 @@ public function getCallable() * * @return array */ - protected function getTarget(): array + public function getTarget(): array { $target = $this->parsedBody['class'] ?? null; diff --git a/src/Listener/FailedJobsListener.php b/src/Listener/FailedJobsListener.php new file mode 100644 index 0000000..7e26fd2 --- /dev/null +++ b/src/Listener/FailedJobsListener.php @@ -0,0 +1,101 @@ + + */ + public function implementedEvents(): array + { + return [ + 'Consumption.LimitAttemptsExtension.failed' => 'storeFailedJob', + ]; + } + + /** + * @param \Cake\Event\EventInterface $event EventInterface + * @return void + */ + public function storeFailedJob(EventInterface $event): void + { + /** @var \Cake\Queue\Job\Message $jobMessage */ + $jobMessage = $event->getSubject(); + + [$class, $method] = $jobMessage->getTarget(); + + $originalMessage = $jobMessage->getOriginalMessage(); + + $originalMessageBody = json_decode($originalMessage->getBody(), true); + + ['data' => $data, 'requeueOptions' => $requeueOptions] = $originalMessageBody; + + $config = QueueManager::getConfig($requeueOptions['config']); + + if (!($config['storeFailedJobs'] ?? false)) { + return; + } + + /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */ + $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs'); + + $failedJob = $failedJobsTable->newEntity([ + 'class' => $class, + 'method' => $method, + 'data' => json_encode($data), + 'config' => $requeueOptions['config'], + 'priority' => $requeueOptions['priority'], + 'queue' => $requeueOptions['queue'], + 'exception' => $event->getData('exception'), + ]); + + try { + $failedJobsTable->saveOrFail($failedJob); + } catch (PersistenceFailedException $e) { + $logger = $event->getData('logger'); + + if (!$logger) { + throw new RuntimeException( + sprintf('`logger` was not defined on %s event.', $event->getName()), + 0, + $e + ); + } + + if (!($logger instanceof LoggerInterface)) { + throw new RuntimeException( + sprintf('`logger` is not an instance of `LoggerInterface` on %s event.', $event->getName()), + 0, + $e + ); + } + + $logger->error((string)$e); + } + } +} diff --git a/src/Model/Entity/FailedJob.php b/src/Model/Entity/FailedJob.php new file mode 100644 index 0000000..dcf6b90 --- /dev/null +++ b/src/Model/Entity/FailedJob.php @@ -0,0 +1,52 @@ + true` + * means that any field not defined in the map will be accessible by default + * + * @var array + */ + protected $_accessible = [ + 'class' => true, + 'method' => true, + 'data' => true, + 'config' => true, + 'priority' => true, + 'queue' => true, + 'exception' => true, + 'created' => true, + ]; + + /** + * @return array + */ + protected function _getDecodedData(): array + { + return json_decode($this->data, true); + } +} diff --git a/src/Model/Table/FailedJobsTable.php b/src/Model/Table/FailedJobsTable.php new file mode 100644 index 0000000..4358a0d --- /dev/null +++ b/src/Model/Table/FailedJobsTable.php @@ -0,0 +1,96 @@ +setTable('queue_failed_jobs'); + $this->setDisplayField('id'); + $this->setPrimaryKey('id'); + + $this->addBehavior('Timestamp'); + } + + /** + * Default validation rules. + * + * @param \Cake\Validation\Validator $validator Validator instance. + * @return \Cake\Validation\Validator + */ + public function validationDefault(Validator $validator): Validator + { + $validator + ->integer('id') + ->allowEmptyString('id', null, 'create'); + + $validator + ->scalar('class') + ->maxLength('class', 255) + ->requirePresence('class', 'create') + ->notEmptyString('class'); + + $validator + ->scalar('method') + ->maxLength('method', 255) + ->requirePresence('method', 'create') + ->notEmptyString('method'); + + $validator + ->scalar('data') + ->requirePresence('data', 'create') + ->notEmptyString('data'); + + $validator + ->scalar('config') + ->maxLength('config', 255) + ->notEmptyString('config'); + + $validator + ->scalar('priority') + ->maxLength('priority', 255) + ->allowEmptyString('priority'); + + $validator + ->scalar('queue') + ->maxLength('queue', 255) + ->notEmptyString('queue'); + + $validator + ->scalar('exception') + ->allowEmptyString('exception'); + + return $validator; + } +} diff --git a/src/Plugin.php b/src/Plugin.php index 20cfd91..ee938da 100644 --- a/src/Plugin.php +++ b/src/Plugin.php @@ -21,6 +21,8 @@ use Cake\Core\Configure; use Cake\Core\PluginApplicationInterface; use Cake\Queue\Command\JobCommand; +use Cake\Queue\Command\PurgeFailedCommand; +use Cake\Queue\Command\RequeueCommand; use Cake\Queue\Command\WorkerCommand; /** @@ -71,6 +73,8 @@ public function console(CommandCollection $commands): CommandCollection return $commands ->add('queue worker', WorkerCommand::class) - ->add('worker', WorkerCommand::class); + ->add('worker', WorkerCommand::class) + ->add('queue requeue', RequeueCommand::class) + ->add('queue purge_failed', PurgeFailedCommand::class); } } diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index b092f57..89d01a4 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -18,6 +18,7 @@ use Cake\Event\EventDispatcherTrait; use Cake\Queue\Job\Message; +use Enqueue\Consumption\Result; use Error; use Exception; use Interop\Queue\Context; @@ -78,7 +79,7 @@ public function process(QueueMessage $message, Context $context) 'exception' => $e, ]); - return InteropProcessor::REQUEUE; + return Result::requeue(sprintf('Exception occurred while processing message: %s', (string)$e)); } if ($response === InteropProcessor::ACK) { diff --git a/src/QueueManager.php b/src/QueueManager.php index 2c1c6b0..8f9de11 100644 --- a/src/QueueManager.php +++ b/src/QueueManager.php @@ -216,6 +216,11 @@ public static function push($className, array $data = [], array $options = []): 'class' => [$class, $method], 'args' => [$data], 'data' => $data, + 'requeueOptions' => [ + 'config' => $name, + 'priority' => $options['priority'] ?? null, + 'queue' => $queue, + ], ]); if (isset($options['delay'])) { diff --git a/tests/Fixture/FailedJobsFixture.php b/tests/Fixture/FailedJobsFixture.php new file mode 100644 index 0000000..2f0da38 --- /dev/null +++ b/tests/Fixture/FailedJobsFixture.php @@ -0,0 +1,85 @@ + ['type' => 'integer', 'length' => null, 'unsigned' => false, 'null' => false, 'default' => null, 'comment' => '', 'autoIncrement' => true, 'precision' => null], + 'class' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null], + 'method' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null], + 'data' => ['type' => 'text', 'length' => null, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null], + 'config' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], + 'priority' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], + 'queue' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], + 'exception' => ['type' => 'text', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], + 'created' => ['type' => 'datetime', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''], + '_constraints' => [ + 'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []], + ], + '_options' => [ + 'engine' => 'InnoDB', + ], + ]; + // phpcs:enable + /** + * Init method + * + * @return void + */ + public function init(): void + { + $this->records = [ + [ + 'id' => 1, + 'class' => LogToDebugJob::class, + 'method' => 'execute', + 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}', + 'config' => 'default', + 'priority' => null, + 'queue' => 'default', + 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.', + 'created' => '2022-10-11 18:42:29', + ], + [ + 'id' => 2, + 'class' => MaxAttemptsIsThreeJob::class, + 'method' => 'execute', + 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}', + 'config' => 'default', + 'priority' => null, + 'queue' => 'default', + 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.', + 'created' => '2022-10-11 18:42:29', + ], + [ + 'id' => 3, + 'class' => LogToDebugJob::class, + 'method' => 'execute', + 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}', + 'config' => 'alternate_config', + 'priority' => null, + 'queue' => 'alternate_queue', + 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.', + 'created' => '2022-10-11 18:42:29', + ], + ]; + parent::init(); + } +} diff --git a/tests/TestCase/Command/PurgeFailedCommandTest.php b/tests/TestCase/Command/PurgeFailedCommandTest.php new file mode 100644 index 0000000..af7a1b2 --- /dev/null +++ b/tests/TestCase/Command/PurgeFailedCommandTest.php @@ -0,0 +1,120 @@ +useCommandRunner(); + } + + public function testFailedJobsAreDeleted() + { + $this->exec('queue purge_failed', ['y']); + + $this->assertOutputContains('Deleting 3 jobs.'); + $this->assertOutputContains('3 jobs deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->all(); + + $this->assertCount(0, $results); + } + + public function testFailedJobsAreNotDeletedIfNotConfirmed() + { + $this->exec('queue purge_failed', ['n']); + + $this->assertOutputNotContains('Deleting'); + $this->assertOutputNotContains('deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->all(); + + $this->assertCount(3, $results); + } + + public function testFailedJobsAreDeletedById() + { + $this->exec('queue purge_failed 1,2 -f'); + + $this->assertOutputContains('Deleting 2 jobs.'); + $this->assertOutputContains('2 jobs deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray(); + + $this->assertCount(1, $results); + $this->assertSame(3, $results[0]->id); + } + + public function testFailedJobsAreDeletedByClass() + { + $class = LogToDebugJob::class; + $this->exec("queue purge_failed --class {$class} -f"); + + $this->assertOutputContains('Deleting 2 jobs.'); + $this->assertOutputContains('2 jobs deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray(); + + $this->assertCount(1, $results); + $this->assertSame(2, $results[0]->id); + } + + public function testFailedJobsAreDeletedByQueue() + { + $this->exec('queue purge_failed --queue alternate_queue -f'); + + $this->assertOutputContains('Deleting 1 jobs.'); + $this->assertOutputContains('1 jobs deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray(); + + $this->assertCount(2, $results); + $this->assertSame(1, $results[0]->id); + $this->assertSame(2, $results[1]->id); + } + + public function testFailedJobsAreDeletedByConfig() + { + $this->exec('queue purge_failed --config alternate_config -f'); + + $this->assertOutputContains('Deleting 1 jobs.'); + $this->assertOutputContains('1 jobs deleted.'); + + $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray(); + + $this->assertCount(2, $results); + $this->assertSame(1, $results[0]->id); + $this->assertSame(2, $results[1]->id); + } +} diff --git a/tests/TestCase/Command/RequeueCommandTest.php b/tests/TestCase/Command/RequeueCommandTest.php new file mode 100644 index 0000000..e4bc6a0 --- /dev/null +++ b/tests/TestCase/Command/RequeueCommandTest.php @@ -0,0 +1,204 @@ +useCommandRunner(); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . uniqid('queue'), + 'receiveTimeout' => 100, + ]; + Configure::write('Queue', ['default' => $config]); + } + + public function tearDown(): void + { + parent::tearDown(); + + Log::reset(); + + QueueManager::drop('default'); + } + + public function testJobsAreRequeued() + { + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + QueueManager::drop('default'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $this->exec('queue requeue --queue default', ['y']); + + $this->assertOutputContains('Requeueing 2 jobs.'); + $this->assertOutputContains('2 jobs requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + + $this->assertDebugLogContainsExactly('Debug job was run', 1); + $this->assertDebugLogContains('MaxAttemptsIsThreeJob is requeueing'); + } + + public function testJobsAreNotRequeuedIfNotConfirmed() + { + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + QueueManager::drop('default'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $this->exec('queue requeue --queue default', ['n']); + + $this->assertOutputNotContains('Requeueing'); + $this->assertOutputNotContains('requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0); + } + + public function testJobsAreRequeuedById() + { + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + QueueManager::drop('default'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob was run', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $this->exec('queue requeue 1,2 -f'); + + $this->assertOutputContains('Requeueing 2 jobs.'); + $this->assertOutputContains('2 jobs requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + + $this->assertDebugLogContainsExactly('Debug job was run', 1); + $this->assertDebugLogContains('MaxAttemptsIsThreeJob is requeueing'); + } + + public function testJobsAreRequeuedByClass() + { + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + QueueManager::drop('default'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $class = LogToDebugJob::class; + $this->exec("queue requeue --class {$class} --queue default -f"); + + $this->assertOutputContains('Requeueing 1 jobs.'); + $this->assertOutputContains('1 jobs requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --verbose'); + + $this->assertDebugLogContainsExactly('Debug job was run', 1); + } + + public function testJobsAreRequeuedByQueue() + { + $config = [ + 'queue' => 'alternate_queue', + 'url' => 'file:///' . TMP . DS . 'queue' . uniqid('queue'), + 'receiveTimeout' => 100, + ]; + Configure::write('Queue', ['alternate_config' => $config]); + + $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose'); + QueueManager::drop('alternate_config'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $this->exec('queue requeue --queue alternate_queue -f'); + + $this->assertOutputContains('Requeueing 1 jobs.'); + $this->assertOutputContains('1 jobs requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose'); + QueueManager::drop('alternate_config'); + + $this->assertDebugLogContains('Debug job was run'); + } + + public function testJobsAreRequeuedByConfig() + { + $config = [ + 'queue' => 'alternate_queue', + 'url' => 'file:///' . TMP . DS . 'queue' . uniqid('queue'), + 'receiveTimeout' => 100, + ]; + Configure::write('Queue', ['alternate_config' => $config]); + + $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose'); + QueueManager::drop('alternate_config'); + + $this->assertDebugLogContainsExactly('Debug job was run', 0); + + $this->cleanupConsoleTrait(); + $this->useCommandRunner(); + $this->exec('queue requeue --config alternate_config -f'); + + $this->assertOutputContains('Requeueing 1 jobs.'); + $this->assertOutputContains('1 jobs requeued.'); + + $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose'); + QueueManager::drop('alternate_config'); + + $this->assertDebugLogContains('Debug job was run'); + } +} diff --git a/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php b/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php index 41525d0..14f963f 100644 --- a/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php +++ b/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php @@ -3,6 +3,8 @@ namespace Cake\Queue\Test\TestCase\Job; +use Cake\Event\EventList; +use Cake\Event\EventManager; use Cake\Log\Log; use Cake\Queue\Consumption\LimitAttemptsExtension; use Cake\Queue\Consumption\LimitConsumedMessagesExtension; @@ -19,6 +21,13 @@ class LimitAttemptsExtensionTest extends TestCase { use DebugLogTrait; + public function setUp(): void + { + parent::setUp(); + + EventManager::instance()->setEventList(new EventList()); + } + /** * @beforeClass * @after @@ -41,6 +50,17 @@ public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet() $this->assertGreaterThanOrEqual(10, $count); } + public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded() + { + $consume = $this->setupQeueue(); + + QueueManager::push(MaxAttemptsIsThreeJob::class, []); + + $consume(); + + $this->assertEventFired('Consumption.LimitAttemptsExtension.failed'); + } + public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached() { $consume = $this->setupQeueue(); diff --git a/tests/TestCase/DebugLogTrait.php b/tests/TestCase/DebugLogTrait.php index d479f9e..e50006d 100644 --- a/tests/TestCase/DebugLogTrait.php +++ b/tests/TestCase/DebugLogTrait.php @@ -7,7 +7,7 @@ trait DebugLogTrait { - protected function assertDebugLogContains($expected, $times = null): void + protected function assertDebugLogContains($expected): void { $found = $this->debugLogCount($expected); diff --git a/tests/TestCase/Listener/FailedJobsListenerTest.php b/tests/TestCase/Listener/FailedJobsListenerTest.php new file mode 100644 index 0000000..182ad35 --- /dev/null +++ b/tests/TestCase/Listener/FailedJobsListenerTest.php @@ -0,0 +1,95 @@ + 'null:', + 'storeFailedJobs' => true, + ]); + } + + public function tearDown(): void + { + parent::tearDown(); + + QueueManager::drop('example_config'); + } + + public function testFailedJobIsAddedWhenEventIsFired() + { + $parsedBody = [ + 'class' => [LogToDebugJob::class, 'execute'], + 'data' => ['example_key' => 'example_value'], + 'requeueOptions' => [ + 'config' => 'example_config', + 'priority' => 'example_priority', + 'queue' => 'example_queue', + ], + ]; + $messageBody = json_encode($parsedBody); + $connectionFactory = new NullConnectionFactory(); + + $context = $connectionFactory->createContext(); + $originalMessage = new NullMessage($messageBody); + $message = new Message($originalMessage, $context); + + $event = new Event( + 'Consumption.LimitAttemptsExtension.failed', + $message, + ['exception' => 'some message'] + ); + + /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */ + $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs'); + $failedJobsTable->deleteAll(['1=1']); + + EventManager::instance()->on(new FailedJobsListener()); + EventManager::instance()->dispatch($event); + + $this->assertSame(1, $failedJobsTable->find()->count()); + + $failedJob = $failedJobsTable->find()->first(); + + $this->assertSame(LogToDebugJob::class, $failedJob->class); + $this->assertSame('execute', $failedJob->method); + $this->assertSame(json_encode(['example_key' => 'example_value']), $failedJob->data); + $this->assertSame('example_config', $failedJob->config); + $this->assertSame('example_priority', $failedJob->priority); + $this->assertSame('example_queue', $failedJob->queue); + $this->assertStringContainsString('some message', $failedJob->exception); + } +} diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index e586157..7f76af8 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -153,7 +153,7 @@ public function testProcessWillRequeueOnException() $processor->getEventManager()->setEventList($events); $result = $processor->process($queueMessage, $context); - $this->assertSame(InteropProcessor::REQUEUE, $result); + $this->assertEquals(InteropProcessor::REQUEUE, $result); } /** diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 4eae28d..66530a9 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -15,6 +15,7 @@ * @license http://www.opensource.org/licenses/mit-license.php MIT License */ +use Cake\Cache\Cache; use Cake\Core\Configure; use Cake\Routing\Router; @@ -45,6 +46,25 @@ define('CONFIG', ROOT . DS . 'config' . DS); } +// phpcs:disable +@mkdir(CACHE); +@mkdir(CACHE . 'views'); +@mkdir(CACHE . 'models'); +// phpcs:enable + +Cache::setConfig([ + '_cake_core_' => [ + 'engine' => 'File', + 'prefix' => 'cake_core_', + 'serialize' => true, + ], + '_cake_model_' => [ + 'engine' => 'File', + 'prefix' => 'cake_model_', + 'serialize' => true, + ], +]); + Configure::write('debug', true); Configure::write('App', [ 'namespace' => 'TestApp', diff --git a/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php b/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php index 7aa60fc..030dea8 100644 --- a/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php +++ b/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php @@ -8,6 +8,7 @@ use Cake\Queue\Job\JobInterface; use Cake\Queue\Job\Message; use Interop\Queue\Processor; +use RuntimeException; class MaxAttemptsIsThreeJob implements JobInterface { @@ -23,7 +24,7 @@ public function execute(Message $message): ?string if (!$succeedAt || $succeedAt > $attemptNumber) { Log::debug('MaxAttemptsIsThreeJob is requeueing'); - return Processor::REQUEUE; + throw new RuntimeException('example MaxAttemptsIsThreeJob exception message'); } Log::debug('MaxAttemptsIsThreeJob was run');