Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug mode for queue:worker:start and queue:task:start console commands #5

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions contribution-license-agreement.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I hereby agree to Spryker\'s Contribution License Agreement in https://github.com/spryker/queue/blob/9f04d3171cc1db0e12132e4d4c4dc162a67bc363/CONTRIBUTING.md.
30 changes: 29 additions & 1 deletion src/Spryker/Zed/Queue/Business/QueueBusinessFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
use Spryker\Zed\Queue\Business\Reader\QueueConfigReaderInterface;
use Spryker\Zed\Queue\Business\SignalHandler\QueueWorkerSignalDispatcher;
use Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface;
use Spryker\Zed\Queue\Business\Task\TaskDebugHelper;
use Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface;
use Spryker\Zed\Queue\Business\Task\TaskManager;
use Spryker\Zed\Queue\Business\Worker\Worker;
use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelper;
use Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface;
use Spryker\Zed\Queue\Business\Worker\WorkerProgressBar;
use Spryker\Zed\Queue\Dependency\Service\QueueToUtilEncodingServiceInterface;
use Spryker\Zed\Queue\QueueDependencyProvider;
Expand All @@ -36,15 +40,18 @@ class QueueBusinessFactory extends AbstractBusinessFactory
protected static $serverUniqueId;

/**
* @param \Symfony\Component\Console\Output\OutputInterface|null $output
*
* @return \Spryker\Zed\Queue\Business\Task\TaskManager
*/
public function createTask()
public function createTask(?OutputInterface $output = null)
{
return new TaskManager(
$this->getQueueClient(),
$this->getConfig(),
$this->createTaskMemoryUsageChecker(),
$this->getProcessorMessagePlugins(),
$this->createTaskDebugHelper($output),
);
}

Expand All @@ -59,6 +66,7 @@ public function createWorker(OutputInterface $output)
$this->createProcessManager(),
$this->getConfig(),
$this->createWorkerProgressbar($output),
$this->createWorkerDebugHelper($output),
$this->getQueueClient(),
$this->getQueueNames(),
$this->createQueueWorkerSignalDispatcher(),
Expand All @@ -78,6 +86,26 @@ public function createProcessManager()
);
}

/**
* @param \Symfony\Component\Console\Output\OutputInterface|null $output
*
* @return \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface
*/
public function createTaskDebugHelper(?OutputInterface $output = null): TaskDebugHelperInterface
{
return new TaskDebugHelper($output);
}

/**
* @param \Symfony\Component\Console\Output\OutputInterface $output
*
* @return \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface
*/
public function createWorkerDebugHelper(OutputInterface $output): WorkerDebugHelperInterface
{
return new WorkerDebugHelper($output);
}

/**
* @param \Symfony\Component\Console\Output\OutputInterface $output
*
Expand Down
18 changes: 18 additions & 0 deletions src/Spryker/Zed/Queue/Business/QueueFacade.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ public function startTask($queueName, array $options = [])
->run($queueName, $options);
}

/**
* {@inheritDoc}
*
* @api
*
* @param string $queueName
* @param \Symfony\Component\Console\Output\OutputInterface $output
* @param array<string, mixed> $options
*
* @return void
*/
public function startTaskWithOutput($queueName, OutputInterface $output, array $options = [])
{
$this->getFactory()
->createTask($output)
->run($queueName, $options);
}

/**
* {@inheritDoc}
*
Expand Down
15 changes: 15 additions & 0 deletions src/Spryker/Zed/Queue/Business/QueueFacadeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ interface QueueFacadeInterface
*/
public function startTask($queueName, array $options = []);

/**
* Specification:
* - Starts receiving and processing messages task for one specific queue.
* - Outputs the result to the console if debugging mode is activated
*
* @api
*
* @param string $queueName
* @param \Symfony\Component\Console\Output\OutputInterface $output
* @param array<string, mixed> $options
*
* @return void
*/
public function startTaskWithOutput($queueName, OutputInterface $output, array $options = []);

/**
* Specification:
* - Starts receiving and processing messages task for one specific queue.
Expand Down
111 changes: 111 additions & 0 deletions src/Spryker/Zed/Queue/Business/Task/TaskDebugHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php

/**
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
*/

