Skip to content

Commit

Permalink
retry inside switch message for some amount of time before rescheduli…
Browse files Browse the repository at this point in the history
…ng. removed batch handler, as it causes more problems than it solves
  • Loading branch information
rliebi committed Sep 30, 2024
1 parent d1d9767 commit 58f29a3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 59 deletions.
72 changes: 20 additions & 52 deletions src/Messenger/Handler/CreateDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Valantic\ElasticaBridgeBundle\Messenger\Message\CreateDocument;
Expand All @@ -24,10 +21,8 @@
use Valantic\ElasticaBridgeBundle\Service\LockService;

#[AsMessageHandler]
class CreateDocumentHandler implements BatchHandlerInterface
class CreateDocumentHandler
{
use BatchHandlerTrait;

public function __construct(
private readonly DocumentHelper $documentHelper,
private readonly DocumentRepository $documentRepository,
Expand All @@ -39,53 +34,9 @@ public function __construct(
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): ?int
{
return $this->handle($message, $ack);
}

/**
* @param list<array{0: object, 1: Acknowledger}> $jobs
*
* @throws MissingParameterException
* @throws ServerResponseException
* @throws \Throwable
*/
protected function process(array $jobs): void
public function __invoke(CreateDocument $message): void
{
$newBatch = true;

foreach ($jobs as [$message, $ack]) {
if ($message instanceof CreateDocument) {
if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) {
$count = $this->lockService->getCurrentCount($message->esIndex);
$this->consoleOutput->writeln(
sprintf(
'%sProcessing message of %s %s. ~ %s left. (PID: %s)',
$newBatch ? \PHP_EOL : '',
$message->esIndex,
$message->objectId,
$count,
getmypid(),
),
ConsoleOutputInterface::VERBOSITY_VERBOSE
);
$newBatch = false;
}

try {
$this->handleMessage($message);
$ack->ack();
} catch (\Throwable $e) {
$this->consoleOutput->writeln(sprintf('Error processing message of %s %s (%s)', $message->esIndex, $message->objectId, $e->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL);
$ack->nack($e);
}

continue;
}

$ack->nack(new \InvalidArgumentException('Invalid message type'));
}
$this->handleMessage($message);
}

/**
Expand All @@ -96,6 +47,21 @@ protected function process(array $jobs): void
private function handleMessage(CreateDocument $message): void
{
try {
if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) {

$count = $this->lockService->getCurrentCount($message->esIndex);
$this->consoleOutput->writeln(
sprintf(
'Processing message of %s %s. ~ %s left. (PID: %s)',
$message->esIndex,
$message->objectId,
$count,
getmypid(),
),
ConsoleOutputInterface::VERBOSITY_VERBOSE
);
}

if ($this->lockService->isExecutionLocked($message->esIndex)) {
return;
}
Expand All @@ -105,6 +71,8 @@ private function handleMessage(CreateDocument $message): void
}

if ($message->objectId === null || $message->objectType === null) {
$this->lockService->messageProcessed($message->esIndex);

return;
}

Expand Down
21 changes: 16 additions & 5 deletions src/Messenger/Handler/SwitchIndexHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,26 @@ public function __invoke(ReleaseIndexLock $message): void
$count = $this->lockService->getCurrentCount($message->indexName);
$this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);

while (!$this->lockService->allMessagesProcessed($message->indexName) && $attempt < $maxAttempts) {
$this->consoleOutput->writeln(sprintf('not all messages processed (~%s remaining), attempt %d', $count, $attempt + 1), ConsoleOutputInterface::VERBOSITY_VERBOSE);
sleep(($count * $attempt) + 15);
while (!$this->lockService->allMessagesProcessed($message->indexName, $attempt) && $attempt < $maxAttempts) {
$seconds = 3 * $attempt;
$this->consoleOutput->writeln(
sprintf(
'%s: not all messages processed (~%s remaining; ), attempt %d, trying again in %s seconds',
$message->indexName,
$count,
$attempt + 1,
$seconds,
),
ConsoleOutputInterface::VERBOSITY_VERBOSE,
);
sleep($seconds);
$attempt++;
}

if ($attempt >= $maxAttempts) {
$this->consoleOutput->writeln('Max attempts reached, rescheduling', ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->messageBus->dispatch($message->clone(), [new DelayStamp($count * 1000 * 2)]);
$delayStamp = new DelayStamp(60 * 1000);
$this->consoleOutput->writeln(sprintf('Max attempts reached, rescheduling in %s seconds', $delayStamp->getDelay() / 1000), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->messageBus->dispatch($message->clone(), [$delayStamp]);
$releaseLock = false;

return;
Expand Down
32 changes: 30 additions & 2 deletions src/Service/LockService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Valantic\ElasticaBridgeBundle\Service;

use Doctrine\DBAL\Connection;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\DependencyInjection\Attribute\Autowire;
use Symfony\Component\Lock\Key;
use Symfony\Component\Lock\LockFactory;
Expand All @@ -21,6 +23,8 @@ public function __construct(
private readonly ConfigurationRepository $configurationRepository,
#[Autowire(service: 'cache.default_redis_provider')]
private readonly \Redis $redis,
private readonly Connection $connection,
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function getIndexingLock(IndexInterface $indexConfig): LockInterface
Expand Down Expand Up @@ -77,7 +81,7 @@ public function lockSwitchBlueGreen(IndexInterface $indexConfig): Key
return $key;
}

public function allMessagesProcessed(string $indexName): bool
public function allMessagesProcessed(string $indexName, int $attempt = 0): bool
{
// the count is eventually consistent.
$currentCount = $this->getCurrentCount($indexName);
Expand All @@ -92,11 +96,23 @@ public function allMessagesProcessed(string $indexName): bool
$key = $this->getKey($indexName, 'switch-blue-green');
$lock = $this->createLockFromKey($key, ttl: 0, autorelease: true);

if ($attempt > 2) {
$actualMessageCount = $this->getActualMessageCount($indexName);
$this->consoleOutput->writeln(sprintf('%s: %d attempts reached. Getting data from db. (%d => %d)', $indexName, $attempt, $currentCount, $actualMessageCount), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$currentCount = $actualMessageCount;
}

if (!$lock->acquire()) {
$this->consoleOutput->writeln('Lock is still active. Not all messages processed.', ConsoleOutputInterface::VERBOSITY_VERBOSE);

return false;
}

if ($currentCount > 0) {
return false;
}

if (!$lock->acquire()) {
if ($this->getActualMessageCount($indexName) > 0) {
return false;
}

Expand Down Expand Up @@ -136,4 +152,16 @@ public function messageDispatched(string $getName): void
$cacheKey = self::LOCK_PREFIX . $getName;
$this->redis->incr($cacheKey);
}

private function getActualMessageCount(string $indexName): int
{
$query = 'SELECT
COUNT(mm.id) AS remaining_messages
FROM messenger_messages mm
WHERE mm.queue_name = "elastica_bridge_populate"
AND mm.body LIKE CONCAT("%\\\\\"", "news_article_york", "\\\\\"%") AND mm.body LIKE "%CreateDocument%";';
$count = $this->connection->executeQuery($query, ['indexName' => $indexName])->fetchOne();

return (int) $count;
}
}

0 comments on commit 58f29a3

Please sign in to comment.