Skip to content

Commit

Permalink
WIP: use Symfony Messenger to handle event-triggered updates and prev…
Browse files Browse the repository at this point in the history
…ent overlaps with indexing command

resolve #51
close #39
  • Loading branch information
limenet committed Jan 23, 2024
1 parent e89e264 commit 362c651
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 22 deletions.
18 changes: 3 additions & 15 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Util\ElasticsearchResponse;

class Index extends BaseCommand
Expand All @@ -34,8 +32,7 @@ public function __construct(
private readonly IndexRepository $indexRepository,
private readonly ElasticsearchClient $esClient,
private readonly KernelInterface $kernel,
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
private readonly LockService $lockService,
) {
parent::__construct();
}
Expand Down Expand Up @@ -84,7 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
continue;
}

$lock = $this->getLock($indexConfig);
$lock = $this->lockService->getLock($indexConfig);

if (!$lock->acquire()) {
if ($this->input->getOption(self::OPTION_LOCK_RELEASE) === true) {
Expand Down Expand Up @@ -260,13 +257,4 @@ private function ensureCorrectBlueGreenIndexSetup(IndexInterface $indexConfig):

$this->output->writeln('<comment>-> Ensured indices are correctly set up with alias</comment>');
}

private function getLock(mixed $indexConfig): LockInterface
{
return $this->lockFactory
->createLock(
__METHOD__ . '->' . $indexConfig->getName(),
ttl: $this->configurationRepository->getIndexingLockTimeout()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function load(array $configs, ContainerBuilder $container): void
$config = $this->processConfiguration($configuration, $configs);

$loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('services.yml');
$loader->load('services.yaml');

$container->setParameter('valantic_elastica_bridge', $config);
}
Expand Down
7 changes: 4 additions & 3 deletions src/EventListener/Pimcore/ChangeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use Pimcore\Model\Document;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;

/**
* An abstract listener for DataObject and Document listeners.
Expand All @@ -28,7 +29,7 @@ class ChangeListener implements EventSubscriberInterface
private static bool $isEnabled = true;

public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly MessageBusInterface $messageBus,
) {}

public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
Expand All @@ -45,7 +46,7 @@ public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
return;
}

$this->propagateChanges->handle($this->getFreshElement($element));
$this->messageBus->dispatch(new RefreshElement($this->getFreshElement($element)));
}

public static function enableListener(): void
Expand Down
45 changes: 45 additions & 0 deletions src/Messenger/Handler/AbstractRefreshHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\DataObject\Concrete;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Messenger\Message\AbstractRefresh;

/** @template TModel of AbstractElement */
#[AsMessageHandler]
abstract class AbstractRefreshHandler
{
protected function resolveElement(AbstractRefresh $message): AbstractElement
{
/** @var class-string<TModel> $className */
$className = $message->className;

try {
$element = $className::getById($message->id);
} catch (\Throwable) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

if (!$element instanceof AbstractElement) {
// The element in question was deleted so we need a skeleton.
/** @var TModel $element */
$element = new ($className)();
$element->setId($message->id);

if ($element instanceof Concrete) {
$element->setPublished(false);
}
}

if ($element === null) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

return $element;
}
}
28 changes: 28 additions & 0 deletions src/Messenger/Handler/RefreshElementHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
) {}

public function __invoke(RefreshElement $message): void
{
$element = $this->resolveElement($message);

$this->propagateChanges->handle($element);
}
}
41 changes: 41 additions & 0 deletions src/Messenger/Handler/RefreshElementInIndexHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementInIndexHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly LockService $lockService,
private readonly IndexRepository $indexRepository,
) {}

public function __invoke(RefreshElementInIndex $message): void
{
$index = $this->indexRepository->getFlattened($message->index);
$element = $this->resolveElement($message);

if ($index->usesBlueGreenIndices()) {
$lock = $this->lockService->getLock($index);

if (!$lock->acquire()) {
$this->propagateChanges->handleIndex($element, $index, $index->getBlueGreenInactiveElasticaIndex());
}
}

$this->propagateChanges->handleIndex($element, $index);
}
}
20 changes: 20 additions & 0 deletions src/Messenger/Message/AbstractRefresh.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

