diff --git a/src/Command/Index.php b/src/Command/Index.php index cb45eda..01baa68 100644 --- a/src/Command/Index.php +++ b/src/Command/Index.php @@ -260,7 +260,6 @@ private function populateIndex(IndexInterface $indexConfig): array { self::$isPopulating = true; $messagesDispatched = 0; - $blueGreenKey = $this->lockService->lockSwitchBlueGreen($indexConfig); $this->lockService->initializeProcessCount($indexConfig->getName()); try { @@ -318,8 +317,6 @@ private function populateIndex(IndexInterface $indexConfig): array && $key === array_key_last($data ?? []) && $documentKey === array_key_last($allowedDocuments) ) { - $message = new ReleaseIndexLock($indexConfig->getName(), $blueGreenKey); - $this->messageBus->dispatch($message); $lastItem = true; $cooldown = $this->async ? $this->configurationRepository->getCooldown() : 0; } diff --git a/src/Messenger/Handler/CreateDocumentHandler.php b/src/Messenger/Handler/CreateDocumentHandler.php index 0ef75bd..4926110 100644 --- a/src/Messenger/Handler/CreateDocumentHandler.php +++ b/src/Messenger/Handler/CreateDocumentHandler.php @@ -9,11 +9,9 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Attribute\AsMessageHandler; +use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; -use Symfony\Component\Messenger\MessageBusInterface; -use Symfony\Component\Messenger\Stamp\DelayStamp; use Valantic\ElasticaBridgeBundle\Messenger\Message\CreateDocument; -use Valantic\ElasticaBridgeBundle\Messenger\Message\ReleaseIndexLock; use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository; use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository; use Valantic\ElasticaBridgeBundle\Repository\IndexRepository; @@ -29,7 +27,6 @@ public function __construct( private readonly IndexRepository $indexRepository, private readonly ConfigurationRepository $configurationRepository, private readonly LockService $lockService, - private readonly MessageBusInterface $messageBus, private readonly EventDispatcherInterface $eventDispatcher, private readonly ConsoleOutputInterface $consoleOutput, ) {} @@ -46,7 +43,13 @@ public function __invoke(CreateDocument $message): void */ private function handleMessage(CreateDocument $message): void { + $messageDecreased = false; + try { + if ($this->lockService->isExecutionLocked($message->esIndex)) { + throw new RecoverableMessageHandlingException(sprintf('Execution locked (%s: %s)', $message->esIndex, $message->objectId)); + } + if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) { $count = $this->lockService->getCurrentCount($message->esIndex); @@ -62,41 +65,46 @@ private function handleMessage(CreateDocument $message): void ); } - if ($this->lockService->isExecutionLocked($message->esIndex)) { - return; - } - if ($message->callback->shouldCallEvent()) { $this->eventDispatcher->dispatch($message->callback->getEvent(), $message->callback->getEventName()); } if ($message->objectId === null || $message->objectType === null) { $this->lockService->messageProcessed($message->esIndex); + $messageDecreased = true; return; } $documentInstance = $this->documentRepository->get($message->document); $dataObject = $message->objectType::getById($message->objectId) ?? throw new \RuntimeException('DataObject not found'); - $esDocuments = [$this->documentHelper->elementToDocument($documentInstance, $dataObject)]; - $esIndex = $this->indexRepository->flattenedGet($message->esIndex)->getBlueGreenInactiveElasticaIndex(); - if (count($esDocuments) > 0) { - try { + try { + $esDocuments = [$this->documentHelper->elementToDocument($documentInstance, $dataObject)]; + $esIndex = $this->indexRepository->flattenedGet($message->esIndex)->getBlueGreenInactiveElasticaIndex(); + + if (count($esDocuments) > 0) { $esIndex->addDocuments($esDocuments); - } catch (\Throwable $throwable) { - if (!$this->configurationRepository->shouldSkipFailingDocuments()) { - $key = $this->lockService->lockExecution($message->esIndex); - $this->messageBus->dispatch(new ReleaseIndexLock($message->esIndex, $key), [new DelayStamp(5000)]); - } + } + } catch (\Throwable $throwable) { + $this->consoleOutput->writeln(sprintf('Error processing message %s: %s', $message->esIndex, $throwable->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL); + + if (!$this->configurationRepository->shouldSkipFailingDocuments()) { + $this->lockService->lockExecution($message->esIndex); if ($this->configurationRepository->shouldPopulateAsync()) { throw new UnrecoverableMessageHandlingException($throwable->getMessage(), previous: $throwable); } } + } + $messageDecreased = true; $this->lockService->messageProcessed($message->esIndex); } finally { + if (!$messageDecreased) { + $this->consoleOutput->writeln(sprintf('Message %s not processed. (ID: %s)', $message->esIndex, $message->objectId), ConsoleOutputInterface::VERBOSITY_VERBOSE); + } + if ($message->lastItem && $message->cooldown > 0) { $key = $this->lockService->getKey($message->esIndex, 'cooldown'); $this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire(); diff --git a/src/Messenger/Handler/SwitchIndexHandler.php b/src/Messenger/Handler/SwitchIndexHandler.php index 9bb285d..616f33b 100644 --- a/src/Messenger/Handler/SwitchIndexHandler.php +++ b/src/Messenger/Handler/SwitchIndexHandler.php @@ -35,7 +35,6 @@ public function __construct( public function __invoke(ReleaseIndexLock $message): void { $releaseLock = true; - $startTime = microtime(true); $maxAttempts = 5; // Set the maximum number of attempts $attempt = 0; @@ -45,9 +44,7 @@ public function __invoke(ReleaseIndexLock $message): void } // try to switch index. If not all messages are processed this will be rescheduled. - $key = $this->lockService->getKey($message->indexName, 'switch-blue-green'); $count = $this->lockService->getCurrentCount($message->indexName); - $this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE); while (!$this->lockService->allMessagesProcessed($message->indexName, $attempt) && $attempt < $maxAttempts) { $seconds = 3 * $attempt; diff --git a/src/Service/LockService.php b/src/Service/LockService.php index e0ff336..0224670 100644 --- a/src/Service/LockService.php +++ b/src/Service/LockService.php @@ -59,6 +59,7 @@ public function createLockFromKey(Key $key, ?int $ttl = null, ?bool $autorelease public function lockExecution(string $document): Key { $key = $this->getKey($document, 'failure'); + $this->consoleOutput->writeln(sprintf('Locking execution for %s (%s)', $document, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE); $lock = $this->lockFactory->createLockFromKey($key, ttl: $this->configurationRepository->getIndexingLockTimeout(), autoRelease: false); $lock->acquire(); @@ -68,17 +69,26 @@ public function lockExecution(string $document): Key public function isExecutionLocked(string $document): bool { $key = $this->getKey($document, 'failure'); + $maxAttempts = 5; + $attempt = 0; + $isLocked = true; + + while ($attempt < $maxAttempts && $isLocked) { + $lock = $this->lockFactory->createLockFromKey($key); + $isLocked = !$lock->acquireRead(); + $lock->release(); + + if ($isLocked) { + usleep(300000); // Wait for 300 milliseconds before retrying + $attempt++; + } + } - return !$this->lockFactory->createLockFromKey($key)->acquire(); - } - - public function lockSwitchBlueGreen(IndexInterface $indexConfig): Key - { - $key = $this->getKey($indexConfig->getName(), 'switch-blue-green'); - $lock = $this->lockFactory->createLockFromKey($key, ttl: 2 * $this->configurationRepository->getIndexingLockTimeout(), autoRelease: false); - $lock->acquire(); + if ($isLocked) { + $this->consoleOutput->writeln(sprintf('Execution locked for %s (%s)', $document, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE); + } - return $key; + return $isLocked; } public function allMessagesProcessed(string $indexName, int $attempt = 0): bool @@ -93,21 +103,12 @@ public function allMessagesProcessed(string $indexName, int $attempt = 0): bool return $currentCount === 0; } - $key = $this->getKey($indexName, 'switch-blue-green'); - $lock = $this->createLockFromKey($key, ttl: 0, autorelease: true); - if ($attempt > 2) { $actualMessageCount = $this->getActualMessageCount($indexName); $this->consoleOutput->writeln(sprintf('%s: %d attempts reached. Getting data from db. (%d => %d)', $indexName, $attempt, $currentCount, $actualMessageCount), ConsoleOutputInterface::VERBOSITY_VERBOSE); $currentCount = $actualMessageCount; } - if (!$lock->acquire()) { - $this->consoleOutput->writeln('Lock is still active. Not all messages processed.', ConsoleOutputInterface::VERBOSITY_VERBOSE); - - return false; - } - if ($currentCount > 0) { return false; } @@ -116,7 +117,6 @@ public function allMessagesProcessed(string $indexName, int $attempt = 0): bool return false; } - $lock->release(); // release the lock instantly as we just checked $cacheKey = self::LOCK_PREFIX . $indexName; $this->redis->del($cacheKey); // clean up the cache key. @@ -155,12 +155,15 @@ public function messageDispatched(string $getName): void private function getActualMessageCount(string $indexName): int { - $query = 'SELECT - COUNT(mm.id) AS remaining_messages - FROM messenger_messages mm - WHERE mm.queue_name = "elastica_bridge_populate" - AND mm.body LIKE CONCAT("%\\\\\"", "news_article_york", "\\\\\"%") AND mm.body LIKE "%CreateDocument%";'; - $count = $this->connection->executeQuery($query, ['indexName' => $indexName])->fetchOne(); + $query = "SELECT + COUNT(mm.id) AS remaining_messages + FROM messenger_messages mm + WHERE mm.queue_name = \"elastica_bridge_populate\" + AND mm.body LIKE CONCAT('%\\\\\\\\\"', :indexName, '\\\\\\\\\"%') + AND mm.delivered_at IS NULL + AND mm.body LIKE \"%CreateDocument%\""; + + $count = $this->connection->executeQuery($query, ['indexName' => $indexName, 'indexNameLength' => strlen($indexName)])->fetchOne(); return (int) $count; }