diff --git a/composer.json b/composer.json
index 0702200..0c5156f 100644
--- a/composer.json
+++ b/composer.json
@@ -20,6 +20,7 @@
"require": {
"php": ">=7.2.0",
"cakephp/cakephp": "^4.1",
+ "cakephp/migrations": "^3.1",
"enqueue/simple-client": "^0.10",
"psr/log": "^1.1 || ^2.0"
},
diff --git a/config/Migrations/20221007202459_CreateFailedJobs.php b/config/Migrations/20221007202459_CreateFailedJobs.php
new file mode 100644
index 0000000..a8ae8dc
--- /dev/null
+++ b/config/Migrations/20221007202459_CreateFailedJobs.php
@@ -0,0 +1,58 @@
+table('queue_failed_jobs');
+ $table->addColumn('class', 'string', [
+ 'length' => 255,
+ 'null' => false,
+ 'default' => null,
+ ])
+ ->addColumn('method', 'string', [
+ 'length' => 255,
+ 'null' => false,
+ 'default' => null,
+ ])
+ ->addColumn('data', 'text', [
+ 'null' => false,
+ 'default' => null,
+ ])
+ ->addColumn('config', 'string', [
+ 'length' => 255,
+ 'null' => false,
+ 'default' => null,
+ ])
+ ->addColumn('priority', 'string', [
+ 'length' => 255,
+ 'null' => true,
+ 'default' => null,
+ ])
+ ->addColumn('queue', 'string', [
+ 'length' => 255,
+ 'null' => false,
+ 'default' => null,
+ ])
+ ->addColumn('exception', 'text', [
+ 'null' => true,
+ 'default' => null,
+ ])
+ ->addColumn('created', 'datetime', [
+ 'default' => null,
+ 'limit' => null,
+ 'null' => true,
+ ])
+ ->create();
+ }
+}
diff --git a/docs/en/index.rst b/docs/en/index.rst
index 0387c25..bfb41b8 100644
--- a/docs/en/index.rst
+++ b/docs/en/index.rst
@@ -55,6 +55,9 @@ The following configuration should be present in the config array of your **conf
// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
'receiveTimeout' => 10000,
+
+ // Whether to store failed jobs in the queue_failed_jobs table. default: false
+ 'storeFailedJobs' => true,
]
],
// ...
@@ -62,6 +65,13 @@ The following configuration should be present in the config array of your **conf
The ``Queue`` config key can contain one or more queue configurations. Each of
these is used for interacting with a different queuing backend.
+If ``storeFailedJobs`` is set to ``true``, make sure to run the plugin migrations to create the ``queue_failed_jobs`` table.
+
+.. code-block:: bash
+
+ bin/cake migrations migrate --plugin Cake/Queue
+
+
Usage
=====
@@ -266,6 +276,62 @@ This shell can take a few different options:
- Max Runtime
- Runtime: Time since the worker started, the worker will finish when Runtime is over Max Runtime value
+Failed Jobs
+===========
+
+By default, jobs that throw an exception are requeued indefinitely. However, if
+``maxAttempts`` is configured on the job class or via a command line argument, a
+job will be considered failed if a ``Processor::REQUEUE`` response is received
+after processing (typically due to an exception being thrown) and there are no
+remaining attempts. The job will then be rejected and added to the
+``queue_failed_jobs`` table and can be requeued manually.
+
+Your chosen transport may offer a dead-letter queue feature. While Failed Jobs
+has a similar purpose, it specifically captures jobs that return a
+``Processor::REQUEUE`` response and does not handle other failure cases. It is
+agnostic of transport and only supports database persistence.
+
+The following options passed when originally queueing the job will be preserved:
+``config``, ``queue``, and ``priority``.
+
+Requeue Failed Jobs
+-------------------
+
+Push jobs back onto the queue and remove them from the ``queue_failed_jobs``
+table. If a job fails to requeue it is not guaranteed that the job was not run.
+
+.. code-block:: bash
+
+ bin/cake queue requeue
+
+Optional filters:
+
+- ``--id``: Requeue job by the ID of the ``FailedJob``
+- ``--class``: Requeue jobs by the job class
+- ``--queue``: Requeue jobs by the queue the job was received on
+- ``--config``: Requeue jobs by the config used to queue the job
+
+If no filters are provided then all failed jobs will be requeued.
+
+Purge Failed Jobs
+------------------
+
+Delete jobs from the ``queue_failed_jobs`` table.
+
+.. code-block:: bash
+
+ bin/cake queue purge_failed
+
+Optional filters:
+
+- ``--id``: Purge job by the ID of the ``FailedJob``
+- ``--class``: Purge jobs by the job class
+- ``--queue``: Purge jobs by the queue the job was received on
+- ``--config``: Purge jobs by the config used to queue the job
+
+If no filters are provided then all failed jobs will be purged.
+
+
Worker Events
=============
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 5ab59dd..3481e82 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -15,4 +15,11 @@
tests/TestCase
+
+
+
+
+
+
+
diff --git a/src/Command/PurgeFailedCommand.php b/src/Command/PurgeFailedCommand.php
new file mode 100644
index 0000000..a6b7602
--- /dev/null
+++ b/src/Command/PurgeFailedCommand.php
@@ -0,0 +1,128 @@
+setDescription('Delete failed jobs.');
+
+ $parser->addArgument('ids', [
+ 'required' => false,
+ 'help' => 'Delete jobs by the FailedJob ID (comma-separated).',
+ ]);
+ $parser->addOption('class', [
+ 'help' => 'Delete jobs by the job class.',
+ ]);
+ $parser->addOption('queue', [
+ 'help' => 'Delete jobs by the queue the job was received on.',
+ ]);
+ $parser->addOption('config', [
+ 'help' => 'Delete jobs by the config used to queue the job.',
+ ]);
+ $parser->addOption('force', [
+ 'help' => 'Automatically assume yes in response to confirmation prompt.',
+ 'short' => 'f',
+ 'boolean' => true,
+ ]);
+
+ return $parser;
+ }
+
+ /**
+ * @param \Cake\Console\Arguments $args Arguments
+ * @param \Cake\Console\ConsoleIo $io ConsoleIo
+ * @return void
+ */
+ public function execute(Arguments $args, ConsoleIo $io)
+ {
+ /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
+ $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
+
+ $jobsToDelete = $failedJobsTable->find();
+
+ if ($args->hasArgument('ids')) {
+ $idsArg = $args->getArgument('ids');
+
+ if ($idsArg !== null) {
+ $ids = explode(',', $idsArg);
+
+ $jobsToDelete->whereInList($failedJobsTable->aliasField('id'), $ids);
+ }
+ }
+
+ if ($args->hasOption('class')) {
+ $jobsToDelete->where(['class' => $args->getOption('class')]);
+ }
+
+ if ($args->hasOption('queue')) {
+ $jobsToDelete->where(['queue' => $args->getOption('queue')]);
+ }
+
+ if ($args->hasOption('config')) {
+ $jobsToDelete->where(['config' => $args->getOption('config')]);
+ }
+
+ $deletingCount = $jobsToDelete->count();
+
+ if (!$deletingCount) {
+ $io->out('0 jobs found.');
+
+ return;
+ }
+
+ if (!$args->getOption('force')) {
+ $confirmed = $io->askChoice("Delete {$deletingCount} jobs?", ['y', 'n'], 'n');
+
+ if ($confirmed !== 'y') {
+ return;
+ }
+ }
+
+ $io->out("Deleting {$deletingCount} jobs.");
+
+ $failedJobsTable->deleteManyOrFail($jobsToDelete);
+
+ $io->success("{$deletingCount} jobs deleted.");
+ }
+}
diff --git a/src/Command/RequeueCommand.php b/src/Command/RequeueCommand.php
new file mode 100644
index 0000000..ea76d94
--- /dev/null
+++ b/src/Command/RequeueCommand.php
@@ -0,0 +1,161 @@
+setDescription('Requeue failed jobs.');
+
+ $parser->addArgument('ids', [
+ 'required' => false,
+ 'help' => 'Requeue jobs by the FailedJob ID (comma-separated).',
+ ]);
+ $parser->addOption('class', [
+ 'help' => 'Requeue jobs by the job class.',
+ ]);
+ $parser->addOption('queue', [
+ 'help' => 'Requeue jobs by the queue the job was received on.',
+ ]);
+ $parser->addOption('config', [
+ 'help' => 'Requeue jobs by the config used to queue the job.',
+ ]);
+ $parser->addOption('force', [
+ 'help' => 'Automatically assume yes in response to confirmation prompt.',
+ 'short' => 'f',
+ 'boolean' => true,
+ ]);
+
+ return $parser;
+ }
+
+ /**
+ * @param \Cake\Console\Arguments $args Arguments
+ * @param \Cake\Console\ConsoleIo $io ConsoleIo
+ * @return void
+ */
+ public function execute(Arguments $args, ConsoleIo $io)
+ {
+ /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
+ $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
+
+ $jobsToRequeue = $failedJobsTable->find();
+
+ if ($args->hasArgument('ids')) {
+ $idsArg = $args->getArgument('ids');
+
+ if ($idsArg !== null) {
+ $ids = explode(',', $idsArg);
+
+ $jobsToRequeue->whereInList('id', $ids);
+ }
+ }
+
+ if ($args->hasOption('class')) {
+ $jobsToRequeue->where(['class' => $args->getOption('class')]);
+ }
+
+ if ($args->hasOption('queue')) {
+ $jobsToRequeue->where(['queue' => $args->getOption('queue')]);
+ }
+
+ if ($args->hasOption('config')) {
+ $jobsToRequeue->where(['config' => $args->getOption('config')]);
+ }
+
+ $requeueingCount = $jobsToRequeue->count();
+
+ if (!$requeueingCount) {
+ $io->out('0 jobs found.');
+
+ return;
+ }
+
+ if (!$args->getOption('force')) {
+ $confirmed = $io->askChoice("Requeue {$requeueingCount} jobs?", ['y', 'n'], 'n');
+
+ if ($confirmed !== 'y') {
+ return;
+ }
+ }
+
+ $io->out("Requeueing {$requeueingCount} jobs.");
+
+ $succeededCount = 0;
+ $failedCount = 0;
+
+ foreach ($jobsToRequeue as $failedJob) {
+ $io->verbose("Requeueing FailedJob with ID {$failedJob->id}.");
+ try {
+ QueueManager::push(
+ [$failedJob->class, $failedJob->method],
+ $failedJob->decoded_data,
+ [
+ 'config' => $failedJob->config,
+ 'priority' => $failedJob->priority,
+ 'queue' => $failedJob->queue,
+ ]
+ );
+
+ $failedJobsTable->deleteOrFail($failedJob);
+
+ $succeededCount++;
+ } catch (Exception $e) {
+ $io->err("Exception occurred while requeueing FailedJob with ID {$failedJob->id}");
+ $io->err((string)$e);
+
+ $failedCount++;
+ }
+ }
+
+ if ($failedCount) {
+ $io->err("Failed to requeue {$failedCount} jobs.");
+ }
+
+ if ($succeededCount) {
+ $io->success("{$succeededCount} jobs requeued.");
+ }
+ }
+}
diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php
index 5e95860..ed335ff 100644
--- a/src/Command/WorkerCommand.php
+++ b/src/Command/WorkerCommand.php
@@ -24,6 +24,7 @@
use Cake\Log\Log;
use Cake\Queue\Consumption\LimitAttemptsExtension;
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
+use Cake\Queue\Listener\FailedJobsListener;
use Cake\Queue\Queue\Processor;
use Cake\Queue\QueueManager;
use DateTime;
@@ -109,9 +110,13 @@ public function getOptionParser(): ConsoleOptionParser
*/
protected function getQueueExtension(Arguments $args, LoggerInterface $logger): ExtensionInterface
{
+ $limitAttempsExtension = new LimitAttemptsExtension((int)$args->getOption('max-attempts') ?: null);
+
+ $limitAttempsExtension->getEventManager()->on(new FailedJobsListener());
+
$extensions = [
new LoggerExtension($logger),
- new LimitAttemptsExtension((int)$args->getOption('max-attempts') ?: null),
+ $limitAttempsExtension,
];
if (!is_null($args->getOption('max-jobs'))) {
diff --git a/src/Consumption/LimitAttemptsExtension.php b/src/Consumption/LimitAttemptsExtension.php
index bc85281..c7d19f9 100644
--- a/src/Consumption/LimitAttemptsExtension.php
+++ b/src/Consumption/LimitAttemptsExtension.php
@@ -3,6 +3,7 @@
namespace Cake\Queue\Consumption;
+use Cake\Event\EventDispatcherTrait;
use Cake\Queue\Job\Message;
use Enqueue\Consumption\Context\MessageResult;
use Enqueue\Consumption\MessageResultExtensionInterface;
@@ -11,6 +12,8 @@
class LimitAttemptsExtension implements MessageResultExtensionInterface
{
+ use EventDispatcherTrait;
+
/**
* The property key used to set the number of times a message was attempted.
*
@@ -41,7 +44,7 @@ public function __construct(?int $maxAttempts = null)
*/
public function onResult(MessageResult $context): void
{
- if ($context->getResult() !== Processor::REQUEUE) {
+ if ($context->getResult() != Processor::REQUEUE) {
return;
}
@@ -58,10 +61,20 @@ public function onResult(MessageResult $context): void
$attemptNumber = $message->getProperty(self::ATTEMPTS_PROPERTY, 0) + 1;
if ($attemptNumber >= $maxAttempts) {
+ $originalResult = $context->getResult();
+
$context->changeResult(
Result::reject(sprintf('The maximum number of %d allowed attempts was reached.', $maxAttempts))
);
+ $exception = $originalResult instanceof Result ? $originalResult->getReason() : null;
+
+ $this->dispatchEvent(
+ 'Consumption.LimitAttemptsExtension.failed',
+ ['exception' => $exception, 'logger' => $context->getLogger()],
+ $jobMessage
+ );
+
return;
}
diff --git a/src/Job/Message.php b/src/Job/Message.php
index 200dcf6..de73f81 100644
--- a/src/Job/Message.php
+++ b/src/Job/Message.php
@@ -106,7 +106,7 @@ public function getCallable()
*
* @return array
*/
- protected function getTarget(): array
+ public function getTarget(): array
{
$target = $this->parsedBody['class'] ?? null;
diff --git a/src/Listener/FailedJobsListener.php b/src/Listener/FailedJobsListener.php
new file mode 100644
index 0000000..7e26fd2
--- /dev/null
+++ b/src/Listener/FailedJobsListener.php
@@ -0,0 +1,101 @@
+
+ */
+ public function implementedEvents(): array
+ {
+ return [
+ 'Consumption.LimitAttemptsExtension.failed' => 'storeFailedJob',
+ ];
+ }
+
+ /**
+ * @param \Cake\Event\EventInterface $event EventInterface
+ * @return void
+ */
+ public function storeFailedJob(EventInterface $event): void
+ {
+ /** @var \Cake\Queue\Job\Message $jobMessage */
+ $jobMessage = $event->getSubject();
+
+ [$class, $method] = $jobMessage->getTarget();
+
+ $originalMessage = $jobMessage->getOriginalMessage();
+
+ $originalMessageBody = json_decode($originalMessage->getBody(), true);
+
+ ['data' => $data, 'requeueOptions' => $requeueOptions] = $originalMessageBody;
+
+ $config = QueueManager::getConfig($requeueOptions['config']);
+
+ if (!($config['storeFailedJobs'] ?? false)) {
+ return;
+ }
+
+ /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
+ $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
+
+ $failedJob = $failedJobsTable->newEntity([
+ 'class' => $class,
+ 'method' => $method,
+ 'data' => json_encode($data),
+ 'config' => $requeueOptions['config'],
+ 'priority' => $requeueOptions['priority'],
+ 'queue' => $requeueOptions['queue'],
+ 'exception' => $event->getData('exception'),
+ ]);
+
+ try {
+ $failedJobsTable->saveOrFail($failedJob);
+ } catch (PersistenceFailedException $e) {
+ $logger = $event->getData('logger');
+
+ if (!$logger) {
+ throw new RuntimeException(
+ sprintf('`logger` was not defined on %s event.', $event->getName()),
+ 0,
+ $e
+ );
+ }
+
+ if (!($logger instanceof LoggerInterface)) {
+ throw new RuntimeException(
+ sprintf('`logger` is not an instance of `LoggerInterface` on %s event.', $event->getName()),
+ 0,
+ $e
+ );
+ }
+
+ $logger->error((string)$e);
+ }
+ }
+}
diff --git a/src/Model/Entity/FailedJob.php b/src/Model/Entity/FailedJob.php
new file mode 100644
index 0000000..dcf6b90
--- /dev/null
+++ b/src/Model/Entity/FailedJob.php
@@ -0,0 +1,52 @@
+ true`
+ * means that any field not defined in the map will be accessible by default
+ *
+ * @var array
+ */
+ protected $_accessible = [
+ 'class' => true,
+ 'method' => true,
+ 'data' => true,
+ 'config' => true,
+ 'priority' => true,
+ 'queue' => true,
+ 'exception' => true,
+ 'created' => true,
+ ];
+
+ /**
+ * @return array
+ */
+ protected function _getDecodedData(): array
+ {
+ return json_decode($this->data, true);
+ }
+}
diff --git a/src/Model/Table/FailedJobsTable.php b/src/Model/Table/FailedJobsTable.php
new file mode 100644
index 0000000..4358a0d
--- /dev/null
+++ b/src/Model/Table/FailedJobsTable.php
@@ -0,0 +1,96 @@
+setTable('queue_failed_jobs');
+ $this->setDisplayField('id');
+ $this->setPrimaryKey('id');
+
+ $this->addBehavior('Timestamp');
+ }
+
+ /**
+ * Default validation rules.
+ *
+ * @param \Cake\Validation\Validator $validator Validator instance.
+ * @return \Cake\Validation\Validator
+ */
+ public function validationDefault(Validator $validator): Validator
+ {
+ $validator
+ ->integer('id')
+ ->allowEmptyString('id', null, 'create');
+
+ $validator
+ ->scalar('class')
+ ->maxLength('class', 255)
+ ->requirePresence('class', 'create')
+ ->notEmptyString('class');
+
+ $validator
+ ->scalar('method')
+ ->maxLength('method', 255)
+ ->requirePresence('method', 'create')
+ ->notEmptyString('method');
+
+ $validator
+ ->scalar('data')
+ ->requirePresence('data', 'create')
+ ->notEmptyString('data');
+
+ $validator
+ ->scalar('config')
+ ->maxLength('config', 255)
+ ->notEmptyString('config');
+
+ $validator
+ ->scalar('priority')
+ ->maxLength('priority', 255)
+ ->allowEmptyString('priority');
+
+ $validator
+ ->scalar('queue')
+ ->maxLength('queue', 255)
+ ->notEmptyString('queue');
+
+ $validator
+ ->scalar('exception')
+ ->allowEmptyString('exception');
+
+ return $validator;
+ }
+}
diff --git a/src/Plugin.php b/src/Plugin.php
index 20cfd91..ee938da 100644
--- a/src/Plugin.php
+++ b/src/Plugin.php
@@ -21,6 +21,8 @@
use Cake\Core\Configure;
use Cake\Core\PluginApplicationInterface;
use Cake\Queue\Command\JobCommand;
+use Cake\Queue\Command\PurgeFailedCommand;
+use Cake\Queue\Command\RequeueCommand;
use Cake\Queue\Command\WorkerCommand;
/**
@@ -71,6 +73,8 @@ public function console(CommandCollection $commands): CommandCollection
return $commands
->add('queue worker', WorkerCommand::class)
- ->add('worker', WorkerCommand::class);
+ ->add('worker', WorkerCommand::class)
+ ->add('queue requeue', RequeueCommand::class)
+ ->add('queue purge_failed', PurgeFailedCommand::class);
}
}
diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php
index b092f57..89d01a4 100644
--- a/src/Queue/Processor.php
+++ b/src/Queue/Processor.php
@@ -18,6 +18,7 @@
use Cake\Event\EventDispatcherTrait;
use Cake\Queue\Job\Message;
+use Enqueue\Consumption\Result;
use Error;
use Exception;
use Interop\Queue\Context;
@@ -78,7 +79,7 @@ public function process(QueueMessage $message, Context $context)
'exception' => $e,
]);
- return InteropProcessor::REQUEUE;
+ return Result::requeue(sprintf('Exception occurred while processing message: %s', (string)$e));
}
if ($response === InteropProcessor::ACK) {
diff --git a/src/QueueManager.php b/src/QueueManager.php
index 2c1c6b0..8f9de11 100644
--- a/src/QueueManager.php
+++ b/src/QueueManager.php
@@ -216,6 +216,11 @@ public static function push($className, array $data = [], array $options = []):
'class' => [$class, $method],
'args' => [$data],
'data' => $data,
+ 'requeueOptions' => [
+ 'config' => $name,
+ 'priority' => $options['priority'] ?? null,
+ 'queue' => $queue,
+ ],
]);
if (isset($options['delay'])) {
diff --git a/tests/Fixture/FailedJobsFixture.php b/tests/Fixture/FailedJobsFixture.php
new file mode 100644
index 0000000..2f0da38
--- /dev/null
+++ b/tests/Fixture/FailedJobsFixture.php
@@ -0,0 +1,85 @@
+ ['type' => 'integer', 'length' => null, 'unsigned' => false, 'null' => false, 'default' => null, 'comment' => '', 'autoIncrement' => true, 'precision' => null],
+ 'class' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null],
+ 'method' => ['type' => 'string', 'length' => 255, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null],
+ 'data' => ['type' => 'text', 'length' => null, 'null' => false, 'default' => null, 'comment' => '', 'precision' => null],
+ 'config' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
+ 'priority' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
+ 'queue' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
+ 'exception' => ['type' => 'text', 'length' => null, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null],
+ 'created' => ['type' => 'datetime', 'length' => null, 'precision' => null, 'null' => true, 'default' => null, 'comment' => ''],
+ '_constraints' => [
+ 'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []],
+ ],
+ '_options' => [
+ 'engine' => 'InnoDB',
+ ],
+ ];
+ // phpcs:enable
+ /**
+ * Init method
+ *
+ * @return void
+ */
+ public function init(): void
+ {
+ $this->records = [
+ [
+ 'id' => 1,
+ 'class' => LogToDebugJob::class,
+ 'method' => 'execute',
+ 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}',
+ 'config' => 'default',
+ 'priority' => null,
+ 'queue' => 'default',
+ 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.',
+ 'created' => '2022-10-11 18:42:29',
+ ],
+ [
+ 'id' => 2,
+ 'class' => MaxAttemptsIsThreeJob::class,
+ 'method' => 'execute',
+ 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}',
+ 'config' => 'default',
+ 'priority' => null,
+ 'queue' => 'default',
+ 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.',
+ 'created' => '2022-10-11 18:42:29',
+ ],
+ [
+ 'id' => 3,
+ 'class' => LogToDebugJob::class,
+ 'method' => 'execute',
+ 'data' => '{"sample_data_1": "sample value", "sample_data_2": 1}',
+ 'config' => 'alternate_config',
+ 'priority' => null,
+ 'queue' => 'alternate_queue',
+ 'exception' => 'Lorem ipsum dolor sit amet, aliquet feugiat. Convallis morbi fringilla gravida, phasellus feugiat dapibus velit nunc, pulvinar eget sollicitudin venenatis cum nullam, vivamus ut a sed, mollitia lectus. Nulla vestibulum massa neque ut et, id hendrerit sit, feugiat in taciti enim proin nibh, tempor dignissim, rhoncus duis vestibulum nunc mattis convallis.',
+ 'created' => '2022-10-11 18:42:29',
+ ],
+ ];
+ parent::init();
+ }
+}
diff --git a/tests/TestCase/Command/PurgeFailedCommandTest.php b/tests/TestCase/Command/PurgeFailedCommandTest.php
new file mode 100644
index 0000000..af7a1b2
--- /dev/null
+++ b/tests/TestCase/Command/PurgeFailedCommandTest.php
@@ -0,0 +1,120 @@
+useCommandRunner();
+ }
+
+ public function testFailedJobsAreDeleted()
+ {
+ $this->exec('queue purge_failed', ['y']);
+
+ $this->assertOutputContains('Deleting 3 jobs.');
+ $this->assertOutputContains('3 jobs deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->all();
+
+ $this->assertCount(0, $results);
+ }
+
+ public function testFailedJobsAreNotDeletedIfNotConfirmed()
+ {
+ $this->exec('queue purge_failed', ['n']);
+
+ $this->assertOutputNotContains('Deleting');
+ $this->assertOutputNotContains('deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->all();
+
+ $this->assertCount(3, $results);
+ }
+
+ public function testFailedJobsAreDeletedById()
+ {
+ $this->exec('queue purge_failed 1,2 -f');
+
+ $this->assertOutputContains('Deleting 2 jobs.');
+ $this->assertOutputContains('2 jobs deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray();
+
+ $this->assertCount(1, $results);
+ $this->assertSame(3, $results[0]->id);
+ }
+
+ public function testFailedJobsAreDeletedByClass()
+ {
+ $class = LogToDebugJob::class;
+ $this->exec("queue purge_failed --class {$class} -f");
+
+ $this->assertOutputContains('Deleting 2 jobs.');
+ $this->assertOutputContains('2 jobs deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray();
+
+ $this->assertCount(1, $results);
+ $this->assertSame(2, $results[0]->id);
+ }
+
+ public function testFailedJobsAreDeletedByQueue()
+ {
+ $this->exec('queue purge_failed --queue alternate_queue -f');
+
+ $this->assertOutputContains('Deleting 1 jobs.');
+ $this->assertOutputContains('1 jobs deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray();
+
+ $this->assertCount(2, $results);
+ $this->assertSame(1, $results[0]->id);
+ $this->assertSame(2, $results[1]->id);
+ }
+
+ public function testFailedJobsAreDeletedByConfig()
+ {
+ $this->exec('queue purge_failed --config alternate_config -f');
+
+ $this->assertOutputContains('Deleting 1 jobs.');
+ $this->assertOutputContains('1 jobs deleted.');
+
+ $results = $this->getTableLocator()->get('Cake/Queue.FailedJobs')->find()->toArray();
+
+ $this->assertCount(2, $results);
+ $this->assertSame(1, $results[0]->id);
+ $this->assertSame(2, $results[1]->id);
+ }
+}
diff --git a/tests/TestCase/Command/RequeueCommandTest.php b/tests/TestCase/Command/RequeueCommandTest.php
new file mode 100644
index 0000000..e4bc6a0
--- /dev/null
+++ b/tests/TestCase/Command/RequeueCommandTest.php
@@ -0,0 +1,204 @@
+useCommandRunner();
+
+ Log::setConfig('debug', [
+ 'className' => 'Array',
+ 'levels' => ['notice', 'info', 'debug'],
+ ]);
+
+ $config = [
+ 'queue' => 'default',
+ 'url' => 'file:///' . TMP . DS . uniqid('queue'),
+ 'receiveTimeout' => 100,
+ ];
+ Configure::write('Queue', ['default' => $config]);
+ }
+
+ public function tearDown(): void
+ {
+ parent::tearDown();
+
+ Log::reset();
+
+ QueueManager::drop('default');
+ }
+
+ public function testJobsAreRequeued()
+ {
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+ QueueManager::drop('default');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+ $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $this->exec('queue requeue --queue default', ['y']);
+
+ $this->assertOutputContains('Requeueing 2 jobs.');
+ $this->assertOutputContains('2 jobs requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 1);
+ $this->assertDebugLogContains('MaxAttemptsIsThreeJob is requeueing');
+ }
+
+ public function testJobsAreNotRequeuedIfNotConfirmed()
+ {
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+ QueueManager::drop('default');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+ $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $this->exec('queue requeue --queue default', ['n']);
+
+ $this->assertOutputNotContains('Requeueing');
+ $this->assertOutputNotContains('requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+ $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob is requeueing', 0);
+ }
+
+ public function testJobsAreRequeuedById()
+ {
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+ QueueManager::drop('default');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+ $this->assertDebugLogContainsExactly('MaxAttemptsIsThreeJob was run', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $this->exec('queue requeue 1,2 -f');
+
+ $this->assertOutputContains('Requeueing 2 jobs.');
+ $this->assertOutputContains('2 jobs requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 1);
+ $this->assertDebugLogContains('MaxAttemptsIsThreeJob is requeueing');
+ }
+
+ public function testJobsAreRequeuedByClass()
+ {
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+ QueueManager::drop('default');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $class = LogToDebugJob::class;
+ $this->exec("queue requeue --class {$class} --queue default -f");
+
+ $this->assertOutputContains('Requeueing 1 jobs.');
+ $this->assertOutputContains('1 jobs requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --verbose');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 1);
+ }
+
+ public function testJobsAreRequeuedByQueue()
+ {
+ $config = [
+ 'queue' => 'alternate_queue',
+ 'url' => 'file:///' . TMP . DS . 'queue' . uniqid('queue'),
+ 'receiveTimeout' => 100,
+ ];
+ Configure::write('Queue', ['alternate_config' => $config]);
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose');
+ QueueManager::drop('alternate_config');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $this->exec('queue requeue --queue alternate_queue -f');
+
+ $this->assertOutputContains('Requeueing 1 jobs.');
+ $this->assertOutputContains('1 jobs requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose');
+ QueueManager::drop('alternate_config');
+
+ $this->assertDebugLogContains('Debug job was run');
+ }
+
+ public function testJobsAreRequeuedByConfig()
+ {
+ $config = [
+ 'queue' => 'alternate_queue',
+ 'url' => 'file:///' . TMP . DS . 'queue' . uniqid('queue'),
+ 'receiveTimeout' => 100,
+ ];
+ Configure::write('Queue', ['alternate_config' => $config]);
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose');
+ QueueManager::drop('alternate_config');
+
+ $this->assertDebugLogContainsExactly('Debug job was run', 0);
+
+ $this->cleanupConsoleTrait();
+ $this->useCommandRunner();
+ $this->exec('queue requeue --config alternate_config -f');
+
+ $this->assertOutputContains('Requeueing 1 jobs.');
+ $this->assertOutputContains('1 jobs requeued.');
+
+ $this->exec('queue worker --max-jobs=3 --logger=debug --config alternate_config --verbose');
+ QueueManager::drop('alternate_config');
+
+ $this->assertDebugLogContains('Debug job was run');
+ }
+}
diff --git a/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php b/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php
index 41525d0..14f963f 100644
--- a/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php
+++ b/tests/TestCase/Consumption/LimitAttemptsExtensionTest.php
@@ -3,6 +3,8 @@
namespace Cake\Queue\Test\TestCase\Job;
+use Cake\Event\EventList;
+use Cake\Event\EventManager;
use Cake\Log\Log;
use Cake\Queue\Consumption\LimitAttemptsExtension;
use Cake\Queue\Consumption\LimitConsumedMessagesExtension;
@@ -19,6 +21,13 @@ class LimitAttemptsExtensionTest extends TestCase
{
use DebugLogTrait;
+ public function setUp(): void
+ {
+ parent::setUp();
+
+ EventManager::instance()->setEventList(new EventList());
+ }
+
/**
* @beforeClass
* @after
@@ -41,6 +50,17 @@ public function testMessageShouldBeRequeuedIfMaxAttemptsIsNotSet()
$this->assertGreaterThanOrEqual(10, $count);
}
+ public function testFailedEventIsFiredWhenMaxAttemptsIsExceeded()
+ {
+ $consume = $this->setupQeueue();
+
+ QueueManager::push(MaxAttemptsIsThreeJob::class, []);
+
+ $consume();
+
+ $this->assertEventFired('Consumption.LimitAttemptsExtension.failed');
+ }
+
public function testMessageShouldBeRequeuedUntilMaxAttemptsIsReached()
{
$consume = $this->setupQeueue();
diff --git a/tests/TestCase/DebugLogTrait.php b/tests/TestCase/DebugLogTrait.php
index d479f9e..e50006d 100644
--- a/tests/TestCase/DebugLogTrait.php
+++ b/tests/TestCase/DebugLogTrait.php
@@ -7,7 +7,7 @@
trait DebugLogTrait
{
- protected function assertDebugLogContains($expected, $times = null): void
+ protected function assertDebugLogContains($expected): void
{
$found = $this->debugLogCount($expected);
diff --git a/tests/TestCase/Listener/FailedJobsListenerTest.php b/tests/TestCase/Listener/FailedJobsListenerTest.php
new file mode 100644
index 0000000..182ad35
--- /dev/null
+++ b/tests/TestCase/Listener/FailedJobsListenerTest.php
@@ -0,0 +1,95 @@
+ 'null:',
+ 'storeFailedJobs' => true,
+ ]);
+ }
+
+ public function tearDown(): void
+ {
+ parent::tearDown();
+
+ QueueManager::drop('example_config');
+ }
+
+ public function testFailedJobIsAddedWhenEventIsFired()
+ {
+ $parsedBody = [
+ 'class' => [LogToDebugJob::class, 'execute'],
+ 'data' => ['example_key' => 'example_value'],
+ 'requeueOptions' => [
+ 'config' => 'example_config',
+ 'priority' => 'example_priority',
+ 'queue' => 'example_queue',
+ ],
+ ];
+ $messageBody = json_encode($parsedBody);
+ $connectionFactory = new NullConnectionFactory();
+
+ $context = $connectionFactory->createContext();
+ $originalMessage = new NullMessage($messageBody);
+ $message = new Message($originalMessage, $context);
+
+ $event = new Event(
+ 'Consumption.LimitAttemptsExtension.failed',
+ $message,
+ ['exception' => 'some message']
+ );
+
+ /** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
+ $failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');
+ $failedJobsTable->deleteAll(['1=1']);
+
+ EventManager::instance()->on(new FailedJobsListener());
+ EventManager::instance()->dispatch($event);
+
+ $this->assertSame(1, $failedJobsTable->find()->count());
+
+ $failedJob = $failedJobsTable->find()->first();
+
+ $this->assertSame(LogToDebugJob::class, $failedJob->class);
+ $this->assertSame('execute', $failedJob->method);
+ $this->assertSame(json_encode(['example_key' => 'example_value']), $failedJob->data);
+ $this->assertSame('example_config', $failedJob->config);
+ $this->assertSame('example_priority', $failedJob->priority);
+ $this->assertSame('example_queue', $failedJob->queue);
+ $this->assertStringContainsString('some message', $failedJob->exception);
+ }
+}
diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php
index e586157..7f76af8 100644
--- a/tests/TestCase/Queue/ProcessorTest.php
+++ b/tests/TestCase/Queue/ProcessorTest.php
@@ -153,7 +153,7 @@ public function testProcessWillRequeueOnException()
$processor->getEventManager()->setEventList($events);
$result = $processor->process($queueMessage, $context);
- $this->assertSame(InteropProcessor::REQUEUE, $result);
+ $this->assertEquals(InteropProcessor::REQUEUE, $result);
}
/**
diff --git a/tests/bootstrap.php b/tests/bootstrap.php
index 4eae28d..66530a9 100644
--- a/tests/bootstrap.php
+++ b/tests/bootstrap.php
@@ -15,6 +15,7 @@
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
+use Cake\Cache\Cache;
use Cake\Core\Configure;
use Cake\Routing\Router;
@@ -45,6 +46,25 @@
define('CONFIG', ROOT . DS . 'config' . DS);
}
+// phpcs:disable
+@mkdir(CACHE);
+@mkdir(CACHE . 'views');
+@mkdir(CACHE . 'models');
+// phpcs:enable
+
+Cache::setConfig([
+ '_cake_core_' => [
+ 'engine' => 'File',
+ 'prefix' => 'cake_core_',
+ 'serialize' => true,
+ ],
+ '_cake_model_' => [
+ 'engine' => 'File',
+ 'prefix' => 'cake_model_',
+ 'serialize' => true,
+ ],
+]);
+
Configure::write('debug', true);
Configure::write('App', [
'namespace' => 'TestApp',
diff --git a/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php b/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php
index 7aa60fc..030dea8 100644
--- a/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php
+++ b/tests/test_app/src/Job/MaxAttemptsIsThreeJob.php
@@ -8,6 +8,7 @@
use Cake\Queue\Job\JobInterface;
use Cake\Queue\Job\Message;
use Interop\Queue\Processor;
+use RuntimeException;
class MaxAttemptsIsThreeJob implements JobInterface
{
@@ -23,7 +24,7 @@ public function execute(Message $message): ?string
if (!$succeedAt || $succeedAt > $attemptNumber) {
Log::debug('MaxAttemptsIsThreeJob is requeueing');
- return Processor::REQUEUE;
+ throw new RuntimeException('example MaxAttemptsIsThreeJob exception message');
}
Log::debug('MaxAttemptsIsThreeJob was run');