abstract class AbstractRefresh
{
/** @var class-string<ElementInterface> */
public string $className;
public int $id;

protected function setElement(ElementInterface $element): void
{
$this->className = $element::class;
$this->id = $element->getId() ?? throw new \InvalidArgumentException('Pimcore ID is null.');
}
}
15 changes: 15 additions & 0 deletions src/Messenger/Message/RefreshElement.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElement extends AbstractRefresh
{
public function __construct(ElementInterface $element)
{
$this->setElement($element);
}
}
17 changes: 17 additions & 0 deletions src/Messenger/Message/RefreshElementInIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElementInIndex extends AbstractRefresh
{
public function __construct(
ElementInterface $element,
public readonly string $index,
) {
$this->setElement($element);
}
}
12 changes: 12 additions & 0 deletions src/Repository/IndexRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Repository;

use Valantic\ElasticaBridgeBundle\Exception\Repository\ItemNotFoundInRepositoryException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Index\TenantAwareInterface;

Expand Down Expand Up @@ -32,4 +33,15 @@ public function flattened(): \Generator
}
}
}

public function getFlattened(string $key): IndexInterface
{
foreach ($this->flattened() as $candidateKey => $index) {
if ($candidateKey === $key) {
return $index;
}
}

throw new ItemNotFoundInRepositoryException($key);
}
}
2 changes: 2 additions & 0 deletions src/Resources/config/pimcore/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
imports:
- { resource: messenger.yaml }
8 changes: 8 additions & 0 deletions src/Resources/config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
framework:
messenger:
enabled: true
transports:
elastica_bridge_index: 'doctrine://default?queue_name=elastica_bridge_index'
routing:
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement: elastica_bridge_index
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex: elastica_bridge_index
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ services:
Valantic\ElasticaBridgeBundle\Repository\DocumentRepository:
arguments:
- !tagged_iterator valantic.elastica_bridge.document

Valantic\ElasticaBridgeBundle\Messenger\Handler\:
resource: '../../Messenger/Handler/*'
tags:
- { name: messenger.message_handler }
29 changes: 29 additions & 0 deletions src/Service/LockService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Service;

use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;

class LockService
{
private const LOCK_PREFIX = 'pimcore-elastica-bridge';

public function __construct(
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
) {}

public function getLock(IndexInterface $indexConfig): LockInterface
{
return $this->lockFactory
->createLock(
sprintf('%s:indexing:%s', self::LOCK_PREFIX, $indexConfig->getName()),
ttl: $this->configurationRepository->getIndexingLockTimeout()
);
}
}
21 changes: 18 additions & 3 deletions src/Service/PropagateChanges.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
namespace Valantic\ElasticaBridgeBundle\Service;

use Elastica\Exception\NotFoundException;
use Elastica\Index;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Messenger\MessageBusInterface;
use Valantic\ElasticaBridgeBundle\Document\DocumentInterface;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;

class PropagateChanges
{
public function __construct(
private readonly IndexRepository $indexRepository,
private readonly DocumentHelper $documentHelper,
private readonly MessageBusInterface $messageBus,
) {}

/**
Expand All @@ -29,12 +33,23 @@ public function handle(AbstractElement $element): void
$indices = $this->matchingIndicesForElement($this->indexRepository->flattened(), $element);

foreach ($indices as $index) {
$this->handleIndex($element, $index);
$this->messageBus->dispatch(new RefreshElementInIndex($element, $index->getName()));
}
}

private function handleIndex(AbstractElement $element, IndexInterface $index): void
{
public function handleIndex(
AbstractElement $element,
IndexInterface $index,
?Index $elasticaIndex = null,
): void {
$this->doHandleIndex($element, $index, $elasticaIndex ?? $index->getElasticaIndex());
}

public function doHandleIndex(
AbstractElement $element,
IndexInterface $index,
Index $elasticaIndex,
): void {
$document = $index->findDocumentInstanceByPimcore($element);

if (!$document instanceof DocumentInterface) {
Expand Down

0 comments on commit 362c651

Please sign in to comment.