Skip to content

Commit 3d44dbf

Browse files
authored
Merge pull request #3 from php-enqueue/use-enqueue05-features
Upgrade to enqueue 0.5.x. Use its features. Simplify code.
2 parents 93d38f1 + 5067010 commit 3d44dbf

File tree

5 files changed

+74
-48
lines changed

5 files changed

+74
-48
lines changed

Async/Commands.php

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
namespace Enqueue\ElasticaBundle\Async;
3+
4+
5+
class Commands
6+
{
7+
const POPULATE = 'fos_elastica_populate';
8+
9+
private function __construct()
10+
{
11+
}
12+
}

Async/ElasticaPopulateProcessor.php

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
<?php
22
namespace Enqueue\ElasticaBundle\Async;
33

4+
use Enqueue\Client\CommandSubscriberInterface;
45
use Enqueue\Consumption\QueueSubscriberInterface;
56
use Enqueue\Psr\PsrContext;
67
use Enqueue\Psr\PsrMessage;
78
use Enqueue\Psr\PsrProcessor;
89
use Enqueue\Util\JSON;
910
use FOS\ElasticaBundle\Provider\ProviderRegistry;
1011

11-
class ElasticaPopulateProcessor implements PsrProcessor, QueueSubscriberInterface
12+
class ElasticaPopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
1213
{
1314
/**
1415
* @var ProviderRegistry
@@ -69,11 +70,24 @@ private function sendReply(PsrContext $context, PsrMessage $message, $successful
6970
$context->createProducer()->send($replyQueue, $replyMessage);
7071
}
7172

73+
/**
74+
* {@inheritdoc}
75+
*/
76+
public static function getSubscribedCommand()
77+
{
78+
return [
79+
'processorName' => Commands::POPULATE,
80+
'queueName' => Commands::POPULATE,
81+
'queueNameHardcoded' => true,
82+
'exclusive' => true,
83+
];
84+
}
85+
7286
/**
7387
* {@inheritdoc}
7488
*/
7589
public static function getSubscribedQueues()
7690
{
77-
return ['fos_elastica_populate'];
91+
return [Commands::POPULATE];
7892
}
7993
}

Elastica/AsyncDoctrineOrmProvider.php

+36-40
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
<?php
22
namespace Enqueue\ElasticaBundle\Elastica;
33

4-
use Enqueue\Psr\PsrContext;
5-
use Enqueue\Util\JSON;
4+
use Enqueue\Client\ProducerInterface;
5+
use Enqueue\ElasticaBundle\Async\Commands;
6+
use Enqueue\Rpc\Promise;
67
use FOS\ElasticaBundle\Doctrine\ORM\Provider;
78

