Skip to content

Commit 3cf0b85

Browse files
authored
Merge pull request #2 from php-enqueue/upd-to-0-3-enqueue
Sync with change in Enqueue 0.3.x version.
2 parents f1b432b + 5c1026b commit 3cf0b85

5 files changed

+32
-22
lines changed

Async/ElasticaPopulateProcessor.php

+17-8
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
<?php
22
namespace Enqueue\ElasticaBundle\Async;
33

4-
use Enqueue\Psr\Context;
5-
use Enqueue\Psr\Message;
6-
use Enqueue\Psr\Processor;
4+
use Enqueue\Consumption\QueueSubscriberInterface;
5+
use Enqueue\Psr\PsrContext;
6+
use Enqueue\Psr\PsrMessage;
7+
use Enqueue\Psr\PsrProcessor;
78
use Enqueue\Util\JSON;
89
use FOS\ElasticaBundle\Provider\ProviderRegistry;
910

10-
class ElasticaPopulateProcessor implements Processor
11+
class ElasticaPopulateProcessor implements PsrProcessor, QueueSubscriberInterface
1112
{
1213
/**
1314
* @var ProviderRegistry
@@ -25,7 +26,7 @@ public function __construct(ProviderRegistry $providerRegistry)
2526
/**
2627
* {@inheritdoc}
2728
*/
28-
public function process(Message $message, Context $context)
29+
public function process(PsrMessage $message, PsrContext $context)
2930
{
3031
if (false == $message->getReplyTo()) {
3132
return self::REJECT;
@@ -54,11 +55,11 @@ public function process(Message $message, Context $context)
5455
}
5556

5657
/**
57-
* @param Context $context
58-
* @param Message $message
58+
* @param PsrContext $context
59+
* @param PsrMessage $message
5960
* @param bool $successful
6061
*/
61-
private function sendReply(Context $context, Message $message, $successful)
62+
private function sendReply(PsrContext $context, PsrMessage $message, $successful)
6263
{
6364
$replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
6465
$replyMessage->setProperty('fos-populate-successful', (int) $successful);
@@ -67,4 +68,12 @@ private function sendReply(Context $context, Message $message, $successful)
6768

6869
$context->createProducer()->send($replyQueue, $replyMessage);
6970
}
71+
72+
/**
73+
* {@inheritdoc}
74+
*/
75+
public static function getSubscribedQueues()
76+
{
77+
return ['fos_elastica_populate'];
78+
}
7079
}

Elastica/AsyncDoctrineOrmProvider.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php
22
namespace Enqueue\ElasticaBundle\Elastica;
33

4-
use Enqueue\Psr\Context;
4+
use Enqueue\Psr\PsrContext;
55
use Enqueue\Util\JSON;
66
use FOS\ElasticaBundle\Doctrine\ORM\Provider;
77

@@ -10,14 +10,14 @@ class AsyncDoctrineOrmProvider extends Provider
1010
private $batchSize;
1111

1212
/**
13-
* @var Context
13+
* @var PsrContext
1414
*/
1515
private $context;
1616

1717
/**
18-
* @param Context $context
18+
* @param PsrContext $context
1919
*/
20-
public function setContext(Context $context)
20+
public function setContext(PsrContext $context)
2121
{
2222
$this->context = $context;
2323
}
@@ -42,7 +42,7 @@ protected function doPopulate($options, \Closure $loggerClosure = null)
4242
$nbObjects = $this->countObjects($queryBuilder);
4343
$offset = $options['offset'];
4444

45-
$queue = $this->context->createQueue('fos_elastica.populate');
45+
$queue = $this->context->createQueue('fos_elastica_populate');
4646
$resultQueue = $this->context->createTemporaryQueue();
4747
$consumer = $this->context->createConsumer($resultQueue);
4848

Listener/PurgeFosElasticPopulateQueueListener.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
<?php
22
namespace Enqueue\ElasticaBundle\Listener;
33

4-
use Enqueue\Psr\Context;
4+
use Enqueue\Psr\PsrContext;
55
use FOS\ElasticaBundle\Event\IndexPopulateEvent;
66
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
77

88
class PurgeFosElasticPopulateQueueListener implements EventSubscriberInterface
99
{
1010
/**
11-
* @var Context
11+
* @var PsrContext
1212
*/
1313
private $context;
1414

1515
/**
16-
* @param Context $context
16+
* @param PsrContext $context
1717
*/
18-
public function __construct(Context $context)
18+
public function __construct(PsrContext $context)
1919
{
2020
$this->context = $context;
2121
}
2222

2323
public function onPreIndexPopulate(IndexPopulateEvent $event)
2424
{
2525
if (method_exists($this->context, 'purge')) {
26-
$queue = $this->context->createQueue('fos_elastica.populate');
26+
$queue = $this->context->createQueue('fos_elastica_populate');
2727

2828
$this->context->purge($queue);
2929

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ enqueue:
6767
```
6868
6969
Sure you can configure other transports like: [rabbitmq, amqp, stomp and so on](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/config_reference.md)
70-
Create a `fos_elastica.populate` queue on broker side, if needed.
70+
Create a `fos_elastica_populate` queue on broker side, if needed.
7171

7272
## Usage
7373

@@ -87,7 +87,7 @@ $ ENQUEUE_ELASTICA_DISABLE_ASYNC=1 ./bin/console fos:elastica:populate
8787
and have pull of consumer commands run somewhere, run them as many as you'd like
8888

8989
```bash
90-
$ ./bin/console enqueue:transport:consume fos_elastica.populate enqueue_elastica.populate_processor
90+
$ ./bin/console enqueue:transport:consume enqueue_elastica.populate_processor -vv
9191
```
9292

9393
We suggest to use [supervisor](http://supervisord.org/) on production to control numbers of consumers and restart them.
@@ -97,7 +97,7 @@ Here's config example
9797
```
9898
# cat /etc/supervisor/conf.d/fos_elastica_populate.conf
9999
[program:fos_elastica_populate]
100-
command=/mqs/symfony/bin/console enqueue:transport:consume fos_elastica.populate enqueue_elastica.populate_processor
100+
command=/mqs/symfony/bin/console enqueue:transport:consume fos_elastica_populate enqueue_elastica.populate_processor
101101
process_name=%(program_name)s_%(process_num)02d
102102
numprocs=10
103103
autostart=true

composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
"require": {
88
"php": ">=5.6",
99
"friendsofsymfony/elastica-bundle": "^3",
10-
"enqueue/enqueue-bundle": "^0.2"
10+
"enqueue/enqueue-bundle": "^0.3.5",
11+
"enqueue/enqueue": "^0.3.5"
1112
},
1213
"autoload": {
1314
"psr-4": { "Enqueue\\ElasticaBundle\\": "" }

0 commit comments

Comments
 (0)