Skip to content

Commit

Permalink
Add checker for check requirements before run consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhukV committed Feb 22, 2024
1 parent 1419f9c commit f5304f8
Show file tree
Hide file tree
Showing 15 changed files with 503 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

Next release
------------

* Add run checker (check before run consumer).

v2.0.3
------

Expand Down
1 change: 0 additions & 1 deletion src/Adapter/Amqp/Queue/AmqpQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public function get(): ?ReceivedMessage
$envelope = $this->queue->get();

if ($envelope) {
// @phpstan-ignore-next-line
return new AmqpReceivedMessage($this->queue, $envelope);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Command/OutputEventHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function __invoke(Event $event): void
'<error>Receive consumer timeout exceed error.</error>',
OutputInterface::VERBOSITY_VERBOSE
);
} else if (Event::StopAfterNExecutes === $event) {
} elseif (Event::StopAfterNExecutes === $event) {
$this->output->writeln(
'<error>Stop consumer after N executes.</error>',
OutputInterface::VERBOSITY_VERBOSE
Expand Down
40 changes: 35 additions & 5 deletions src/Command/RunConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

namespace FiveLab\Component\Amqp\Command;

use FiveLab\Component\Amqp\Consumer\Checker\RunConsumerCheckerRegistry;
use FiveLab\Component\Amqp\Consumer\Checker\RunConsumerCheckerRegistryInterface;
use FiveLab\Component\Amqp\Consumer\ConsumerInterface;
use FiveLab\Component\Amqp\Consumer\EventableConsumerInterface;
use FiveLab\Component\Amqp\Consumer\Middleware\StopAfterNExecutesMiddleware;
use FiveLab\Component\Amqp\Consumer\MiddlewareAwareInterface;
use FiveLab\Component\Amqp\Consumer\Registry\ConsumerRegistryInterface;
use FiveLab\Component\Amqp\Exception\ConsumerTimeoutExceedException;
use FiveLab\Component\Amqp\Exception\RunConsumerCheckerNotFoundException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
Expand All @@ -42,14 +45,24 @@ class RunConsumerCommand extends Command
*/
protected static $defaultDescription = 'Run consumer.';

/**
* @var RunConsumerCheckerRegistryInterface
*/
private readonly RunConsumerCheckerRegistryInterface $runCheckerRegistry;

/**
* Constructor.
*
* @param ConsumerRegistryInterface $consumerRegistry
* @param ConsumerRegistryInterface $consumerRegistry
* @param RunConsumerCheckerRegistryInterface|null $runCheckerRegistry
*/
public function __construct(private readonly ConsumerRegistryInterface $consumerRegistry)
{
public function __construct(
private readonly ConsumerRegistryInterface $consumerRegistry,
RunConsumerCheckerRegistryInterface $runCheckerRegistry = null
) {
parent::__construct();

$this->runCheckerRegistry = $runCheckerRegistry ?: new RunConsumerCheckerRegistry();
}

/**
Expand All @@ -62,15 +75,32 @@ protected function configure(): void
->addArgument('key', InputArgument::REQUIRED, 'The key of consumer.')
->addOption('read-timeout', null, InputOption::VALUE_REQUIRED, 'Set the read timeout for RabbitMQ.')
->addOption('loop', null, InputOption::VALUE_NONE, 'Loop consume (used only with read-timeout).')
->addOption('messages', null, InputOption::VALUE_REQUIRED, 'After process number of messages process be normal exits.');
->addOption('messages', null, InputOption::VALUE_REQUIRED, 'After process number of messages process be normal exits.')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Check if consumer can be run.');
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$consumer = $this->consumerRegistry->get((string) $input->getArgument('key'));
$consumerKey = (string) $input->getArgument('key');

if ($input->getOption('dry-run')) {
$checker = $this->runCheckerRegistry->get($consumerKey);
$checker->checkBeforeRun();

return 0;
}

try {
$checker = $this->runCheckerRegistry->get($consumerKey);
$checker->checkBeforeRun();
} catch (RunConsumerCheckerNotFoundException) {
// Normal flow. Checker not found.
}

$consumer = $this->consumerRegistry->get($consumerKey);

if ($consumer instanceof EventableConsumerInterface) {
$closure = (new OutputEventHandler($output))(...);
Expand Down
47 changes: 47 additions & 0 deletions src/Consumer/Checker/ContainerRunConsumerCheckerRegistry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Consumer\Checker;

use FiveLab\Component\Amqp\Exception\RunConsumerCheckerNotFoundException;
use Psr\Container\ContainerInterface;

/**
* Registry based on PSR container.
*/
readonly class ContainerRunConsumerCheckerRegistry implements RunConsumerCheckerRegistryInterface
{
/**
* Constructor.
*
* @param ContainerInterface $container
*/
public function __construct(private ContainerInterface $container)
{
}

/**
* {@inheritdoc}
*/
public function get(string $key): RunConsumerCheckerInterface
{
if ($this->container->has($key)) {
return $this->container->get($key);
}

throw new RunConsumerCheckerNotFoundException(\sprintf(
'The checker for consumer "%s" was not found.',
$key
));
}
}
29 changes: 29 additions & 0 deletions src/Consumer/Checker/RunConsumerCheckerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Consumer\Checker;

use FiveLab\Component\Amqp\Exception\CannotRunConsumerException;

/**
* All consumer checkers should implement this interface.
*/
interface RunConsumerCheckerInterface
{
/**
* Call to this method before run consumer.
*
* @throws CannotRunConsumerException
*/
public function checkBeforeRun(): void;
}
49 changes: 49 additions & 0 deletions src/Consumer/Checker/RunConsumerCheckerRegistry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Consumer\Checker;

use FiveLab\Component\Amqp\Exception\RunConsumerCheckerNotFoundException;

/**
* Simple registry for run consumer checker
*/
class RunConsumerCheckerRegistry implements RunConsumerCheckerRegistryInterface
{
/**
* @var array<string, RunConsumerCheckerInterface>
*/
private array $checkers = [];

/**
* Add checker for consumer to registry
*
* @param string $key
* @param RunConsumerCheckerInterface $checker
*/
public function add(string $key, RunConsumerCheckerInterface $checker): void
{
$this->checkers[$key] = $checker;
}

/**
* {@inheritdoc}
*/
public function get(string $key): RunConsumerCheckerInterface
{
return $this->checkers[$key] ?? throw new RunConsumerCheckerNotFoundException(\sprintf(
'The checker for consumer "%s" was not found.',
$key
));
}
}
33 changes: 33 additions & 0 deletions src/Consumer/Checker/RunConsumerCheckerRegistryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Consumer\Checker;

use FiveLab\Component\Amqp\Exception\RunConsumerCheckerNotFoundException;

/**
* All run consumer checker registries should implement this interface.
*/
interface RunConsumerCheckerRegistryInterface
{
/**
* Get checker for consumer
*
* @param string $key
*
* @return RunConsumerCheckerInterface
*
* @throws RunConsumerCheckerNotFoundException
*/
public function get(string $key): RunConsumerCheckerInterface;
}
21 changes: 21 additions & 0 deletions src/Exception/CannotRunConsumerException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Exception;

/**
* Throw this error if consumer can't be runned.
*/
class CannotRunConsumerException extends \Exception
{
}
18 changes: 18 additions & 0 deletions src/Exception/RunConsumerCheckerNotFoundException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

/*
* This file is part of the FiveLab Amqp package
*
* (c) FiveLab
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code
*/

declare(strict_types = 1);

namespace FiveLab\Component\Amqp\Exception;

class RunConsumerCheckerNotFoundException extends \Exception
{
}
Loading

0 comments on commit f5304f8

Please sign in to comment.