namespace Spryker\Zed\Queue\Business\Task;

use Exception;
use Symfony\Component\Console\Output\OutputInterface;

class TaskDebugHelper implements TaskDebugHelperInterface
{
/**
* @var \Symfony\Component\Console\Output\OutputInterface|null
*/
protected $output;

/**
* @var float
*/
protected $startTime;

/**
* @param \Symfony\Component\Console\Output\OutputInterface|null $output
*/
public function __construct(?OutputInterface $output = null)
{
$this->output = $output;
}

/**
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
* @param string $queueName
*
* @return void
*/
public function startMessages(array $messages, string $queueName): void
{
if (!$this->output) {
return;
}

if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) {
return;
}

$this->startTime = microtime(true);

$this->output->writeln('Start processing messages for queue "' . $queueName . '"');
$this->output->writeln('Messages: ' . count($messages));
foreach ($messages as $i => $message) {
$this->output->writeln('');
$this->output->writeln('Message #' . $i);
$this->output->writeln($this->prettifyJson($message->getQueueMessage()->getBody()));
}
}

/**
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
* @param string $queueName
*
* @return void
*/
public function finishMessages(array $messages, string $queueName): void
{
if (!$this->output) {
return;
}

if ($this->output->getVerbosity() < OutputInterface::VERBOSITY_DEBUG) {
return;
}

$this->output->writeln('');
$this->output->writeln('Finish processing messages for queue "' . $queueName . '"');
$this->output->writeln('Processing time: ' . (microtime(true) - $this->startTime) . 's');

foreach ($messages as $i => $message) {
if ($message->getHasError()) {
$this->output->writeln('');
$this->output->writeln('Error in message #' . $i);

$messageBody = json_decode($message->getQueueMessage()->getBody(), true);

$this->output->writeln('Error message: ' . $messageBody['errorMessage'] ?? 'unknown');
}
}
}

/**
* @param string $json
*
* @throws \Exception
*
* @return string
*/
private function prettifyJson(string $json): string
{
$decodedJson = json_decode($json, false);

if (json_last_error()) {
throw new Exception(
'Cannot prettify invalid json',
);
}

return json_encode($decodedJson, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
}
}
27 changes: 27 additions & 0 deletions src/Spryker/Zed/Queue/Business/Task/TaskDebugHelperInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

/**
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
*/

namespace Spryker\Zed\Queue\Business\Task;

interface TaskDebugHelperInterface
{
/**
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
* @param string $queueName
*
* @return void
*/
public function startMessages(array $messages, string $queueName): void;

/**
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
* @param string $queueName
*
* @return void
*/
public function finishMessages(array $messages, string $queueName): void;
}
14 changes: 13 additions & 1 deletion src/Spryker/Zed/Queue/Business/Task/TaskManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,30 @@ class TaskManager implements TaskManagerInterface
*/
protected $messageProcessorPlugins;

/**
* @var \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface
*/
protected TaskDebugHelperInterface $taskDebugHelper;

/**
* @param \Spryker\Client\Queue\QueueClientInterface $client
* @param \Spryker\Zed\Queue\QueueConfig $queueConfig
* @param \Spryker\Zed\Queue\Business\Checker\TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker
* @param array<\Spryker\Zed\Queue\Dependency\Plugin\QueueMessageProcessorPluginInterface> $messageProcessorPlugins
* @param \Spryker\Zed\Queue\Business\Task\TaskDebugHelperInterface $taskDebugHelper
*/
public function __construct(
QueueClientInterface $client,
QueueConfig $queueConfig,
TaskMemoryUsageCheckerInterface $taskMemoryUsageChecker,
array $messageProcessorPlugins
array $messageProcessorPlugins,
TaskDebugHelperInterface $taskDebugHelper
) {
$this->client = $client;
$this->queueConfig = $queueConfig;
$this->taskMemoryUsageChecker = $taskMemoryUsageChecker;
$this->messageProcessorPlugins = $messageProcessorPlugins;
$this->taskDebugHelper = $taskDebugHelper;
}

