Skip to content

Commit

Permalink
Add failed jobs and requeue/delete commands
Browse files Browse the repository at this point in the history
  • Loading branch information
sheldonreiff committed Nov 4, 2022
1 parent 05cd2e1 commit 2271a55
Show file tree
Hide file tree
Showing 24 changed files with 1,260 additions and 9 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
}
},
"suggest": {
"cakephp/bake": "Required if you want to generate jobs."
"cakephp/bake": "Required if you want to generate jobs.",
"cakephp/migrations": "Needed for running the migrations necessary for using Failed Jobs."
},
"scripts": {
"check": [
Expand Down
58 changes: 58 additions & 0 deletions config/Migrations/20221007202459_CreateFailedJobs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php
declare(strict_types=1);

use Migrations\AbstractMigration;

class CreateFailedJobs extends AbstractMigration
{
/**
* Change Method.
*
* More information on this method is available here:
* https://book.cakephp.org/phinx/0/en/migrations.html#the-change-method
* @return void
*/
public function change()
{
$table = $this->table('queue_failed_jobs');
$table->addColumn('class', 'string', [
'length' => 255,
'null' => false,
'default' => null,
])
->addColumn('method', 'string', [
'length' => 255,
'null' => false,
'default' => null,
])
->addColumn('data', 'text', [
'null' => false,
'default' => null,
])
->addColumn('config', 'string', [
'length' => 255,
'null' => false,
'default' => null,
])
->addColumn('priority', 'string', [
'length' => 255,
'null' => true,
'default' => null,
])
->addColumn('queue', 'string', [
'length' => 255,
'null' => false,
'default' => null,
])
->addColumn('exception', 'text', [
'null' => true,
'default' => null,
])
->addColumn('created', 'datetime', [
'default' => null,
'limit' => null,
'null' => true,
])
->create();
}
}
74 changes: 74 additions & 0 deletions docs/en/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,31 @@ The following configuration should be present in the config array of your **conf

// The amount of time in milliseconds to sleep if no jobs are currently available. default: 10000
'receiveTimeout' => 10000,

// Whether to store failed jobs in the queue_failed_jobs table. default: false
'storeFailedJobs' => true,
]
],
// ...

The ``Queue`` config key can contain one or more queue configurations. Each of
these is used for interacting with a different queuing backend.

If ``storeFailedJobs`` is set to ``true``, make sure to run the plugin migrations to create the ``queue_failed_jobs`` table.

Install the migrations plugin:

.. code-block:: bash
composer require cakephp/migrations:"^3.1"
Run the migrations:

.. code-block:: bash
bin/cake migrations migrate --plugin Cake/Queue
Usage
=====

Expand Down Expand Up @@ -266,6 +284,62 @@ This shell can take a few different options:
- Max Runtime
- Runtime: Time since the worker started, the worker will finish when Runtime is over Max Runtime value

Failed Jobs
===========

By default, jobs that throw an exception are requeued indefinitely. However, if
``maxAttempts`` is configured on the job class or via a command line argument, a
job will be considered failed if a ``Processor::REQUEUE`` response is received
after processing (typically due to an exception being thrown) and there are no
remaining attempts. The job will then be rejected and added to the
``queue_failed_jobs`` table and can be requeued manually.

Your chosen transport may offer a dead-letter queue feature. While Failed Jobs
has a similar purpose, it specifically captures jobs that return a
``Processor::REQUEUE`` response and does not handle other failure cases. It is
agnostic of transport and only supports database persistence.

The following options passed when originally queueing the job will be preserved:
``config``, ``queue``, and ``priority``.

Requeue Failed Jobs
-------------------

Push jobs back onto the queue and remove them from the ``queue_failed_jobs``
table. If a job fails to requeue it is not guaranteed that the job was not run.

.. code-block:: bash
bin/cake queue requeue
Optional filters:

- ``--id``: Requeue job by the ID of the ``FailedJob``
- ``--class``: Requeue jobs by the job class
- ``--queue``: Requeue jobs by the queue the job was received on
- ``--config``: Requeue jobs by the config used to queue the job

If no filters are provided then all failed jobs will be requeued.

