Skip to content

Commit 8e94925

Browse files
committed
fixes
1 parent cf35f92 commit 8e94925

File tree

2 files changed

+11
-16
lines changed

2 files changed

+11
-16
lines changed

Async/ElasticaPopulateProcessor.php

+9-16
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use Enqueue\Client\CommandSubscriberInterface;
55
use Enqueue\Consumption\QueueSubscriberInterface;
6+
use Enqueue\Consumption\Result;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrMessage;
89
use Interop\Queue\PsrProcessor;
@@ -29,14 +30,10 @@ public function __construct(ProviderRegistry $providerRegistry)
2930
*/
3031
public function process(PsrMessage $message, PsrContext $context)
3132
{
32-
if (false == $message->getReplyTo()) {
33-
return self::REJECT;
34-
}
35-
3633
if ($message->isRedelivered()) {
37-
$this->sendReply($context, $message, false);
34+
$this->createReplyMessage($context, $message, false);
3835

39-
return self::REJECT;
36+
return Result::reply($this->createReplyMessage($context, $message, false), Result::REJECT);
4037
}
4138

4239
try {
@@ -45,29 +42,25 @@ public function process(PsrMessage $message, PsrContext $context)
4542
$provider = $this->providerRegistry->getProvider($options['indexName'], $options['typeName']);
4643
$provider->populate(null, $options);
4744

48-
$this->sendReply($context, $message, true);
49-
50-
return self::ACK;
45+
return Result::reply($this->createReplyMessage($context, $message, true));
5146
} catch (\Exception $e) {
52-
$this->sendReply($context, $message, false);
53-
54-
return self::REJECT;
47+
return Result::reply($this->createReplyMessage($context, $message, false), Result::REJECT);
5548
}
5649
}
5750

5851
/**
5952
* @param PsrContext $context
6053
* @param PsrMessage $message
6154
* @param bool $successful
55+
*
56+
* @return PsrMessage
6257
*/
63-
private function sendReply(PsrContext $context, PsrMessage $message, $successful)
58+
private function createReplyMessage(PsrContext $context, PsrMessage $message, $successful)
6459
{
6560
$replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
6661
$replyMessage->setProperty('fos-populate-successful', (int) $successful);
6762

68-
$replyQueue = $context->createQueue($message->getReplyTo());
69-
70-
$context->createProducer()->send($replyQueue, $replyMessage);
63+
return $replyMessage;
7164
}
7265

7366
/**

Resources/config/services.yml

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ services:
33
class: 'Enqueue\ElasticaBundle\Async\ElasticaPopulateProcessor'
44
arguments:
55
- '@fos_elastica.provider_registry'
6+
tags:
7+
- { name: "enqueue.client.processor" }
68

79
enqueue_elastica.purge_fos_elastic_populate_queue_listener:
810
class: 'Enqueue\ElasticaBundle\Listener\PurgeFosElasticPopulateQueueListener'

0 commit comments

Comments
 (0)