From 67680d6f684939a93de61fdff14e51c209ff4321 Mon Sep 17 00:00:00 2001 From: Vasily Rodin Date: Fri, 18 Oct 2024 14:44:14 +0200 Subject: [PATCH 1/6] Introducing debug mode for queue:task:start console command --- .../Queue/Business/QueueBusinessFactory.php | 17 ++- .../Zed/Queue/Business/QueueFacade.php | 18 +++ .../Queue/Business/QueueFacadeInterface.php | 15 +++ .../Queue/Business/Task/TaskDebugHelper.php | 110 ++++++++++++++++++ .../Task/TaskDebugHelperInterface.php | 25 ++++ .../Zed/Queue/Business/Task/TaskManager.php | 14 ++- .../Console/QueueTaskConsole.php | 2 +- 7 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php create mode 100644 src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php diff --git a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php index 2e46bd031e..16eb58026e 100644 --- a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php +++ b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php @@ -17,6 +17,8 @@ use Spryker\Zed\Queue\Business\Reader\QueueConfigReaderInterface; use Spryker\Zed\Queue\Business\SignalHandler\QueueWorkerSignalDispatcher; use Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface; +use Spryker\Zed\Queue\Business\Task\TaskDebugHelper; +use Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface; use Spryker\Zed\Queue\Business\Task\TaskManager; use Spryker\Zed\Queue\Business\Worker\Worker; use Spryker\Zed\Queue\Business\Worker\WorkerProgressBar; @@ -36,15 +38,18 @@ class QueueBusinessFactory extends AbstractBusinessFactory protected static $serverUniqueId; /** + * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * * @return \Spryker\Zed\Queue\Business\Task\TaskManager */ - public function createTask() + public function createTask(?OutputInterface $output = null) { return new TaskManager( $this->getQueueClient(), $this->getConfig(), $this->createTaskMemoryUsageChecker(), $this->getProcessorMessagePlugins(), + $this->createTaskDebugHelper($output), ); } @@ -78,6 +83,16 @@ public function createProcessManager() ); } + /** + * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * + * @return \Pyz\Zed\Queue\Business\Task\TaskDebugHelperInterface + */ + public function createTaskDebugHelper(?OutputInterface $output = null): TaskDebugHelperInterface + { + return new TaskDebugHelper($output); + } + /** * @param \Symfony\Component\Console\Output\OutputInterface $output * diff --git a/src/Spryker/Zed/Queue/Business/QueueFacade.php b/src/Spryker/Zed/Queue/Business/QueueFacade.php index 1baeecf3b9..70c4593d51 100644 --- a/src/Spryker/Zed/Queue/Business/QueueFacade.php +++ b/src/Spryker/Zed/Queue/Business/QueueFacade.php @@ -35,6 +35,24 @@ public function startTask($queueName, array $options = []) ->run($queueName, $options); } + /** + * {@inheritDoc} + * + * @api + * + * @param string $queueName + * @param \Symfony\Component\Console\Output\OutputInterface $output + * @param array $options + * + * @return void + */ + public function startTaskWithOutput($queueName, OutputInterface $output, array $options = []) + { + $this->getFactory() + ->createTask($output) + ->run($queueName, $options); + } + /** * {@inheritDoc} * diff --git a/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php b/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php index be276ea7ac..d787334360 100644 --- a/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php +++ b/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php @@ -28,6 +28,21 @@ interface QueueFacadeInterface */ public function startTask($queueName, array $options = []); + /** + * Specification: + * - Starts receiving and processing messages task for one specific queue. + * - Outputs the result to the console if debugging mode is activated + * + * @api + * + * @param string $queueName + * @param \Symfony\Component\Console\Output\OutputInterface $output + * @param array $options + * + * @return void + */ + public function startTaskWithOutput($queueName, OutputInterface $output, array $options = []); + /** * Specification: * - Starts receiving and processing messages task for one specific queue. diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php new file mode 100644 index 0000000000..68a76e5c85 --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php @@ -0,0 +1,110 @@ +output = $output; + } + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * + * @return void + */ + public function startMessages(array $messages): void + { + if (!$this->output) { + return; + } + + if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->startTime = microtime(true); + + $this->output->writeln('Start processing messages'); + $this->output->writeln('Messages: ' . count($messages)); + foreach ($messages as $i => $message) { + $this->output->writeln(''); + $this->output->writeln('Message #' . $i); + $this->output->writeln($this->prettifyJson($message->getQueueMessage()->getBody())); + } + } + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * + * @return void + */ + public function finishMessages(array $messages): void + { + if (!$this->output) { + return; + } + + if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->output->writeln(''); + $this->output->writeln('Finish processing messages'); + $this->output->writeln('Processed messages: ' . count($messages)); + $this->output->writeln('Processing time: ' . (microtime(true) - $this->startTime) . 's'); + + foreach ($messages as $i => $message) { + if ($message->getHasError()) { + $this->output->writeln(''); + $this->output->writeln('Error in message #' . $i); + + $messageBody = json_decode($message->getQueueMessage()->getBody(), true); + + $this->output->writeln('Error message: ' . $messageBody['errorMessage'] ?? 'unknown'); + } + } + } + + /** + * @param string $json + * + * @throws \Exception + * + * @return string + */ + private function prettifyJson(string $json): string + { + $decodedJson = json_decode($json, false); + + if (json_last_error()) { + throw new Exception( + 'Cannot prettify invalid json', + ); + } + + return json_encode($decodedJson, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); + } +} diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php new file mode 100644 index 0000000000..701ad5ef1d --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php @@ -0,0 +1,25 @@ + $messages + * + * @return void + */ + public function startMessages(array $messages): void; + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * + * @return void + */ + public function finishMessages(array $messages): void; +} diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php index 8697d8774a..f1a6c45a51 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php @@ -37,22 +37,30 @@ class TaskManager implements TaskManagerInterface */ protected $messageProcessorPlugins; + /** + * @var \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface + */ + protected TaskDebugHelperInterface $taskDebugHelper; + /** * @param \Spryker\Client\Queue\QueueClientInterface $client * @param \Spryker\Zed\Queue\QueueConfig $queueConfig * @param \Spryker\Zed\Queue\Business\Checker\TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker * @param array<\Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface> $messageProcessorPlugins + * @param \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface $taskDebugHelper */ public function __construct( QueueClientInterface $client, QueueConfig $queueConfig, TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker, - array $messageProcessorPlugins + array $messageProcessorPlugins, + TaskDebugHelperInterface $taskDebugHelper ) { $this->client = $client; $this->queueConfig = $queueConfig; $this->taskMemoryUsageChecker = $taskMemoryUsageChecker; $this->messageProcessorPlugins = $messageProcessorPlugins; + $this->taskDebugHelper = $taskDebugHelper; } /** @@ -81,8 +89,12 @@ public function run($queueName, array $options = []): QueueTaskResponseTransfer $this->taskMemoryUsageChecker->check($queueName, $messages, $chunkSize); $queueTaskResponseTransfer->setReceivedMessageCount(count($messages)); + $this->taskDebugHelper->startMessages($messages); + $processedMessages = $processorPlugin->processMessages($messages); + $this->taskDebugHelper->finishMessages($messages); + if (!$processedMessages) { $queueTaskResponseTransfer->setMessage(sprintf( 'No messages processed from the queue "%s". Wether there is nothing to process or something failed while processing.', diff --git a/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php b/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php index b70d6ba66a..79a78fbc12 100644 --- a/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php +++ b/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php @@ -68,7 +68,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** @var string $name */ $name = $input->getArgument('queue'); - $this->getFacade()->startTask($name, $options); + $this->getFacade()->startTaskWithOutput($name, $output, $options); return static::CODE_SUCCESS; } From 7269185abc43995a76501249846fd4bac0f6b59d Mon Sep 17 00:00:00 2001 From: Evgeny Nekhamkin Date: Fri, 18 Oct 2024 20:36:35 +0200 Subject: [PATCH 2/6] Introducing debug mode for queue:worker:start console command --- .../Queue/Business/QueueBusinessFactory.php | 15 +++++- .../Queue/Business/Task/TaskDebugHelper.php | 10 ++-- .../Task/TaskDebugHelperInterface.php | 6 ++- .../Zed/Queue/Business/Task/TaskManager.php | 4 +- .../Zed/Queue/Business/Worker/Worker.php | 25 +++++++++ .../Business/Worker/WorkerDebugHelper.php | 53 +++++++++++++++++++ .../Worker/WorkerDebugHelperInterface.php | 27 ++++++++++ .../Business/Worker/WorkerProgressBar.php | 5 ++ .../Console/QueueWorkerConsole.php | 3 +- 9 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php create mode 100644 src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php diff --git a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php index 16eb58026e..a20e852811 100644 --- a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php +++ b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php @@ -21,6 +21,8 @@ use Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface; use Spryker\Zed\Queue\Business\Task\TaskManager; use Spryker\Zed\Queue\Business\Worker\Worker; +use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelper; +use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface; use Spryker\Zed\Queue\Business\Worker\WorkerProgressBar; use Spryker\Zed\Queue\Dependency\Service\QueueToUtilEncodingServiceInterface; use Spryker\Zed\Queue\QueueDependencyProvider; @@ -64,6 +66,7 @@ public function createWorker(OutputInterface $output) $this->createProcessManager(), $this->getConfig(), $this->createWorkerProgressbar($output), + $this->createWorkerDebugHelper($output), $this->getQueueClient(), $this->getQueueNames(), $this->createQueueWorkerSignalDispatcher(), @@ -86,13 +89,23 @@ public function createProcessManager() /** * @param \Symfony\Component\Console\Output\OutputInterface|null $output * - * @return \Pyz\Zed\Queue\Business\Task\TaskDebugHelperInterface + * @return \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface */ public function createTaskDebugHelper(?OutputInterface $output = null): TaskDebugHelperInterface { return new TaskDebugHelper($output); } + /** + * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * + * @return \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface + */ + public function createWorkerDebugHelper(OutputInterface $output): WorkerDebugHelperInterface + { + return new WorkerDebugHelper($output); + } + /** * @param \Symfony\Component\Console\Output\OutputInterface $output * diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php index 68a76e5c85..b6288f18ff 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php @@ -32,10 +32,11 @@ public function __construct(?OutputInterface $output = null) /** * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName * * @return void */ - public function startMessages(array $messages): void + public function startMessages(array $messages, string $queueName): void { if (!$this->output) { return; @@ -47,7 +48,7 @@ public function startMessages(array $messages): void $this->startTime = microtime(true); - $this->output->writeln('Start processing messages'); + $this->output->writeln('Start processing messages for queue "' . $queueName . '"'); $this->output->writeln('Messages: ' . count($messages)); foreach ($messages as $i => $message) { $this->output->writeln(''); @@ -58,10 +59,11 @@ public function startMessages(array $messages): void /** * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName * * @return void */ - public function finishMessages(array $messages): void + public function finishMessages(array $messages, string $queueName): void { if (!$this->output) { return; @@ -73,7 +75,7 @@ public function finishMessages(array $messages): void $this->output->writeln(''); $this->output->writeln('Finish processing messages'); - $this->output->writeln('Processed messages: ' . count($messages)); + $this->output->writeln('Finish processing messages for queue "' . $queueName . '"'); $this->output->writeln('Processing time: ' . (microtime(true) - $this->startTime) . 's'); foreach ($messages as $i => $message) { diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php index 701ad5ef1d..3998b5ab5d 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php @@ -11,15 +11,17 @@ interface TaskDebugHelperInterface { /** * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName * * @return void */ - public function startMessages(array $messages): void; + public function startMessages(array $messages, string $queueName): void; /** * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName * * @return void */ - public function finishMessages(array $messages): void; + public function finishMessages(array $messages, string $queueName): void; } diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php index f1a6c45a51..9f74285b8c 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php @@ -89,11 +89,11 @@ public function run($queueName, array $options = []): QueueTaskResponseTransfer $this->taskMemoryUsageChecker->check($queueName, $messages, $chunkSize); $queueTaskResponseTransfer->setReceivedMessageCount(count($messages)); - $this->taskDebugHelper->startMessages($messages); + $this->taskDebugHelper->startMessages($messages, $queueName); $processedMessages = $processorPlugin->processMessages($messages); - $this->taskDebugHelper->finishMessages($messages); + $this->taskDebugHelper->finishMessages($messages, $queueName); if (!$processedMessages) { $queueTaskResponseTransfer->setMessage(sprintf( diff --git a/src/Spryker/Zed/Queue/Business/Worker/Worker.php b/src/Spryker/Zed/Queue/Business/Worker/Worker.php index 6a7a10a00c..96921b5572 100644 --- a/src/Spryker/Zed/Queue/Business/Worker/Worker.php +++ b/src/Spryker/Zed/Queue/Business/Worker/Worker.php @@ -64,6 +64,11 @@ class Worker implements WorkerInterface */ protected $workerProgressBar; + /** + * @var \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface + */ + protected $workerDebugHelper; + /** * @var \Spryker\Client\Queue\QueueClientInterface */ @@ -93,6 +98,7 @@ class Worker implements WorkerInterface * @param \Spryker\Zed\Queue\Business\Process\ProcessManagerInterface $processManager * @param \Spryker\Zed\Queue\QueueConfig $queueConfig * @param \Spryker\Zed\Queue\Business\Worker\WorkerProgressBarInterface $workerProgressBar + * @param \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface $workerDebugHelper * @param \Spryker\Client\Queue\QueueClientInterface $queueClient * @param array $queueNames * @param \Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface $signalDispatcher @@ -103,6 +109,7 @@ public function __construct( ProcessManagerInterface $processManager, QueueConfig $queueConfig, WorkerProgressBarInterface $workerProgressBar, + WorkerDebugHelperInterface $workerDebugHelper, QueueClientInterface $queueClient, array $queueNames, SignalDispatcherInterface $signalDispatcher, @@ -111,6 +118,7 @@ public function __construct( ) { $this->processManager = $processManager; $this->workerProgressBar = $workerProgressBar; + $this->workerDebugHelper = $workerDebugHelper; $this->queueConfig = $queueConfig; $this->queueClient = $queueClient; $this->queueNames = $queueNames; @@ -141,6 +149,7 @@ public function start(string $command, array $options = [], int $round = 1, arra while ($this->continueExecution($totalPassedSeconds, $maxThreshold, $options)) { $processes = array_merge($this->executeOperation($command), $processes); + $this->writeWorkerDebugOutput($processes); $pendingProcesses = $this->getPendingProcesses($processes); if ($this->isEmptyQueue($pendingProcesses, $options)) { @@ -161,6 +170,21 @@ public function start(string $command, array $options = [], int $round = 1, arra $this->waitForPendingProcesses($pendingProcesses, $command, $round, $delayIntervalMilliseconds, $options); } + /** + * @param array $processes + * + * @return void + */ + protected function writeWorkerDebugOutput(array $processes): void + { + foreach ($processes as $process) { + // TODO remove died processes from the loop + if ($process->isTerminated()) { + $this->workerDebugHelper->writeOutput($process->getIncrementalOutput()); + } + } + } + /** * @param int $totalPassedSeconds * @param int $maxThreshold @@ -298,6 +322,7 @@ protected function startProcesses(string $command, string $queue): array $this->queueClient->reject($message); for ($i = 0; $i < $numberOfWorkers; $i++) { usleep((int)$this->queueConfig->getQueueProcessTriggerInterval()); + $this->workerDebugHelper->writeQueueProcessStarted($queue); $processes[] = $this->processManager->triggerQueueProcess($command, $queue); } } else { diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php new file mode 100644 index 0000000000..e1726c9c52 --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php @@ -0,0 +1,53 @@ +output->getVerbosity() !== OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->output->writeln(sprintf('Start processing queue "%s"', $queue)); + $this->output->writeln(''); + } + + /** + * @param string $output + * + * @return void + */ + public function writeOutput(string $output): void + { + if ($this->output->getVerbosity() !== OutputInterface::VERBOSITY_DEBUG) { + return; + } + + if (empty($output)) { + return; + } + + $this->output->writeln($output); + } +} diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php new file mode 100644 index 0000000000..0d6dd5c518 --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php @@ -0,0 +1,27 @@ +output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) { + return; + } + $this->progressBar = $this->createProgressBar($steps); $this->progressBar->setFormatDefinition('queue', '%message% %current%/%max% sec [%bar%] %percent:3s%%'); $this->progressBar->setFormat('queue'); diff --git a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php index b1e2d6ec09..e8431aad20 100644 --- a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php +++ b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php @@ -40,7 +40,8 @@ class QueueWorkerConsole extends Console */ public const OPTION_STOP_WHEN_EMPTY_SHORT = 's'; - public const QUEUE_RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console queue:task:start'; + // TODO provide dynamic logic to append -vvv only if necessary + public const QUEUE_RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console queue:task:start -vvv'; /** * @return void From 5b42d15ff3159bf46f08185ffad7c50be935a4fb Mon Sep 17 00:00:00 2001 From: Evgeny Nekhamkin Date: Sat, 19 Oct 2024 19:35:00 +0200 Subject: [PATCH 3/6] Add verbose debug mode to queue runner command only if queue worker console was started in debug mode --- .../Console/QueueWorkerConsole.php | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php index e8431aad20..809b74b03e 100644 --- a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php +++ b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php @@ -40,8 +40,9 @@ class QueueWorkerConsole extends Console */ public const OPTION_STOP_WHEN_EMPTY_SHORT = 's'; - // TODO provide dynamic logic to append -vvv only if necessary - public const QUEUE_RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console queue:task:start -vvv'; + public const QUEUE_RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console queue:task:start'; + + public const VERBOSITY_DEBUG_MODE = '-vvv'; /** * @return void @@ -68,8 +69,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int QueueConfig::CONFIG_WORKER_STOP_WHEN_EMPTY => $input->getOption(static::OPTION_STOP_WHEN_EMPTY), ]; - $this->getFacade()->startWorker(static::QUEUE_RUNNER_COMMAND, $output, $options); + $this->getFacade()->startWorker($this->getQueueRunnerCommand(), $output, $options); return static::CODE_SUCCESS; } + + /** + * @return string + */ + protected function getQueueRunnerCommand(): string + { + if ($this->output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) { + return static::QUEUE_RUNNER_COMMAND . ' ' . static::VERBOSITY_DEBUG_MODE; + } + + return static::QUEUE_RUNNER_COMMAND; + } } From a57c78229c3581eb392f8a9195d27d656478283b Mon Sep 17 00:00:00 2001 From: Evgeny Nekhamkin Date: Mon, 21 Oct 2024 10:46:17 +0200 Subject: [PATCH 4/6] Introducing logProcessTermination method for WorkerDebugHelper --- .../Queue/Business/Task/TaskDebugHelper.php | 1 - .../Zed/Queue/Business/Worker/Worker.php | 18 ++++++++++++------ .../Business/Worker/WorkerDebugHelper.php | 8 +++++--- .../Worker/WorkerDebugHelperInterface.php | 6 ++++-- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php index b6288f18ff..5c6b62ed1a 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php @@ -74,7 +74,6 @@ public function finishMessages(array $messages, string $queueName): void } $this->output->writeln(''); - $this->output->writeln('Finish processing messages'); $this->output->writeln('Finish processing messages for queue "' . $queueName . '"'); $this->output->writeln('Processing time: ' . (microtime(true) - $this->startTime) . 's'); diff --git a/src/Spryker/Zed/Queue/Business/Worker/Worker.php b/src/Spryker/Zed/Queue/Business/Worker/Worker.php index 96921b5572..e2fb6f8788 100644 --- a/src/Spryker/Zed/Queue/Business/Worker/Worker.php +++ b/src/Spryker/Zed/Queue/Business/Worker/Worker.php @@ -149,7 +149,7 @@ public function start(string $command, array $options = [], int $round = 1, arra while ($this->continueExecution($totalPassedSeconds, $maxThreshold, $options)) { $processes = array_merge($this->executeOperation($command), $processes); - $this->writeWorkerDebugOutput($processes); + $processes = $this->removeTerminatedProcesses($processes); $pendingProcesses = $this->getPendingProcesses($processes); if ($this->isEmptyQueue($pendingProcesses, $options)) { @@ -171,20 +171,26 @@ public function start(string $command, array $options = [], int $round = 1, arra } /** - * @param array $processes + * @param array<\Symfony\Component\Process\Process> $processes * - * @return void + * @return array */ - protected function writeWorkerDebugOutput(array $processes): void + protected function removeTerminatedProcesses(array $processes): array { + $notTerminatedProcesses = []; + foreach ($processes as $process) { - // TODO remove died processes from the loop if ($process->isTerminated()) { - $this->workerDebugHelper->writeOutput($process->getIncrementalOutput()); + $this->workerDebugHelper->logProcessTermination($process); + } else { + $notTerminatedProcesses[] = $process; } } + + return $notTerminatedProcesses; } + /** * @param int $totalPassedSeconds * @param int $maxThreshold diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php index e1726c9c52..366c61fc91 100644 --- a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php @@ -8,6 +8,7 @@ namespace Spryker\Zed\Queue\Business\Worker; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Process\Process; class WorkerDebugHelper implements WorkerDebugHelperInterface { @@ -34,16 +35,17 @@ public function writeQueueProcessStarted(string $queue): void } /** - * @param string $output - * + * @param Process $process + * * @return void */ - public function writeOutput(string $output): void + public function logProcessTermination(Process $process): void { if ($this->output->getVerbosity() !== OutputInterface::VERBOSITY_DEBUG) { return; } + $output = $process->getIncrementalOutput(); if (empty($output)) { return; } diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php index 0d6dd5c518..842afe9d78 100644 --- a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php @@ -8,6 +8,7 @@ namespace Spryker\Zed\Queue\Business\Worker; use Generated\Shared\Transfer\QueueReceiveMessageTransfer; +use Symfony\Component\Process\Process; interface WorkerDebugHelperInterface { @@ -19,9 +20,10 @@ interface WorkerDebugHelperInterface public function writeQueueProcessStarted(string $queue): void; /** - * @param string $output + * @param Process $process * * @return void */ - public function writeOutput(string $output): void; + public function logProcessTermination(Process $process): void; + } From 1a8215f412cc5a5ab68a2a0ebc7bc7609c943f13 Mon Sep 17 00:00:00 2001 From: Evgeny Nekhamkin Date: Mon, 21 Oct 2024 11:02:08 +0200 Subject: [PATCH 5/6] Fix comment for QueueBusinessFactory --- src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php index a20e852811..8b8afc35a2 100644 --- a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php +++ b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php @@ -97,7 +97,7 @@ public function createTaskDebugHelper(?OutputInterface $output = null): TaskDebu } /** - * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * @param \Symfony\Component\Console\Output\OutputInterface $output * * @return \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface */ From 1cb42b495a458edba25ee3bb48eca4afce397604 Mon Sep 17 00:00:00 2001 From: Evgeny Nekhamkin Date: Mon, 21 Oct 2024 11:26:58 +0200 Subject: [PATCH 6/6] Add contribution-license-agreement.txt --- contribution-license-agreement.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 contribution-license-agreement.txt diff --git a/contribution-license-agreement.txt b/contribution-license-agreement.txt new file mode 100644 index 0000000000..636cf9cb25 --- /dev/null +++ b/contribution-license-agreement.txt @@ -0,0 +1 @@ +I hereby agree to Spryker\'s Contribution License Agreement in https://github.com/spryker/queue/blob/9f04d3171cc1db0e12132e4d4c4dc162a67bc363/CONTRIBUTING.md. \ No newline at end of file