Purge Failed Jobs
------------------

Delete jobs from the ``queue_failed_jobs`` table.

.. code-block:: bash
bin/cake queue purge_failed
Optional filters:

- ``--id``: Purge job by the ID of the ``FailedJob``
- ``--class``: Purge jobs by the job class
- ``--queue``: Purge jobs by the queue the job was received on
- ``--config``: Purge jobs by the config used to queue the job

If no filters are provided then all failed jobs will be purged.


Worker Events
=============

Expand Down
7 changes: 7 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,11 @@
<directory>tests/TestCase</directory>
</testsuite>
</testsuites>
<listeners>
<listener class="Cake\TestSuite\Fixture\FixtureInjector">
<arguments>
<object class="Cake\TestSuite\Fixture\FixtureManager"/>
</arguments>
</listener>
</listeners>
</phpunit>
128 changes: 128 additions & 0 deletions src/Command/PurgeFailedCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php
declare(strict_types=1);

/**
* CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
* Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
*
* Licensed under The MIT License
* For full copyright and license information, please see the LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/)
* @link https://cakephp.org CakePHP(tm) Project
* @since 0.1.0
* @license https://opensource.org/licenses/MIT MIT License
*/
namespace Cake\Queue\Command;

use Cake\Command\Command;
use Cake\Console\Arguments;
use Cake\Console\ConsoleIo;
use Cake\Console\ConsoleOptionParser;
use Cake\ORM\Locator\LocatorAwareTrait;

class PurgeFailedCommand extends Command
{
use LocatorAwareTrait;

/**
* Get the command name.
*
* @return string
*/
public static function defaultName(): string
{
return 'queue purge_failed';
}

/**
* Gets the option parser instance and configures it.
*
* @return \Cake\Console\ConsoleOptionParser
*/
public function getOptionParser(): ConsoleOptionParser
{
$parser = parent::getOptionParser();

$parser->setDescription('Delete failed jobs.');

$parser->addArgument('ids', [
'required' => false,
'help' => 'Delete jobs by the FailedJob ID (comma-separated).',
]);
$parser->addOption('class', [
'help' => 'Delete jobs by the job class.',
]);
$parser->addOption('queue', [
'help' => 'Delete jobs by the queue the job was received on.',
]);
$parser->addOption('config', [
'help' => 'Delete jobs by the config used to queue the job.',
]);
$parser->addOption('force', [
'help' => 'Automatically assume yes in response to confirmation prompt.',
'short' => 'f',
'boolean' => true,
]);

return $parser;
}

/**
* @param \Cake\Console\Arguments $args Arguments
* @param \Cake\Console\ConsoleIo $io ConsoleIo
* @return void
*/
public function execute(Arguments $args, ConsoleIo $io)
{
/** @var \Cake\Queue\Model\Table\FailedJobsTable $failedJobsTable */
$failedJobsTable = $this->getTableLocator()->get('Cake/Queue.FailedJobs');

$jobsToDelete = $failedJobsTable->find();

if ($args->hasArgument('ids')) {
$idsArg = $args->getArgument('ids');

if ($idsArg !== null) {
$ids = explode(',', $idsArg);

$jobsToDelete->whereInList($failedJobsTable->aliasField('id'), $ids);
}
}

if ($args->hasOption('class')) {
$jobsToDelete->where(['class' => $args->getOption('class')]);
}

if ($args->hasOption('queue')) {
$jobsToDelete->where(['queue' => $args->getOption('queue')]);
}

if ($args->hasOption('config')) {
$jobsToDelete->where(['config' => $args->getOption('config')]);
}

$deletingCount = $jobsToDelete->count();

if (!$deletingCount) {
$io->out('0 jobs found.');

return;
}

if (!$args->getOption('force')) {
$confirmed = $io->askChoice("Delete {$deletingCount} jobs?", ['y', 'n'], 'n');

if ($confirmed !== 'y') {
return;
}
}

$io->out("Deleting {$deletingCount} jobs.");

$failedJobsTable->deleteManyOrFail($jobsToDelete);

$io->success("{$deletingCount} jobs deleted.");
}
}
Loading

0 comments on commit 2271a55

Please sign in to comment.