Skip to content

Commit b596efb

Browse files
authored
Merge pull request #7 from php-enqueue/sync-index-with-doctrine-orm-object-change
[doctrine] Sync index with doctrine orm object change.
2 parents 82f6560 + 088f9ea commit b596efb

File tree

7 files changed

+306
-7
lines changed

7 files changed

+306
-7
lines changed

DependencyInjection/Configuration.php

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Enqueue\ElasticaBundle\DependencyInjection;
4+
5+
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
6+
use Symfony\Component\Config\Definition\ConfigurationInterface;
7+
8+
class Configuration implements ConfigurationInterface
9+
{
10+
/**
11+
* {@inheritdoc}
12+
*/
13+
public function getConfigTreeBuilder()
14+
{
15+
$tb = new TreeBuilder();
16+
$rootNode = $tb->root('enqueue_elastica');
17+
$rootNode
18+
->children()
19+
->arrayNode('doctrine')
20+
->children()
21+
->arrayNode('queue_listeners')
22+
->prototype('array')
23+
->addDefaultsIfNotSet()
24+
->children()
25+
->booleanNode('insert')->defaultTrue()->end()
26+
->booleanNode('update')->defaultTrue()->end()
27+
->booleanNode('remove')->defaultTrue()->end()
28+
->scalarNode('connection')->defaultValue('default')->cannotBeEmpty()->end()
29+
->scalarNode('index_name')->isRequired()->cannotBeEmpty()->end()
30+
->scalarNode('type_name')->isRequired()->cannotBeEmpty()->end()
31+
->scalarNode('model_class')->isRequired()->cannotBeEmpty()->end()
32+
->scalarNode('model_id')->defaultValue('id')->cannotBeEmpty()->end()->end()
33+
;
34+
35+
return $tb;
36+
}
37+
}

DependencyInjection/EnqueueElasticaExtension.php

+21
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
namespace Enqueue\ElasticaBundle\DependencyInjection;
44

5+
use Enqueue\ElasticaBundle\Doctrine\SyncIndexWithObjectChangeListener;
56
use Symfony\Component\Config\FileLocator;
67
use Symfony\Component\DependencyInjection\ContainerBuilder;
78
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
9+
use Symfony\Component\DependencyInjection\Reference;
810
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
911

1012
class EnqueueElasticaExtension extends Extension
@@ -14,7 +16,26 @@ class EnqueueElasticaExtension extends Extension
1416
*/
1517
public function load(array $configs, ContainerBuilder $container)
1618
{
19+
$config = $this->processConfiguration(new Configuration(), $configs);
20+
1721
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
1822
$loader->load('services.yml');
23+
24+
if (false == empty($config['doctrine']['queue_listeners'])) {
25+
foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) {
26+
$listenerId = sprintf(
27+
'enqueue_elastica.doctrine_queue_listener.%s.%s',
28+
$listenerConfig['index_name'],
29+
$listenerConfig['type_name']
30+
);
31+
32+
$container->register($listenerId, SyncIndexWithObjectChangeListener::class)
33+
->addArgument(new Reference('enqueue.transport.context'))
34+
->addArgument($listenerConfig['model_class'])
35+
->addArgument($listenerConfig)
36+
->addTag('doctrine.event_subscriber', ['connection' => $listenerConfig['connection']])
37+
;
38+
}
39+
}
1940
}
2041
}

