diff --git a/contribution-license-agreement.txt b/contribution-license-agreement.txt new file mode 100644 index 0000000000..636cf9cb25 --- /dev/null +++ b/contribution-license-agreement.txt @@ -0,0 +1 @@ +I hereby agree to Spryker\'s Contribution License Agreement in https://github.com/spryker/queue/blob/9f04d3171cc1db0e12132e4d4c4dc162a67bc363/CONTRIBUTING.md. \ No newline at end of file diff --git a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php index 2e46bd031e..8b8afc35a2 100644 --- a/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php +++ b/src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php @@ -17,8 +17,12 @@ use Spryker\Zed\Queue\Business\Reader\QueueConfigReaderInterface; use Spryker\Zed\Queue\Business\SignalHandler\QueueWorkerSignalDispatcher; use Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface; +use Spryker\Zed\Queue\Business\Task\TaskDebugHelper; +use Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface; use Spryker\Zed\Queue\Business\Task\TaskManager; use Spryker\Zed\Queue\Business\Worker\Worker; +use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelper; +use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface; use Spryker\Zed\Queue\Business\Worker\WorkerProgressBar; use Spryker\Zed\Queue\Dependency\Service\QueueToUtilEncodingServiceInterface; use Spryker\Zed\Queue\QueueDependencyProvider; @@ -36,15 +40,18 @@ class QueueBusinessFactory extends AbstractBusinessFactory protected static $serverUniqueId; /** + * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * * @return \Spryker\Zed\Queue\Business\Task\TaskManager */ - public function createTask() + public function createTask(?OutputInterface $output = null) { return new TaskManager( $this->getQueueClient(), $this->getConfig(), $this->createTaskMemoryUsageChecker(), $this->getProcessorMessagePlugins(), + $this->createTaskDebugHelper($output), ); } @@ -59,6 +66,7 @@ public function createWorker(OutputInterface $output) $this->createProcessManager(), $this->getConfig(), $this->createWorkerProgressbar($output), + $this->createWorkerDebugHelper($output), $this->getQueueClient(), $this->getQueueNames(), $this->createQueueWorkerSignalDispatcher(), @@ -78,6 +86,26 @@ public function createProcessManager() ); } + /** + * @param \Symfony\Component\Console\Output\OutputInterface|null $output + * + * @return \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface + */ + public function createTaskDebugHelper(?OutputInterface $output = null): TaskDebugHelperInterface + { + return new TaskDebugHelper($output); + } + + /** + * @param \Symfony\Component\Console\Output\OutputInterface $output + * + * @return \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface + */ + public function createWorkerDebugHelper(OutputInterface $output): WorkerDebugHelperInterface + { + return new WorkerDebugHelper($output); + } + /** * @param \Symfony\Component\Console\Output\OutputInterface $output * diff --git a/src/Spryker/Zed/Queue/Business/QueueFacade.php b/src/Spryker/Zed/Queue/Business/QueueFacade.php index 1baeecf3b9..70c4593d51 100644 --- a/src/Spryker/Zed/Queue/Business/QueueFacade.php +++ b/src/Spryker/Zed/Queue/Business/QueueFacade.php @@ -35,6 +35,24 @@ public function startTask($queueName, array $options = []) ->run($queueName, $options); } + /** + * {@inheritDoc} + * + * @api + * + * @param string $queueName + * @param \Symfony\Component\Console\Output\OutputInterface $output + * @param array $options + * + * @return void + */ + public function startTaskWithOutput($queueName, OutputInterface $output, array $options = []) + { + $this->getFactory() + ->createTask($output) + ->run($queueName, $options); + } + /** * {@inheritDoc} * diff --git a/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php b/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php index be276ea7ac..d787334360 100644 --- a/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php +++ b/src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php @@ -28,6 +28,21 @@ interface QueueFacadeInterface */ public function startTask($queueName, array $options = []); + /** + * Specification: + * - Starts receiving and processing messages task for one specific queue. + * - Outputs the result to the console if debugging mode is activated + * + * @api + * + * @param string $queueName + * @param \Symfony\Component\Console\Output\OutputInterface $output + * @param array $options + * + * @return void + */ + public function startTaskWithOutput($queueName, OutputInterface $output, array $options = []); + /** * Specification: * - Starts receiving and processing messages task for one specific queue. diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php new file mode 100644 index 0000000000..5c6b62ed1a --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php @@ -0,0 +1,111 @@ +output = $output; + } + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName + * + * @return void + */ + public function startMessages(array $messages, string $queueName): void + { + if (!$this->output) { + return; + } + + if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->startTime = microtime(true); + + $this->output->writeln('Start processing messages for queue "' . $queueName . '"'); + $this->output->writeln('Messages: ' . count($messages)); + foreach ($messages as $i => $message) { + $this->output->writeln(''); + $this->output->writeln('Message #' . $i); + $this->output->writeln($this->prettifyJson($message->getQueueMessage()->getBody())); + } + } + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName + * + * @return void + */ + public function finishMessages(array $messages, string $queueName): void + { + if (!$this->output) { + return; + } + + if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->output->writeln(''); + $this->output->writeln('Finish processing messages for queue "' . $queueName . '"'); + $this->output->writeln('Processing time: ' . (microtime(true) - $this->startTime) . 's'); + + foreach ($messages as $i => $message) { + if ($message->getHasError()) { + $this->output->writeln(''); + $this->output->writeln('Error in message #' . $i); + + $messageBody = json_decode($message->getQueueMessage()->getBody(), true); + + $this->output->writeln('Error message: ' . $messageBody['errorMessage'] ?? 'unknown'); + } + } + } + + /** + * @param string $json + * + * @throws \Exception + * + * @return string + */ + private function prettifyJson(string $json): string + { + $decodedJson = json_decode($json, false); + + if (json_last_error()) { + throw new Exception( + 'Cannot prettify invalid json', + ); + } + + return json_encode($decodedJson, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE); + } +} diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php new file mode 100644 index 0000000000..3998b5ab5d --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php @@ -0,0 +1,27 @@ + $messages + * @param string $queueName + * + * @return void + */ + public function startMessages(array $messages, string $queueName): void; + + /** + * @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages + * @param string $queueName + * + * @return void + */ + public function finishMessages(array $messages, string $queueName): void; +} diff --git a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php index 8697d8774a..9f74285b8c 100644 --- a/src/Spryker/Zed/Queue/Business/Task/TaskManager.php +++ b/src/Spryker/Zed/Queue/Business/Task/TaskManager.php @@ -37,22 +37,30 @@ class TaskManager implements TaskManagerInterface */ protected $messageProcessorPlugins; + /** + * @var \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface + */ + protected TaskDebugHelperInterface $taskDebugHelper; + /** * @param \Spryker\Client\Queue\QueueClientInterface $client * @param \Spryker\Zed\Queue\QueueConfig $queueConfig * @param \Spryker\Zed\Queue\Business\Checker\TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker * @param array<\Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface> $messageProcessorPlugins + * @param \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface $taskDebugHelper */ public function __construct( QueueClientInterface $client, QueueConfig $queueConfig, TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker, - array $messageProcessorPlugins + array $messageProcessorPlugins, + TaskDebugHelperInterface $taskDebugHelper ) { $this->client = $client; $this->queueConfig = $queueConfig; $this->taskMemoryUsageChecker = $taskMemoryUsageChecker; $this->messageProcessorPlugins = $messageProcessorPlugins; + $this->taskDebugHelper = $taskDebugHelper; } /** @@ -81,8 +89,12 @@ public function run($queueName, array $options = []): QueueTaskResponseTransfer $this->taskMemoryUsageChecker->check($queueName, $messages, $chunkSize); $queueTaskResponseTransfer->setReceivedMessageCount(count($messages)); + $this->taskDebugHelper->startMessages($messages, $queueName); + $processedMessages = $processorPlugin->processMessages($messages); + $this->taskDebugHelper->finishMessages($messages, $queueName); + if (!$processedMessages) { $queueTaskResponseTransfer->setMessage(sprintf( 'No messages processed from the queue "%s". Wether there is nothing to process or something failed while processing.', diff --git a/src/Spryker/Zed/Queue/Business/Worker/Worker.php b/src/Spryker/Zed/Queue/Business/Worker/Worker.php index 6a7a10a00c..e2fb6f8788 100644 --- a/src/Spryker/Zed/Queue/Business/Worker/Worker.php +++ b/src/Spryker/Zed/Queue/Business/Worker/Worker.php @@ -64,6 +64,11 @@ class Worker implements WorkerInterface */ protected $workerProgressBar; + /** + * @var \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface + */ + protected $workerDebugHelper; + /** * @var \Spryker\Client\Queue\QueueClientInterface */ @@ -93,6 +98,7 @@ class Worker implements WorkerInterface * @param \Spryker\Zed\Queue\Business\Process\ProcessManagerInterface $processManager * @param \Spryker\Zed\Queue\QueueConfig $queueConfig * @param \Spryker\Zed\Queue\Business\Worker\WorkerProgressBarInterface $workerProgressBar + * @param \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface $workerDebugHelper * @param \Spryker\Client\Queue\QueueClientInterface $queueClient * @param array $queueNames * @param \Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface $signalDispatcher @@ -103,6 +109,7 @@ public function __construct( ProcessManagerInterface $processManager, QueueConfig $queueConfig, WorkerProgressBarInterface $workerProgressBar, + WorkerDebugHelperInterface $workerDebugHelper, QueueClientInterface $queueClient, array $queueNames, SignalDispatcherInterface $signalDispatcher, @@ -111,6 +118,7 @@ public function __construct( ) { $this->processManager = $processManager; $this->workerProgressBar = $workerProgressBar; + $this->workerDebugHelper = $workerDebugHelper; $this->queueConfig = $queueConfig; $this->queueClient = $queueClient; $this->queueNames = $queueNames; @@ -141,6 +149,7 @@ public function start(string $command, array $options = [], int $round = 1, arra while ($this->continueExecution($totalPassedSeconds, $maxThreshold, $options)) { $processes = array_merge($this->executeOperation($command), $processes); + $processes = $this->removeTerminatedProcesses($processes); $pendingProcesses = $this->getPendingProcesses($processes); if ($this->isEmptyQueue($pendingProcesses, $options)) { @@ -161,6 +170,27 @@ public function start(string $command, array $options = [], int $round = 1, arra $this->waitForPendingProcesses($pendingProcesses, $command, $round, $delayIntervalMilliseconds, $options); } + /** + * @param array<\Symfony\Component\Process\Process> $processes + * + * @return array + */ + protected function removeTerminatedProcesses(array $processes): array + { + $notTerminatedProcesses = []; + + foreach ($processes as $process) { + if ($process->isTerminated()) { + $this->workerDebugHelper->logProcessTermination($process); + } else { + $notTerminatedProcesses[] = $process; + } + } + + return $notTerminatedProcesses; + } + + /** * @param int $totalPassedSeconds * @param int $maxThreshold @@ -298,6 +328,7 @@ protected function startProcesses(string $command, string $queue): array $this->queueClient->reject($message); for ($i = 0; $i < $numberOfWorkers; $i++) { usleep((int)$this->queueConfig->getQueueProcessTriggerInterval()); + $this->workerDebugHelper->writeQueueProcessStarted($queue); $processes[] = $this->processManager->triggerQueueProcess($command, $queue); } } else { diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php new file mode 100644 index 0000000000..366c61fc91 --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelper.php @@ -0,0 +1,55 @@ +output->getVerbosity() !== OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $this->output->writeln(sprintf('Start processing queue "%s"', $queue)); + $this->output->writeln(''); + } + + /** + * @param Process $process + * + * @return void + */ + public function logProcessTermination(Process $process): void + { + if ($this->output->getVerbosity() !== OutputInterface::VERBOSITY_DEBUG) { + return; + } + + $output = $process->getIncrementalOutput(); + if (empty($output)) { + return; + } + + $this->output->writeln($output); + } +} diff --git a/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php new file mode 100644 index 0000000000..842afe9d78 --- /dev/null +++ b/src/Spryker/Zed/Queue/Business/Worker/WorkerDebugHelperInterface.php @@ -0,0 +1,29 @@ +output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) { + return; + } + $this->progressBar = $this->createProgressBar($steps); $this->progressBar->setFormatDefinition('queue', '%message% %current%/%max% sec [%bar%] %percent:3s%%'); $this->progressBar->setFormat('queue'); diff --git a/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php b/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php index b70d6ba66a..79a78fbc12 100644 --- a/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php +++ b/src/Spryker/Zed/Queue/Communication/Console/QueueTaskConsole.php @@ -68,7 +68,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** @var string $name */ $name = $input->getArgument('queue'); - $this->getFacade()->startTask($name, $options); + $this->getFacade()->startTaskWithOutput($name, $output, $options); return static::CODE_SUCCESS; } diff --git a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php index b1e2d6ec09..809b74b03e 100644 --- a/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php +++ b/src/Spryker/Zed/Queue/Communication/Console/QueueWorkerConsole.php @@ -42,6 +42,8 @@ class QueueWorkerConsole extends Console public const QUEUE_RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console queue:task:start'; + public const VERBOSITY_DEBUG_MODE = '-vvv'; + /** * @return void */ @@ -67,8 +69,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int QueueConfig::CONFIG_WORKER_STOP_WHEN_EMPTY => $input->getOption(static::OPTION_STOP_WHEN_EMPTY), ]; - $this->getFacade()->startWorker(static::QUEUE_RUNNER_COMMAND, $output, $options); + $this->getFacade()->startWorker($this->getQueueRunnerCommand(), $output, $options); return static::CODE_SUCCESS; } + + /** + * @return string + */ + protected function getQueueRunnerCommand(): string + { + if ($this->output->getVerbosity() === OutputInterface::VERBOSITY_DEBUG) { + return static::QUEUE_RUNNER_COMMAND . ' ' . static::VERBOSITY_DEBUG_MODE; + } + + return static::QUEUE_RUNNER_COMMAND; + } }