/**
Expand Down Expand Up @@ -81,8 +89,12 @@ public function run($queueName, array $options = []): QueueTaskResponseTransfer
$this->taskMemoryUsageChecker->check($queueName, $messages, $chunkSize);
$queueTaskResponseTransfer->setReceivedMessageCount(count($messages));

$this->taskDebugHelper->startMessages($messages, $queueName);

$processedMessages = $processorPlugin->processMessages($messages);

$this->taskDebugHelper->finishMessages($messages, $queueName);

if (!$processedMessages) {
$queueTaskResponseTransfer->setMessage(sprintf(
'No messages processed from the queue "%s". Wether there is nothing to process or something failed while processing.',
Expand Down
31 changes: 31 additions & 0 deletions src/Spryker/Zed/Queue/Business/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class Worker implements WorkerInterface
*/
protected $workerProgressBar;

/**
* @var \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface
*/
protected $workerDebugHelper;

/**
* @var \Spryker\Client\Queue\QueueClientInterface
*/
Expand Down Expand Up @@ -93,6 +98,7 @@ class Worker implements WorkerInterface
* @param \Spryker\Zed\Queue\Business\Process\ProcessManagerInterface $processManager
* @param \Spryker\Zed\Queue\QueueConfig $queueConfig
* @param \Spryker\Zed\Queue\Business\Worker\WorkerProgressBarInterface $workerProgressBar
* @param \Spryker\Zed\Queue\Business\Worker\WorkerDebugHelperInterface $workerDebugHelper
* @param \Spryker\Client\Queue\QueueClientInterface $queueClient
* @param array<string> $queueNames
* @param \Spryker\Zed\Queue\Business\SignalHandler\SignalDispatcherInterface $signalDispatcher
Expand All @@ -103,6 +109,7 @@ public function __construct(
ProcessManagerInterface $processManager,
QueueConfig $queueConfig,
WorkerProgressBarInterface $workerProgressBar,
WorkerDebugHelperInterface $workerDebugHelper,
QueueClientInterface $queueClient,
array $queueNames,
SignalDispatcherInterface $signalDispatcher,
Expand All @@ -111,6 +118,7 @@ public function __construct(
) {
$this->processManager = $processManager;
$this->workerProgressBar = $workerProgressBar;
$this->workerDebugHelper = $workerDebugHelper;
$this->queueConfig = $queueConfig;
$this->queueClient = $queueClient;
$this->queueNames = $queueNames;
Expand Down Expand Up @@ -141,6 +149,7 @@ public function start(string $command, array $options = [], int $round = 1, arra

while ($this->continueExecution($totalPassedSeconds, $maxThreshold, $options)) {
$processes = array_merge($this->executeOperation($command), $processes);
$processes = $this->removeTerminatedProcesses($processes);
$pendingProcesses = $this->getPendingProcesses($processes);

if ($this->isEmptyQueue($pendingProcesses, $options)) {
Expand All @@ -161,6 +170,27 @@ public function start(string $command, array $options = [], int $round = 1, arra
$this->waitForPendingProcesses($pendingProcesses, $command, $round, $delayIntervalMilliseconds, $options);
}

/**
* @param array<\Symfony\Component\Process\Process> $processes
*
* @return array
*/
protected function removeTerminatedProcesses(array $processes): array
{
$notTerminatedProcesses = [];

foreach ($processes as $process) {
if ($process->isTerminated()) {
$this->workerDebugHelper->logProcessTermination($process);
} else {
$notTerminatedProcesses[] = $process;
}
}

return $notTerminatedProcesses;
}


/**
* @param int $totalPassedSeconds
* @param int $maxThreshold
Expand Down Expand Up @@ -298,6 +328,7 @@ protected function startProcesses(string $command, string $queue): array
$this->queueClient->reject($message);
for ($i = 0; $i < $numberOfWorkers; $i++) {
usleep((int)$this->queueConfig->getQueueProcessTriggerInterval());
$this->workerDebugHelper->writeQueueProcessStarted($queue);
$processes[] = $this->processManager->triggerQueueProcess($command, $queue);
}
} else {
Expand Down
Loading
Loading