-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEnqueueMessageProducerTest.php
115 lines (89 loc) · 4.49 KB
/
EnqueueMessageProducerTest.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
<?php
namespace ProophTest\ServiceBus;
use Enqueue\Client\Config;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
use Enqueue\SimpleClient\SimpleClient;
use Formapro\Prooph\ServiceBus\Message\Enqueue\Commands;
use Formapro\Prooph\ServiceBus\Message\Enqueue\EnqueueMessageProcessor;
use Formapro\Prooph\ServiceBus\Message\Enqueue\EnqueueMessageProducer;
use Formapro\Prooph\ServiceBus\Message\Enqueue\EnqueueSerializer;
use PHPUnit\Framework\TestCase;
use Prooph\Common\Messaging\FQCNMessageFactory;
use Prooph\Common\Messaging\NoOpMessageConverter;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use Prooph\ServiceBus\QueryBus;
use ProophTest\ServiceBus\Mock\DoSomething;
use ProophTest\ServiceBus\Mock\MessageHandler;
use ProophTest\ServiceBus\Mock\SomethingDone;
use Symfony\Component\Filesystem\Filesystem;
class EnqueueMessageProducerTest extends TestCase
{
    /**
     * Enqueue client built in setUp() from a file:// DSN pointing at the local "queues" directory.
     *
     * @var SimpleClient
     */
    private $client;

    /**
     * Serializer handed to both the producer under test and the consuming processor.
     *
     * @var EnqueueSerializer
     */
    private $serializer;

    /**
     * Removes the queue directory left behind by an earlier run, then creates a fresh
     * client and serializer for the test about to execute.
     */
    protected function setUp()
    {
        $filesystem = new Filesystem();
        $filesystem->remove(__DIR__.'/queues/');

        $this->client = new SimpleClient('file://'.__DIR__.'/queues');
        $this->serializer = new EnqueueSerializer(
            new FQCNMessageFactory(),
            new NoOpMessageConverter()
        );
    }

    /**
     * @test
     */
    public function it_sends_a_command_to_queue_pulls_it_with_consumer_and_forwards_it_to_command_bus()
    {
        $command = new DoSomething(['data' => 'test command']);

        // Normally the command would be dispatched on a command bus. That step is skipped
        // because only the behaviour of the message producer is of interest here.
        $producer = $this->createProducer();
        $producer($command);

        // Consumer-side command bus that routes the command to a handler we can inspect.
        $handler = new MessageHandler();
        $commandBus = new CommandBus();
        $commandRouter = new CommandRouter();
        $commandRouter->route($command->messageName())->to($handler);
        $commandRouter->attachToMessageBus($commandBus);

        $this->consumeOneMessage(
            new EnqueueMessageProcessor($commandBus, new EventBus(), new QueryBus(), $this->serializer)
        );

        $received = $handler->getLastMessage();
        $this->assertNotNull($received);
        $this->assertEquals($command->payload(), $received->payload());
    }

    /**
     * @test
     */
    public function it_sends_an_event_to_queue_pulls_it_with_consumer_and_forwards_it_to_event_bus()
    {
        $event = new SomethingDone(['data' => 'test event']);

        // Normally the event would be dispatched on an event bus. That step is skipped
        // because only the behaviour of the message producer is of interest here.
        $producer = $this->createProducer();
        $producer($event);

        // Consumer-side event bus that routes the event to a listener we can inspect.
        $listener = new MessageHandler();
        $eventBus = new EventBus();
        $eventRouter = new EventRouter();
        $eventRouter->route($event->messageName())->to($listener);
        $eventRouter->attachToMessageBus($eventBus);

        $this->consumeOneMessage(
            new EnqueueMessageProcessor(new CommandBus(), $eventBus, new QueryBus(), $this->serializer)
        );

        $received = $listener->getLastMessage();
        $this->assertNotNull($received);
        $this->assertEquals($event->payload(), $received->payload());
    }

    /**
     * Builds the producer under test from the client's ready-to-use enqueue producer
     * and the shared serializer.
     *
     * @return EnqueueMessageProducer
     */
    private function createProducer()
    {
        return new EnqueueMessageProducer($this->client->getProducer(), $this->serializer);
    }

    /**
     * Binds the given processor to Config::COMMAND_TOPIC under the Commands::PROOPH_BUS
     * name and runs the client's consumer.
     *
     * The run is limited by LimitConsumedMessagesExtension(1) and by a
     * LimitConsumptionTimeExtension whose deadline is five seconds from now, so the
     * consumer is expected to stop after one message or once the deadline passes.
     *
     * @param EnqueueMessageProcessor $processor Processor that forwards consumed messages to the buses.
     */
    private function consumeOneMessage(EnqueueMessageProcessor $processor)
    {
        $this->client->bind(Config::COMMAND_TOPIC, Commands::PROOPH_BUS, $processor);

        $limits = new ChainExtension([
            new LimitConsumedMessagesExtension(1),
            new LimitConsumptionTimeExtension(new \DateTime('now + 5 seconds')),
        ]);

        $this->client->consume($limits);
    }
}