Skip to content

Commit

Permalink
use Symfony Messenger to handle event-triggered updates and prevent o…
Browse files Browse the repository at this point in the history
…verlaps with indexing command

resolve #51
close #39
  • Loading branch information
limenet committed Feb 14, 2024
1 parent 9d85e49 commit 892965c
Show file tree
Hide file tree
Showing 19 changed files with 269 additions and 39 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ valantic_elastica_bridge:
should_skip_failing_documents: false
```

## Queue

[Set up a worker](https://symfony.com/doc/current/messenger.html#consuming-messages-running-the-worker) to process `elastica_bridge_index`. Alternatively you can route the transport to use the `sync` handler: `framework.messenger.transports.elastica_bridge_index: 'sync'`.

## Indexing

### Bulk
Expand Down Expand Up @@ -96,6 +100,8 @@ The bridge automatically listens to Pimcore events and updates documents as need
This can be globally disabled by calling `\Valantic\ElasticaBridgeBundle\EventListener\Pimcore\ChangeListener::disableListener();`.
You can also dispatch a `Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement` message to handle updates to related objects which are not triggered by the `ChangeListener`.
## Status
```
Expand Down
1 change: 1 addition & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Remove deprecated options `valantic_elastica_bridge.client.host` and `valantic_elastica_bridge.client.port`. Use `valantic_elastica_bridge.client.dsn` instead, e.g. `http://localhost:9200`
- Renamed `valantic_elastica_bridge.client.addSentryBreadcrumbs` to `valantic_elastica_bridge.client.should_add_sentry_breadcrumbs`
- See [README#Queue](./README#queue) to set up the **required** Symfony Messenger workers.

## Upgrade from v2 to v3

Expand Down
2 changes: 1 addition & 1 deletion src/Command/Cleanup.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private function getIndices(): array

$indices = [];

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->usesBlueGreenIndices()) {
$indices[] = $indexConfig->getBlueGreenActiveElasticaIndex()->getName();
$indices[] = $indexConfig->getBlueGreenInactiveElasticaIndex()->getName();
Expand Down
20 changes: 4 additions & 16 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Util\ElasticsearchResponse;

class Index extends BaseCommand
Expand All @@ -34,8 +32,7 @@ public function __construct(
private readonly IndexRepository $indexRepository,
private readonly ElasticsearchClient $esClient,
private readonly KernelInterface $kernel,
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
private readonly LockService $lockService,
) {
parent::__construct();
}
Expand Down Expand Up @@ -73,7 +70,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$skippedIndices = [];

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if (
is_array($this->input->getArgument(self::ARGUMENT_INDEX))
&& count($this->input->getArgument(self::ARGUMENT_INDEX)) > 0
Expand All @@ -84,7 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
continue;
}

$lock = $this->getLock($indexConfig);
$lock = $this->lockService->getIndexingLock($indexConfig);

if (!$lock->acquire()) {
if ($this->input->getOption(self::OPTION_LOCK_RELEASE) === true) {
Expand Down Expand Up @@ -260,13 +257,4 @@ private function ensureCorrectBlueGreenIndexSetup(IndexInterface $indexConfig):

$this->output->writeln('<comment>-> Ensured indices are correctly set up with alias</comment>');
}

private function getLock(mixed $indexConfig): LockInterface
{
return $this->lockFactory
->createLock(
__METHOD__ . '->' . $indexConfig->getName(),
ttl: $this->configurationRepository->getIndexingLockTimeout()
);
}
}
2 changes: 1 addition & 1 deletion src/Command/PopulateIndex.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

private function getIndex(): ?IndexInterface
{
foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) {
return $indexConfig;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Command/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$this->output->writeln('');

foreach ($this->indexRepository->flattened() as $indexConfig) {
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
$this->processBundleIndex($indexConfig);
}

Expand Down
7 changes: 4 additions & 3 deletions src/EventListener/Pimcore/ChangeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use Pimcore\Model\Document;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;

/**
* An abstract listener for DataObject and Document listeners.
Expand All @@ -28,7 +29,7 @@ class ChangeListener implements EventSubscriberInterface
private static bool $isEnabled = true;

public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly MessageBusInterface $messageBus,
) {}

public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
Expand All @@ -45,7 +46,7 @@ public function handle(AssetEvent|DataObjectEvent|DocumentEvent $event): void
return;
}

$this->propagateChanges->handle($this->getFreshElement($element));
$this->messageBus->dispatch(new RefreshElement($this->getFreshElement($element)));
}

public static function enableListener(): void
Expand Down
45 changes: 45 additions & 0 deletions src/Messenger/Handler/AbstractRefreshHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\DataObject\Concrete;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Valantic\ElasticaBridgeBundle\Exception\EventListener\PimcoreElementNotFoundException;
use Valantic\ElasticaBridgeBundle\Messenger\Message\AbstractRefresh;

/** @template TModel of AbstractElement */
#[AsMessageHandler]
abstract class AbstractRefreshHandler
{
protected function resolveElement(AbstractRefresh $message): AbstractElement
{
/** @var class-string<TModel> $className */
$className = $message->className;

try {
$element = $className::getById($message->id);
} catch (\Throwable) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

if (!$element instanceof AbstractElement) {
// The element in question was deleted so we need a skeleton.
/** @var TModel $element */
$element = new ($className)();
$element->setId($message->id);

if ($element instanceof Concrete) {
$element->setPublished(false);
}
}

if ($element === null) {
throw new PimcoreElementNotFoundException($message->id, $message->className);
}

return $element;
}
}
28 changes: 28 additions & 0 deletions src/Messenger/Handler/RefreshElementHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
) {}

public function __invoke(RefreshElement $message): void
{
$element = $this->resolveElement($message);

$this->propagateChanges->handle($element);
}
}
37 changes: 37 additions & 0 deletions src/Messenger/Handler/RefreshElementInIndexHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Handler;

