diff --git a/README.md b/README.md index e3832bb..7d92ad4 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ See the [`ProductIndex` provided in the example](docs/example/src/Elasticsearch/ - `relationAttributes` allow to store just a reference, i.e. the ID of a Pimcore element, instead of the entire object in the index. - The mapping can either be an array defined without keys, in which case the Pimcore element's property will be indexed using the same name or a key-value pair if the property should be named differently in the index. **If a key-value pair is used, it is also possible to pass a function retrieving the Pimcore element and returning an arbitrary array.** This is very powerful and allows to implement almost any use case. Mind that it is also possible to mix both approaches, i.e. define some entries with a key and others without one. -See the [`ProductIndexDocument` provided in the example](docs/example/src/Elasticsearch/Index/Product/Document/ProductIndexDocument.php) for more details. +See the [`ProductIndexDocument` provided in the example](docs/example/src/Elasticsearch/Index/Product/Document/ProductIndexDocument.php) for more details. Alternatively you can route the transport to use the `sync` handler: `framework.messenger.transports.elastica_bridge_index: 'sync'` ## Configuration @@ -68,6 +68,10 @@ valantic_elastica_bridge: should_skip_failing_documents: false ``` +## Queue + +[Set up a worker](https://symfony.com/doc/current/messenger.html#consuming-messages-running-the-worker) to process `elastica_bridge_index`. + ## Indexing ### Bulk diff --git a/UPGRADE.md b/UPGRADE.md index 311c6ba..46d3091 100644 --- a/UPGRADE.md +++ b/UPGRADE.md @@ -4,6 +4,7 @@ - Remove deprecated options `valantic_elastica_bridge.client.host` and `valantic_elastica_bridge.client.port`. Use `valantic_elastica_bridge.client.dsn` instead, e.g. `http://localhost:9200` - Renamed `valantic_elastica_bridge.client.addSentryBreadcrumbs` to `valantic_elastica_bridge.client.should_add_sentry_breadcrumbs` +- See [README#Queue](./README#queue) to set up the **required** Symfony Messenger workers. ## Upgrade from v2 to v3 diff --git a/src/Command/Cleanup.php b/src/Command/Cleanup.php index 4486739..0dca720 100644 --- a/src/Command/Cleanup.php +++ b/src/Command/Cleanup.php @@ -81,7 +81,7 @@ private function getIndices(): array $indices = []; - foreach ($this->indexRepository->flattened() as $indexConfig) { + foreach ($this->indexRepository->flattenedAll() as $indexConfig) { if ($indexConfig->usesBlueGreenIndices()) { $indices[] = $indexConfig->getBlueGreenActiveElasticaIndex()->getName(); $indices[] = $indexConfig->getBlueGreenInactiveElasticaIndex()->getName(); diff --git a/src/Command/Index.php b/src/Command/Index.php index f85208a..e45c7a7 100644 --- a/src/Command/Index.php +++ b/src/Command/Index.php @@ -11,15 +11,13 @@ use Symfony\Component\Console\Output\ConsoleOutput; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\HttpKernel\KernelInterface; -use Symfony\Component\Lock\LockFactory; -use Symfony\Component\Lock\LockInterface; use Symfony\Component\Process\Process; use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient; use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix; use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException; use Valantic\ElasticaBridgeBundle\Index\IndexInterface; -use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository; use Valantic\ElasticaBridgeBundle\Repository\IndexRepository; +use Valantic\ElasticaBridgeBundle\Service\LockService; use Valantic\ElasticaBridgeBundle\Util\ElasticsearchResponse; class Index extends BaseCommand @@ -34,8 +32,7 @@ public function __construct( private readonly IndexRepository $indexRepository, private readonly ElasticsearchClient $esClient, private readonly KernelInterface $kernel, - private readonly LockFactory $lockFactory, - private readonly ConfigurationRepository $configurationRepository, + private readonly LockService $lockService, ) { parent::__construct(); } @@ -73,7 +70,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int { $skippedIndices = []; - foreach ($this->indexRepository->flattened() as $indexConfig) { + foreach ($this->indexRepository->flattenedAll() as $indexConfig) { if ( is_array($this->input->getArgument(self::ARGUMENT_INDEX)) && count($this->input->getArgument(self::ARGUMENT_INDEX)) > 0 @@ -84,7 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int continue; } - $lock = $this->getLock($indexConfig); + $lock = $this->lockService->getIndexingLock($indexConfig); if (!$lock->acquire()) { if ($this->input->getOption(self::OPTION_LOCK_RELEASE) === true) { @@ -260,13 +257,4 @@ private function ensureCorrectBlueGreenIndexSetup(IndexInterface $indexConfig): $this->output->writeln('-> Ensured indices are correctly set up with alias'); } - - private function getLock(mixed $indexConfig): LockInterface - { - return $this->lockFactory - ->createLock( - __METHOD__ . '->' . $indexConfig->getName(), - ttl: $this->configurationRepository->getIndexingLockTimeout() - ); - } } diff --git a/src/Command/PopulateIndex.php b/src/Command/PopulateIndex.php index 83aa02c..a4fb597 100644 --- a/src/Command/PopulateIndex.php +++ b/src/Command/PopulateIndex.php @@ -58,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int private function getIndex(): ?IndexInterface { - foreach ($this->indexRepository->flattened() as $indexConfig) { + foreach ($this->indexRepository->flattenedAll() as $indexConfig) { if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) { return $indexConfig; } diff --git a/src/Command/Status.php b/src/Command/Status.php index af616fb..5fb2418 100644 --- a/src/Command/Status.php +++ b/src/Command/Status.php @@ -55,7 +55,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $this->output->writeln(''); - foreach ($this->indexRepository->flattened() as $indexConfig) { + foreach ($this->indexRepository->flattenedAll() as $indexConfig) { $this->processBundleIndex($indexConfig); } diff --git a/src/DependencyInjection/ValanticElasticaBridgeExtension.php b/src/DependencyInjection/ValanticElasticaBridgeExtension.php index 3e02a4e..655a846 100644 --- a/src/DependencyInjection/ValanticElasticaBridgeExtension.php +++ b/src/DependencyInjection/ValanticElasticaBridgeExtension.php @@ -37,7 +37,7 @@ public function load(array $configs, ContainerBuilder $container): void $config = $this->processConfiguration($configuration, $configs); $loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config')); - $loader->load('services.yml'); + $loader->load('services.yaml'); $container->setParameter('valantic_elastica_bridge', $config); } diff --git a/src/EventListener/Pimcore/ChangeListener.php b/src/EventListener/Pimcore/ChangeListener.php index 8ff61fa..0489a26 100644 --- a/src/EventListener/Pimcore/ChangeListener.php +++ b/src/EventListener/Pimcore/ChangeListener.php @@ -15,8 +15,9 @@ use Pimcore\Model\Document; use Pimcore\Model\Element\AbstractElement; use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\MessageBusInterface; use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException; -use Valantic\ElasticaBridgeBundle\Service\PropagateChanges; +use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement; /** * An abstract listener for DataObject and Document listeners. @@ -28,7 +29,7 @@ class ChangeListener implements EventSubscriberInterface private static bool $isEnabled = true; public function __construct( - private readonly PropagateChanges $propagateChanges, + private readonly MessageBusInterface $messageBus, ) {} public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void @@ -45,7 +46,7 @@ public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void return; } - $this->propagateChanges->handle($this->getFreshElement($element)); + $this->messageBus->dispatch(new RefreshElement($this->getFreshElement($element))); } public static function enableListener(): void diff --git a/src/Messenger/Handler/AbstractRefreshHandler.php b/src/Messenger/Handler/AbstractRefreshHandler.php new file mode 100644 index 0000000..471ba44 --- /dev/null +++ b/src/Messenger/Handler/AbstractRefreshHandler.php @@ -0,0 +1,45 @@ + $className */ + $className = $message->className; + + try { + $element = $className::getById($message->id); + } catch (\Throwable) { + throw new PimcoreElementNotFoundException($message->id, $message->className); + } + + if (!$element instanceof AbstractElement) { + // The element in question was deleted so we need a skeleton. + /** @var TModel $element */ + $element = new ($className)(); + $element->setId($message->id); + + if ($element instanceof Concrete) { + $element->setPublished(false); + } + } + + if ($element === null) { + throw new PimcoreElementNotFoundException($message->id, $message->className); + } + + return $element; + } +} diff --git a/src/Messenger/Handler/RefreshElementHandler.php b/src/Messenger/Handler/RefreshElementHandler.php new file mode 100644 index 0000000..db251c7 --- /dev/null +++ b/src/Messenger/Handler/RefreshElementHandler.php @@ -0,0 +1,28 @@ + + */ +class RefreshElementHandler extends AbstractRefreshHandler +{ + public function __construct( + private readonly PropagateChanges $propagateChanges, + ) {} + + public function __invoke(RefreshElement $message): void + { + $element = $this->resolveElement($message); + + $this->propagateChanges->handle($element); + } +} diff --git a/src/Messenger/Handler/RefreshElementInIndexHandler.php b/src/Messenger/Handler/RefreshElementInIndexHandler.php new file mode 100644 index 0000000..1e715fd --- /dev/null +++ b/src/Messenger/Handler/RefreshElementInIndexHandler.php @@ -0,0 +1,37 @@ + + */ +class RefreshElementInIndexHandler extends AbstractRefreshHandler +{ + public function __construct( + private readonly PropagateChanges $propagateChanges, + private readonly LockService $lockService, + private readonly IndexRepository $indexRepository, + ) {} + + public function __invoke(RefreshElementInIndex $message): void + { + $index = $this->indexRepository->flattenedGet($message->index); + $element = $this->resolveElement($message); + + if ($index->usesBlueGreenIndices() && !$this->lockService->getIndexingLock($index)->acquire()) { + $this->propagateChanges->handleIndex($element, $index, $index->getBlueGreenInactiveElasticaIndex()); + } + + $this->propagateChanges->handleIndex($element, $index); + } +} diff --git a/src/Messenger/Message/AbstractRefresh.php b/src/Messenger/Message/AbstractRefresh.php new file mode 100644 index 0000000..1c9d33e --- /dev/null +++ b/src/Messenger/Message/AbstractRefresh.php @@ -0,0 +1,20 @@ + */ + public string $className; + public int $id; + + protected function setElement(ElementInterface $element): void + { + $this->className = $element::class; + $this->id = $element->getId() ?? throw new \InvalidArgumentException('Pimcore ID is null.'); + } +} diff --git a/src/Messenger/Message/RefreshElement.php b/src/Messenger/Message/RefreshElement.php new file mode 100644 index 0000000..b4785d0 --- /dev/null +++ b/src/Messenger/Message/RefreshElement.php @@ -0,0 +1,15 @@ +setElement($element); + } +} diff --git a/src/Messenger/Message/RefreshElementInIndex.php b/src/Messenger/Message/RefreshElementInIndex.php new file mode 100644 index 0000000..ebdcdd0 --- /dev/null +++ b/src/Messenger/Message/RefreshElementInIndex.php @@ -0,0 +1,17 @@ +setElement($element); + } +} diff --git a/src/Repository/IndexRepository.php b/src/Repository/IndexRepository.php index b0bf4c6..5ad406f 100644 --- a/src/Repository/IndexRepository.php +++ b/src/Repository/IndexRepository.php @@ -4,6 +4,7 @@ namespace Valantic\ElasticaBridgeBundle\Repository; +use Valantic\ElasticaBridgeBundle\Exception\Repository\ItemNotFoundInRepositoryException; use Valantic\ElasticaBridgeBundle\Index\IndexInterface; use Valantic\ElasticaBridgeBundle\Index\TenantAwareInterface; @@ -17,7 +18,7 @@ class IndexRepository extends AbstractRepository /** * @return \Generator */ - public function flattened(): \Generator + public function flattenedAll(): \Generator { foreach ($this->all() as $indexConfig) { if ($indexConfig instanceof TenantAwareInterface) { @@ -32,4 +33,15 @@ public function flattened(): \Generator } } } + + public function flattenedGet(string $key): IndexInterface + { + foreach ($this->flattenedAll() as $candidateKey => $index) { + if ($candidateKey === $key) { + return $index; + } + } + + throw new ItemNotFoundInRepositoryException($key); + } } diff --git a/src/Resources/config/pimcore/config.yaml b/src/Resources/config/pimcore/config.yaml new file mode 100644 index 0000000..53b51b1 --- /dev/null +++ b/src/Resources/config/pimcore/config.yaml @@ -0,0 +1,2 @@ +imports: + - { resource: messenger.yaml } diff --git a/src/Resources/config/pimcore/messenger.yaml b/src/Resources/config/pimcore/messenger.yaml new file mode 100644 index 0000000..66393aa --- /dev/null +++ b/src/Resources/config/pimcore/messenger.yaml @@ -0,0 +1,8 @@ +framework: + messenger: + enabled: true + transports: + elastica_bridge_index: 'doctrine://default?queue_name=elastica_bridge_index' + routing: + Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement: elastica_bridge_index + Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex: elastica_bridge_index diff --git a/src/Resources/config/services.yml b/src/Resources/config/services.yaml similarity index 86% rename from src/Resources/config/services.yml rename to src/Resources/config/services.yaml index acdf8b6..0de954b 100644 --- a/src/Resources/config/services.yml +++ b/src/Resources/config/services.yaml @@ -28,3 +28,8 @@ services: Valantic\ElasticaBridgeBundle\Repository\DocumentRepository: arguments: - !tagged_iterator valantic.elastica_bridge.document + + Valantic\ElasticaBridgeBundle\Messenger\Handler\: + resource: '../../Messenger/Handler/*' + tags: + - { name: messenger.message_handler } diff --git a/src/Service/LockService.php b/src/Service/LockService.php new file mode 100644 index 0000000..425832b --- /dev/null +++ b/src/Service/LockService.php @@ -0,0 +1,29 @@ +lockFactory + ->createLock( + sprintf('%s:indexing:%s', self::LOCK_PREFIX, $indexConfig->getName()), + ttl: $this->configurationRepository->getIndexingLockTimeout() + ); + } +} diff --git a/src/Service/PropagateChanges.php b/src/Service/PropagateChanges.php index 24b97ae..ec41334 100644 --- a/src/Service/PropagateChanges.php +++ b/src/Service/PropagateChanges.php @@ -5,10 +5,13 @@ namespace Valantic\ElasticaBridgeBundle\Service; use Elastica\Exception\NotFoundException; +use Elastica\Index; use Pimcore\Model\DataObject\AbstractObject; use Pimcore\Model\Element\AbstractElement; +use Symfony\Component\Messenger\MessageBusInterface; use Valantic\ElasticaBridgeBundle\Document\DocumentInterface; use Valantic\ElasticaBridgeBundle\Index\IndexInterface; +use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex; use Valantic\ElasticaBridgeBundle\Repository\IndexRepository; class PropagateChanges @@ -16,6 +19,7 @@ class PropagateChanges public function __construct( private readonly IndexRepository $indexRepository, private readonly DocumentHelper $documentHelper, + private readonly MessageBusInterface $messageBus, ) {} /** @@ -26,15 +30,26 @@ public function __construct( */ public function handle(AbstractElement $element): void { - $indices = $this->matchingIndicesForElement($this->indexRepository->flattened(), $element); + $indices = $this->matchingIndicesForElement($this->indexRepository->flattenedAll(), $element); foreach ($indices as $index) { - $this->handleIndex($element, $index); + $this->messageBus->dispatch(new RefreshElementInIndex($element, $index->getName())); } } - private function handleIndex(AbstractElement $element, IndexInterface $index): void - { + public function handleIndex( + AbstractElement $element, + IndexInterface $index, + ?Index $elasticaIndex = null, + ): void { + $this->doHandleIndex($element, $index, $elasticaIndex ?? $index->getElasticaIndex()); + } + + public function doHandleIndex( + AbstractElement $element, + IndexInterface $index, + Index $elasticaIndex, + ): void { $document = $index->findDocumentInstanceByPimcore($element); if (!$document instanceof DocumentInterface) {