Doctrine/Queue/Commands.php

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine\Queue;
3+
4+
final class Commands
5+
{
6+
const SYNC_INDEX_WITH_OBJECT_CHANGE = 'fos_elastica_doctrine_orm_sync_index_with_object_change';
7+
8+
private function __construct()
9+
{
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine\Queue;
3+
4+
use Enqueue\Client\CommandSubscriberInterface;
5+
use Enqueue\Consumption\QueueSubscriberInterface;
6+
use Enqueue\Consumption\Result;
7+
use Enqueue\Util\JSON;
8+
use FOS\ElasticaBundle\Persister\PersisterRegistry;
9+
use FOS\ElasticaBundle\Provider\IndexableInterface;
10+
use Interop\Queue\PsrContext;
11+
use Interop\Queue\PsrMessage;
12+
use Interop\Queue\PsrProcessor;
13+
use Symfony\Bridge\Doctrine\RegistryInterface;
14+
15+
final class SyncIndexWithObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
16+
{
17+
const INSERT_ACTION = 'insert';
18+
19+
const UPDATE_ACTION = 'update';
20+
21+
const REMOVE_ACTION = 'remove';
22+
23+
/**
24+
* @var PersisterRegistry
25+
*/
26+
private $persisterRegistry;
27+
28+
/**
29+
* @var IndexableInterface
30+
*/
31+
private $indexable;
32+
33+
/**
34+
* @var RegistryInterface
35+
*/
36+
private $doctrine;
37+
38+
public function __construct(RegistryInterface $doctrine, PersisterRegistry $persisterRegistry, IndexableInterface $indexable)
39+
{
40+
$this->persisterRegistry = $persisterRegistry;
41+
$this->indexable = $indexable;
42+
$this->doctrine = $doctrine;
43+
}
44+
45+
/**
46+
* {@inheritdoc}
47+
*/
48+
public function process(PsrMessage $message, PsrContext $context)
49+
{
50+
$data = JSON::decode($message->getBody());
51+
52+
if (false == isset($data['action'])) {
53+
return Result::reject('The message data misses action');
54+
}
55+
if (false == isset($data['model_class'])) {
56+
return Result::reject('The message data misses model_class');
57+
}
58+
if (false == isset($data['id'])) {
59+
return Result::reject('The message data misses id');
60+
}
61+
if (false == isset($data['index_name'])) {
62+
return Result::reject('The message data misses id');
63+
}
64+
if (false == isset($data['type_name'])) {
65+
return Result::reject('The message data misses id');
66+
}
67+
68+
$action = $data['action'];
69+
$modelClass = $data['model_class'];
70+
$id = $data['id'];
71+
$index = $data['index_name'];
72+
$type = $data['type_name'];
73+
74+
$repository = $this->doctrine->getManagerForClass($modelClass)->getRepository($modelClass);
75+
$persister = $this->persisterRegistry->getPersister($index, $type);
76+
77+
switch ($action) {
78+
case self::UPDATE_ACTION:
79+
if (false == $object = $repository->find($id)) {
80+
$persister->deleteById($id);
81+
82+
return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id));
83+
}
84+
85+
if ($persister->handlesObject($object)) {
86+
if ($this->indexable->isObjectIndexable($index, $type, $object)) {
87+
$persister->replaceOne($object);
88+
} else {
89+
$persister->deleteOne($object);
90+
}
91+
}
92+
93+
return self::ACK;
94+
case self::INSERT_ACTION:
95+
if (false == $object = $repository->find($id)) {
96+
$persister->deleteById($id);
97+
98+
return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id));
99+
}
100+
101+
if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $type, $object)) {
102+
$persister->insertOne($object);
103+
}
104+
105+
return self::ACK;
106+
case self::REMOVE_ACTION:
107+
$persister->deleteById($id);
108+
109+
return self::ACK;
110+
default:
111+
return Result::reject(sprintf('The action "%s" is not supported', $action));
112+
}
113+
}
114+
115+
/**
116+
* {@inheritdoc}
117+
*/
118+
public static function getSubscribedCommand()
119+
{
120+
return [
121+
'processorName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
122+
'queueName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
123+
'queueNameHardcoded' => true,
124+
'exclusive' => true,
125+
];
126+
}
127+
128+
/**
129+
* {@inheritdoc}
130+
*/
131+
public static function getSubscribedQueues()
132+
{
133+
return [Commands::SYNC_INDEX_WITH_OBJECT_CHANGE];
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Doctrine;
3+
4+
use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
5+
use Enqueue\ElasticaBundle\Doctrine\Queue\Commands;
6+
use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor;
7+
use Enqueue\Util\JSON;
8+
use Interop\Queue\PsrContext;
9+
use Doctrine\Common\EventSubscriber;
10+
11+
final class SyncIndexWithObjectChangeListener implements EventSubscriber
12+
{
13+
/**
14+
* @var PsrContext
15+
*/
16+
private $context;
17+
18+
/**
19+
* @var string
20+
*/
21+
private $modelClass;
22+
23+
/**
24+
* @var array
25+
*/
26+
private $config;
27+
28+
public function __construct(PsrContext $context, $modelClass, array $config)
29+
{
30+
$this->context = $context;
31+
$this->modelClass = $modelClass;
32+
$this->config = $config;
33+
}
34+
35+
public function postUpdate(LifecycleEventArgs $args)
36+
{
37+
38+
if ($args->getObject() instanceof $this->modelClass) {
39+
$this->sendUpdateIndexMessage(SyncProcessor::UPDATE_ACTION, $args);
40+
}
41+
}
42+
43+
public function postPersist(LifecycleEventArgs $args)
44+
{
45+
if ($args->getObject() instanceof $this->modelClass) {
46+
$this->sendUpdateIndexMessage(SyncProcessor::INSERT_ACTION, $args);
47+
}
48+
}
49+
50+
public function preRemove(LifecycleEventArgs $args)
51+
{
52+
if ($args->getObject() instanceof $this->modelClass) {
53+
$this->sendUpdateIndexMessage(SyncProcessor::REMOVE_ACTION, $args);
54+
}
55+
}
56+
57+
public function getSubscribedEvents()
58+
{
59+
return [
60+
'postPersist',
61+
'postUpdate',
62+
'preRemove',
63+
];
64+
}
65+
66+
/**
67+
* @param string $action
68+
* @param LifecycleEventArgs $args
69+
*/
70+
private function sendUpdateIndexMessage($action, LifecycleEventArgs $args)
71+
{
72+
$object = $args->getObject();
73+
74+
$rp = new \ReflectionProperty($object, $this->config['model_id']);
75+
$rp->setAccessible(true);
76+
$id = $rp->getValue($object);
77+
$rp->setAccessible(false);
78+
79+
$queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_OBJECT_CHANGE);
80+
81+
$message = $this->context->createMessage(JSON::encode([
82+
'action' => $action,
83+
'model_class' => $this->modelClass,
84+
'model_id' => $this->config['model_id'],
85+
'id' => $id,
86+
'index_name' => $this->config['index_name'],
87+
'type_name' => $this->config['type_name'],
88+
]));
89+
90+
$this->context->createProducer()->send($queue, $message);
91+
}
92+
}

