Skip to content

Commit

Permalink
retry inside switch message for some amount of time before rescheduling.
Browse files Browse the repository at this point in the history
  • Loading branch information
rliebi committed Sep 27, 2024
1 parent ac6a958 commit 79939a4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 26 deletions.
45 changes: 27 additions & 18 deletions src/Document/DataObjectNormalizerTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Document;

use Pimcore\Cache;
use Doctrine\DBAL\Connection;
use Pimcore\Localization\LocaleService;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\DataObject\Concrete;
Expand All @@ -20,13 +20,20 @@
trait DataObjectNormalizerTrait
{
protected LocaleService $localeService;
private Connection $connection;

#[Required]
public function setLocaleService(LocaleService $localeService): void
{
$this->localeService = $localeService;
}

#[Required]
public function setDatabaseConnection(Connection $connection): void
{
$this->connection = $connection;
}

/**
* Given $element, collect data from the localized fields $fields (optionally using fallback values)
* and return a normalized array.
Expand Down Expand Up @@ -181,7 +188,6 @@ protected function children(
* Returns a normalized array of IDs of all (recursive) children of $element, optionally limited by $objectTypes.
*
* @param string[] $objectTypes
* @param int[] $carry
*
* @see \App\Elasticsearch\Index\Product\Document\ProductIndexDocument::getNormalized for a usage example
*
Expand All @@ -190,26 +196,29 @@ protected function children(
protected function childrenRecursive(
Concrete $element,
array $objectTypes = [AbstractObject::OBJECT_TYPE_OBJECT, AbstractObject::OBJECT_TYPE_FOLDER],
array $carry = [],
): array {
$cache = Cache::load('elastica-bridge-children-recursive-' . $element->getId());

if (is_array($cache)) {
return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => $cache];
$placeholders = implode(',', array_fill(0, count($objectTypes), '?'));

$query = 'WITH RECURSIVE CategoryHierarchy AS (
SELECT id, parentId, published
FROM objects WHERE id = ? AND type in (' . $placeholders . ') AND published = 1
UNION ALL
SELECT c.id, c.parentId, c.published
FROM objects c
INNER JOIN CategoryHierarchy ch ON ch.id = c.parentId
)
SELECT DISTINCT id
FROM CategoryHierarchy where published = 1;';
$statement = $this->connection->prepare($query);
$statement->bindValue(1, $element->getId(), \PDO::PARAM_INT);

foreach ($objectTypes as $index => $type) {
$statement->bindValue($index + 2, $type, \PDO::PARAM_STR);
}

foreach ($element->getChildren($objectTypes) as $child) {
/** @var Concrete $child */
$carry[] = $child->getId();
$carry = array_values(array_filter($carry));
$carry = $this->childrenRecursive($child, $objectTypes, $carry)[DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE];
}
$values = array_values(array_filter($carry));

Cache::save($values, 'elastica-bridge-children-recursive-' . $element->getId(), lifetime: 3600, force: true);

return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => array_values(array_filter($values))];
$result = $statement->executeQuery();

return [DocumentInterface::ATTRIBUTE_CHILDREN_RECURSIVE => array_map('intval', array_keys($result->fetchAllAssociativeIndexed()))];
}

/**
Expand Down
29 changes: 23 additions & 6 deletions src/Messenger/Handler/CreateDocumentHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function __construct(
private readonly ConsoleOutputInterface $consoleOutput,
) {}

public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): int
public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): ?int
{
return $this->handle($message, $ack);
}
Expand All @@ -53,16 +53,33 @@ public function __invoke(CreateDocument $message, ?Acknowledger $ack = null): in
*/
protected function process(array $jobs): void
{
$this->consoleOutput->writeln('Processing new batch', ConsoleOutputInterface::VERBOSITY_NORMAL);
$newBatch = true;

foreach ($jobs as [$message, $ack]) {
if ($message instanceof CreateDocument) {
if ($this->consoleOutput->getVerbosity() > ConsoleOutputInterface::VERBOSITY_NORMAL) {
$count = $this->lockService->getCurrentCount($message->esIndex);
$this->consoleOutput->writeln(sprintf('Processing message of %s %s. ~ %s left.', $message->esIndex, $message->objectId, $count), ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->consoleOutput->writeln(
sprintf(
'%sProcessing message of %s %s. ~ %s left. (PID: %s)',
$newBatch ? \PHP_EOL : '',
$message->esIndex,
$message->objectId,
$count,
getmypid(),
),
ConsoleOutputInterface::VERBOSITY_VERBOSE
);
$newBatch = false;
}

try {
$this->handleMessage($message);
$ack->ack();
} catch (\Throwable $e) {
$this->consoleOutput->writeln(sprintf('Error processing message of %s %s (%s)', $message->esIndex, $message->objectId, $e->getMessage()), ConsoleOutputInterface::VERBOSITY_NORMAL);
$ack->nack($e);
}
$this->handleMessage($message);
$ack->ack();

continue;
}
Expand Down Expand Up @@ -110,13 +127,13 @@ private function handleMessage(CreateDocument $message): void
}
}
}
$this->lockService->messageProcessed($message->esIndex);
} finally {
if ($message->lastItem && $message->cooldown > 0) {
$key = $this->lockService->getKey($message->esIndex, 'cooldown');
$this->lockService->createLockFromKey($key, ttl: $message->cooldown)->acquire();
}

$this->lockService->messageProcessed($message->esIndex);

\Pimcore::collectGarbage();
}
Expand Down
13 changes: 11 additions & 2 deletions src/Messenger/Handler/SwitchIndexHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public function __construct(
public function __invoke(ReleaseIndexLock $message): void
{
$releaseLock = true;
$startTime = microtime(true);
$maxAttempts = 5; // Set the maximum number of attempts
$attempt = 0;

try {
if ($message->switchIndex === false || $this->lockService->isExecutionLocked($message->indexName)) {
Expand All @@ -46,8 +49,14 @@ public function __invoke(ReleaseIndexLock $message): void
$count = $this->lockService->getCurrentCount($message->indexName);
$this->consoleOutput->writeln(sprintf('waiting for lock release (%s) for %s (%s)', $count, $message->indexName, hash('sha256', (string) $key)), ConsoleOutputInterface::VERBOSITY_VERBOSE);

if (!$this->lockService->allMessagesProcessed($message->indexName)) {
$this->consoleOutput->writeln(sprintf('not all messages processed (~%s remaining), rescheduling', $count), ConsoleOutputInterface::VERBOSITY_VERBOSE);
while (!$this->lockService->allMessagesProcessed($message->indexName) && $attempt < $maxAttempts) {
$this->consoleOutput->writeln(sprintf('not all messages processed (~%s remaining), attempt %d', $count, $attempt + 1), ConsoleOutputInterface::VERBOSITY_VERBOSE);
sleep($count * $attempt + 1);
$attempt++;
}

if ($attempt >= $maxAttempts) {
$this->consoleOutput->writeln('Max attempts reached, rescheduling', ConsoleOutputInterface::VERBOSITY_VERBOSE);
$this->messageBus->dispatch($message->clone(), [new DelayStamp($count * 1000 * 2)]);
$releaseLock = false;

Expand Down

0 comments on commit 79939a4

Please sign in to comment.