89
class AsyncDoctrineOrmProvider extends Provider
910
{
11+
/**
12+
* @var int
13+
*/
1014
private $batchSize;
1115

1216
/**
13-
* @var PsrContext
17+
* @var ProducerInterface
1418
*/
15-
private $context;
19+
private $producer;
1620

1721
/**
18-
* @param PsrContext $context
22+
* @param ProducerInterface $producer
1923
*/
20-
public function setContext(PsrContext $context)
24+
public function setContext(ProducerInterface $producer)
2125
{
22-
$this->context = $context;
26+
$this->producer = $producer;
2327
}
2428

2529
/**
@@ -42,49 +46,41 @@ protected function doPopulate($options, \Closure $loggerClosure = null)
4246
$nbObjects = $this->countObjects($queryBuilder);
4347
$offset = $options['offset'];
4448

45-
$queue = $this->context->createQueue('fos_elastica_populate');
46-
$resultQueue = $this->context->createTemporaryQueue();
47-
$consumer = $this->context->createConsumer($resultQueue);
48-
49-
$producer = $this->context->createProducer();
50-
51-
$nbMessages = 0;
49+
/** @var Promise[] $promises */
50+
$promises = [];
5251
for (; $offset < $nbObjects; $offset += $options['batch_size']) {
5352
$options['offset'] = $offset;
5453
$options['real_populate'] = true;
55-
$message = $this->context->createMessage(JSON::encode($options));
56-
$message->setReplyTo($resultQueue->getQueueName());
57-
$producer->send($queue, $message);
5854

59-
$nbMessages++;
55+
$promises[] = $this->producer->sendCommand(Commands::POPULATE, $options, true);
6056
}
6157

6258
$limitTime = time() + 180;
63-
while ($nbMessages) {
64-
if ($message = $consumer->receive(20000)) {
65-
$errorMessage = null;
66-
67-
$errorMessage = null;
68-
if (false == $message->getProperty('fos-populate-successful', false)) {
69-
$errorMessage = sprintf(
70-
'<error>Batch failed: </error> <comment>Failed to process message %s</comment>',
71-
$message->getBody()
72-
);
73-
}
74-
75-
if ($loggerClosure) {
76-
$loggerClosure($options['batch_size'], $nbObjects, $errorMessage);
59+
while ($promises) {
60+
foreach ($promises as $index => $promise) {
61+
if ($message = $promise->receiveNoWait()) {
62+
unset($promises[$index]);
63+
64+
$errorMessage = null;
65+
if (false == $message->getProperty('fos-populate-successful', false)) {
66+
$errorMessage = sprintf(
67+
'<error>Batch failed: </error> <comment>Failed to process message %s</comment>',
68+
$message->getBody()
69+
);
70+
}
71+
72+
if ($loggerClosure) {
73+
$loggerClosure($options['batch_size'], $nbObjects, $errorMessage);
74+
}
75+
76+
$limitTime = time() + 180;
7777
}
7878

79-
$consumer->acknowledge($message);
80-
81-
$nbMessages--;
79+
sleep(1);
8280

83-
$limitTime = time() + 180;
84-
}
85-
86-
if (time() > $limitTime) {
87-
throw new \LogicException(sprintf('No response in %d seconds', 180));
81+
if (time() > $limitTime) {
82+
throw new \LogicException(sprintf('No response in %d seconds', 180));
83+
}
8884
}
8985
}
9086
}

README.md

+9-5
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ Here's an example of what your EnqueueBundle configuration may look like:
6161
```yaml
6262
enqueue:
6363
transport:
64-
default: 'fs'
65-
fs:
66-
store_dir: %kernel.root_dir%/../var/messages
64+
default: 'file://%kernel.root_dir%/../var/messages'
6765
```
6866
6967
Sure you can configure other transports like: [rabbitmq, amqp, stomp and so on](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/config_reference.md)
@@ -84,10 +82,16 @@ If you want to disable this behavior you can un register the bundle or use env v
8482
$ ENQUEUE_ELASTICA_DISABLE_ASYNC=1 ./bin/console fos:elastica:populate
8583
```
8684

87-
and have pull of consumer commands run somewhere, run them as many as you'd like
85+
Run some consumers either using client (you might have to enable it) consume command:
86+
87+
```bash
88+
$ ./bin/console enqueue:consume --setup-broker -vvv
89+
```
90+
91+
or a transport one:
8892

8993
```bash
90-
$ ./bin/console enqueue:transport:consume enqueue_elastica.populate_processor -vv
94+
$ ./bin/console enqueue:transport:consume enqueue_elastica.populate_processor -vvv
9195
```
9296

9397
We suggest to use [supervisor](http://supervisord.org/) on production to control numbers of consumers and restart them.

Resources/config/services.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ services:
77
enqueue_elastica.purge_fos_elastic_populate_queue_listener:
88
class: 'Enqueue\ElasticaBundle\Listener\PurgeFosElasticPopulateQueueListener'
99
arguments:
10-
- '@enqueue.transport.context'
10+
- '@enqueue.client.producer'
1111
tags:
1212
- { name: 'kernel.event_subscriber' }

0 commit comments

Comments
 (0)