Skip to content

Commit

Permalink
MOT-3404: bugfix for filters, performance improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
rliebi committed Oct 3, 2024
1 parent 58f29a3 commit 66900af
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 48 deletions.
3 changes: 0 additions & 3 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ private function populateIndex(IndexInterface $indexConfig): array
{
self::$isPopulating = true;
$messagesDispatched = 0;
$blueGreenKey = $this->lockService->lockSwitchBlueGreen($indexConfig);
$this->lockService->initializeProcessCount($indexConfig->getName());

try {
Expand Down Expand Up @@ -318,8 +317,6 @@ private function populateIndex(IndexInterface $indexConfig): array
&& $key === array_key_last($data ?? [])
&& $documentKey === array_key_last($allowedDocuments)
) {
$message = new ReleaseIndexLock($indexConfig->getName(), $blueGreenKey);
$this->messageBus->dispatch($message);
$lastItem = true;
$cooldown = $this->async ? $this->configurationRepository->getCooldown() : 0;
}
Expand Down
42 changes: 25 additions & 17 deletions src/Messenger/Handler/CreateDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Exception\RecoverableMessageHandlingException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Valantic\ElasticaBridgeBundle\Messenger\Message\CreateDocument;
use Valantic\ElasticaBridgeBundle\Messenger\Message\ReleaseIndexLock;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
Expand All @@ -29,7 +27,6 @@ public function __construct(
private readonly IndexRepository $indexRepository,
private readonly ConfigurationRepository $configurationRepository,
private readonly LockService $lockService,
private readonly MessageBusInterface $messageBus,
private readonly EventDispatcherInterface $eventDispatcher,
private readonly ConsoleOutputInterface $consoleOutput,
) {}
Expand All @@ -46,7 +43,13 @@ public function __invoke(CreateDocument $message): void
*/
private function handleMessage(CreateDocument $message): void
{
$messageDecreased = false;

try {
if ($this->lockService->isExecutionLocked($message->esIndex)) {
throw new RecoverableMessageHandlingException(sprintf('Execution locked (%s: %s)', $message->esIndex, $message->objectId));
}

if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) {

$count = $this->lockService->getCurrentCount($message->esIndex);
Expand All @@ -62,41 +65,46 @@ private function handleMessage(CreateDocument $message): void
);
}

if ($this->lockService->isExecutionLocked($message->esIndex)) {
return;
}

if ($message->callback->shouldCallEvent()) {
$this->eventDispatcher->dispatch($message->callback->getEvent(), $message->callback->getEventName());
}

if ($message->objectId === null || $message->objectType === null) {
$this->lockService->messageProcessed($message->esIndex);
$messageDecreased = true;

return;
}

$documentInstance = $this->documentRepository->get($message->document);
$dataObject = $message->objectType::getById($message->objectId) ?? throw new \RuntimeException('DataObject not found');
$esDocuments = [$this->documentHelper->elementToDocument($documentInstance, $dataObject)];
$esIndex = $this->indexRepository->flattenedGet($message->esIndex)->getBlueGreenInactiveElasticaIndex();

if (count($esDocuments) > 0) {
try {
try {
$esDocuments = [$this->documentHelper->elementToDocument($documentInstance, $dataObject)];
$esIndex = $this->indexRepository->flattenedGet($message->esIndex)->getBlueGreenInactiveElasticaIndex();

if (count($esDocuments) > 0) {
$esIndex->addDocuments($esDocuments);
} catch (\Throwable $throwable) {
if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
$key = $this->lockService->lockExecution($message->esIndex);
$this->messageBus->dispatch(new ReleaseIndexLock($message->esIndex, $key), [new DelayStamp(5000)]);
}
}
} catch (\Throwable $throwable) {
$this->consoleOutput->writeln(sprintf('Error processing message %s: %s', $message->esIndex, $throwable->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL);

if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
$this->lockService->lockExecution($message->esIndex);

if ($this->configurationRepository->shouldPopulateAsync()) {
throw new UnrecoverableMessageHandlingException($throwable->getMessage(), previous: $throwable);
}
}

}
$messageDecreased = true;
$this->lockService->messageProcessed($message->esIndex);
} finally {
if (!$messageDecreased) {
$this->consoleOutput->writeln(sprintf('Message %s not processed. (ID: %s)', $message->esIndex, $message->objectId), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}

if ($message->lastItem && $message->cooldown > 0) {
$key = $this->lockService->getKey($message->esIndex, 'cooldown');
$this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire();
Expand Down
3 changes: 0 additions & 3 deletions src/Messenger/Handler/SwitchIndexHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public function __construct(
public function __invoke(ReleaseIndexLock $message): void
{
$releaseLock = true;
$startTime = microtime(true);
$maxAttempts = 5; // Set the maximum number of attempts
$attempt = 0;

Expand All @@ -45,9 +44,7 @@ public function __invoke(ReleaseIndexLock $message): void
}

// try to switch index. If not all messages are processed this will be rescheduled.
$key = $this->lockService->getKey($message->indexName, 'switch-blue-green');
$count = $this->lockService->getCurrentCount($message->indexName);
$this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);

while (!$this->lockService->allMessagesProcessed($message->indexName, $attempt) && $attempt < $maxAttempts) {
$seconds = 3 * $attempt;
Expand Down
53 changes: 28 additions & 25 deletions src/Service/LockService.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public function createLockFromKey(Key $key, ?int $ttl = null, ?bool $autorelease
public function lockExecution(string $document): Key
{
$key = $this->getKey($document, 'failure');
$this->consoleOutput->writeln(sprintf('Locking execution for %s (%s)', $document, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$lock = $this->lockFactory->createLockFromKey($key, ttl: $this->configurationRepository->getIndexingLockTimeout(), autoRelease: false);
$lock->acquire();

Expand All @@ -68,17 +69,26 @@ public function lockExecution(string $document): Key
public function isExecutionLocked(string $document): bool
{
$key = $this->getKey($document, 'failure');
$maxAttempts = 5;
$attempt = 0;
$isLocked = true;

while ($attempt < $maxAttempts && $isLocked) {
$lock = $this->lockFactory->createLockFromKey($key);
$isLocked = !$lock->acquireRead();
$lock->release();

if ($isLocked) {
usleep(300000); // Wait for 300 milliseconds before retrying
$attempt++;
}
}

return !$this->lockFactory->createLockFromKey($key)->acquire();
}

public function lockSwitchBlueGreen(IndexInterface $indexConfig): Key
{
$key = $this->getKey($indexConfig->getName(), 'switch-blue-green');
$lock = $this->lockFactory->createLockFromKey($key, ttl: 2 * $this->configurationRepository->getIndexingLockTimeout(), autoRelease: false);
$lock->acquire();
if ($isLocked) {
$this->consoleOutput->writeln(sprintf('Execution locked for %s (%s)', $document, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);
}

return $key;
return $isLocked;
}

public function allMessagesProcessed(string $indexName, int $attempt = 0): bool
Expand All @@ -93,21 +103,12 @@ public function allMessagesProcessed(string $indexName, int $attempt = 0): bool
return $currentCount === 0;
}

$key = $this->getKey($indexName, 'switch-blue-green');
$lock = $this->createLockFromKey($key, ttl: 0, autorelease: true);

if ($attempt > 2) {
$actualMessageCount = $this->getActualMessageCount($indexName);
$this->consoleOutput->writeln(sprintf('%s: %d attempts reached. Getting data from db. (%d => %d)', $indexName, $attempt, $currentCount, $actualMessageCount), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$currentCount = $actualMessageCount;
}

if (!$lock->acquire()) {
$this->consoleOutput->writeln('Lock is still active. Not all messages processed.', ConsoleOutputInterface::VERBOSITY_VERBOSE);

return false;
}

if ($currentCount > 0) {
return false;
}
Expand All @@ -116,7 +117,6 @@ public function allMessagesProcessed(string $indexName, int $attempt = 0): bool
return false;
}

$lock->release(); // release the lock instantly as we just checked
$cacheKey = self::LOCK_PREFIX . $indexName;
$this->redis->del($cacheKey); // clean up the cache key.

Expand Down Expand Up @@ -155,12 +155,15 @@ public function messageDispatched(string $getName): void

private function getActualMessageCount(string $indexName): int
{
$query = 'SELECT
COUNT(mm.id) AS remaining_messages
FROM messenger_messages mm
WHERE mm.queue_name = "elastica_bridge_populate"
AND mm.body LIKE CONCAT("%\\\\\"", "news_article_york", "\\\\\"%") AND mm.body LIKE "%CreateDocument%";';
$count = $this->connection->executeQuery($query, ['indexName' => $indexName])->fetchOne();
$query = "SELECT
COUNT(mm.id) AS remaining_messages
FROM messenger_messages mm
WHERE mm.queue_name = \"elastica_bridge_populate\"
AND mm.body LIKE CONCAT('%\\\\\\\\\"', :indexName, '\\\\\\\\\"%')
AND mm.delivered_at IS NULL
AND mm.body LIKE \"%CreateDocument%\"";

$count = $this->connection->executeQuery($query, ['indexName' => $indexName, 'indexNameLength' => strlen($indexName)])->fetchOne();

return (int) $count;
}
Expand Down

0 comments on commit 66900af

Please sign in to comment.