Queue/PopulateProcessor.php

+1-7
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,15 @@
44
use Enqueue\Client\CommandSubscriberInterface;
55
use Enqueue\Consumption\QueueSubscriberInterface;
66
use Enqueue\Consumption\Result;
7-
use FOS\ElasticaBundle\Persister\Event\Events;
8-
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
9-
use FOS\ElasticaBundle\Persister\Event\PreInsertObjectsEvent;
107
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
118
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
12-
use FOS\ElasticaBundle\Provider\PagerInterface;
139
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
1410
use Interop\Queue\PsrContext;
1511
use Interop\Queue\PsrMessage;
1612
use Interop\Queue\PsrProcessor;
1713
use Enqueue\Util\JSON;
18-
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
19-
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
2014

21-
class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
15+
final class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
2216
{
2317
/**
2418
* @var PagerProviderRegistry

Resources/config/services.yml

+9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ services:
77
tags:
88
- { name: "enqueue.client.processor" }
99

10+
enqueue_elastica.doctrine.sync_index_with_object_change_processor:
11+
class: 'Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor'
12+
arguments:
13+
- '@doctrine'
14+
- '@fos_elastica.persister_registry'
15+
- '@fos_elastica.indexable'
16+
tags:
17+
- { name: "enqueue.client.processor" }
18+
1019
enqueue_elastica.purge_populate_queue_listener:
1120
class: 'Enqueue\ElasticaBundle\Persister\Listener\PurgePopulateQueueListener'
1221
arguments:

0 commit comments

Comments
 (0)