use Pimcore\Model\Element\AbstractElement;
use Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\LockService;
use Valantic\ElasticaBridgeBundle\Service\PropagateChanges;

/**
* @template TModel of AbstractElement
*
* @extends AbstractRefreshHandler<TModel>
*/
class RefreshElementInIndexHandler extends AbstractRefreshHandler
{
public function __construct(
private readonly PropagateChanges $propagateChanges,
private readonly LockService $lockService,
private readonly IndexRepository $indexRepository,
) {}

public function __invoke(RefreshElementInIndex $message): void
{
$index = $this->indexRepository->flattenedGet($message->index);
$element = $this->resolveElement($message);

if ($index->usesBlueGreenIndices() && !$this->lockService->getIndexingLock($index)->acquire()) {
$this->propagateChanges->handleIndex($element, $index, $index->getBlueGreenInactiveElasticaIndex());
}

$this->propagateChanges->handleIndex($element, $index);
}
}
20 changes: 20 additions & 0 deletions src/Messenger/Message/AbstractRefresh.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

abstract class AbstractRefresh
{
/** @var class-string<ElementInterface> */
public string $className;
public int $id;

protected function setElement(ElementInterface $element): void
{
$this->className = $element::class;
$this->id = $element->getId() ?? throw new \InvalidArgumentException('Pimcore ID is null.');
}
}
15 changes: 15 additions & 0 deletions src/Messenger/Message/RefreshElement.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElement extends AbstractRefresh
{
public function __construct(ElementInterface $element)
{
$this->setElement($element);
}
}
17 changes: 17 additions & 0 deletions src/Messenger/Message/RefreshElementInIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Messenger\Message;

use Pimcore\Model\Element\ElementInterface;

class RefreshElementInIndex extends AbstractRefresh
{
public function __construct(
ElementInterface $element,
public readonly string $index,
) {
$this->setElement($element);
}
}
14 changes: 13 additions & 1 deletion src/Repository/IndexRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Valantic\ElasticaBridgeBundle\Repository;

use Valantic\ElasticaBridgeBundle\Exception\Repository\ItemNotFoundInRepositoryException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Index\TenantAwareInterface;

Expand All @@ -17,7 +18,7 @@ class IndexRepository extends AbstractRepository
/**
* @return \Generator<string,IndexInterface,void,void>
*/
public function flattened(): \Generator
public function flattenedAll(): \Generator
{
foreach ($this->all() as $indexConfig) {
if ($indexConfig instanceof TenantAwareInterface) {
Expand All @@ -32,4 +33,15 @@ public function flattened(): \Generator
}
}
}

public function flattenedGet(string $key): IndexInterface
{
foreach ($this->flattenedAll() as $candidateKey => $index) {
if ($candidateKey === $key) {
return $index;
}
}

throw new ItemNotFoundInRepositoryException($key);
}
}
2 changes: 2 additions & 0 deletions src/Resources/config/pimcore/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
imports:
- { resource: messenger.yaml }
8 changes: 8 additions & 0 deletions src/Resources/config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
framework:
messenger:
enabled: true
transports:
elastica_bridge_index: 'doctrine://default?queue_name=elastica_bridge_index'
routing:
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElement: elastica_bridge_index
Valantic\ElasticaBridgeBundle\Messenger\Message\RefreshElementInIndex: elastica_bridge_index
5 changes: 5 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ services:
Valantic\ElasticaBridgeBundle\Repository\DocumentRepository:
arguments:
- !tagged_iterator valantic.elastica_bridge.document

Valantic\ElasticaBridgeBundle\Messenger\Handler\:
resource: '../../Messenger/Handler/*'
tags:
- { name: messenger.message_handler }
29 changes: 29 additions & 0 deletions src/Service/LockService.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Service;

use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\LockInterface;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;

class LockService
{
private const LOCK_PREFIX = 'pimcore-elastica-bridge';

public function __construct(
private readonly LockFactory $lockFactory,
private readonly ConfigurationRepository $configurationRepository,
) {}

public function getIndexingLock(IndexInterface $indexConfig): LockInterface
{
return $this->lockFactory
->createLock(
sprintf('%s:indexing:%s', self::LOCK_PREFIX, $indexConfig->getName()),
ttl: $this->configurationRepository->getIndexingLockTimeout()
);
}
}
Loading

0 comments on commit 892965c

Please sign in to comment.