From b5f0a0935e8076c98cb88f959ef9a59ba79a06d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20H=C3=A4user?= Date: Wed, 25 Sep 2024 13:56:57 +0200 Subject: [PATCH] Support qourum queues Not sure yet how to migrate, but at least start somehow/where. --- src/Rebuy/Amqp/Consumer/ConsumerManager.php | 67 ++++++------------- .../Amqp/Consumer/ConsumerManagerTest.php | 26 +------ 2 files changed, 22 insertions(+), 71 deletions(-) diff --git a/src/Rebuy/Amqp/Consumer/ConsumerManager.php b/src/Rebuy/Amqp/Consumer/ConsumerManager.php index fcc9bd3..67184be 100644 --- a/src/Rebuy/Amqp/Consumer/ConsumerManager.php +++ b/src/Rebuy/Amqp/Consumer/ConsumerManager.php @@ -21,45 +21,21 @@ class ConsumerManager { const DEFAULT_IDLE_TIMEOUT = 900; - /** - * @var ConsumerContainer[] - */ - private $consumerContainers; + private array $consumerContainers; - /** - * @var Serializer - */ - private $serializer; + private Serializer $serializer; - /** - * @var EventDispatcherInterface - */ - private $eventDispatcher; + private EventDispatcherInterface $eventDispatcher; - /** - * @var AMQPChannel - */ - private $channel; + private AMQPChannel $channel; - /** - * @var string - */ - private $exchangeName; + private string $exchangeName; - /** - * @var Collection - */ - private $errorHandlers; + private ArrayCollection $errorHandlers; - /** - * @var int - */ - private $idleTimeout = self::DEFAULT_IDLE_TIMEOUT; + private int $idleTimeout = self::DEFAULT_IDLE_TIMEOUT; - /** - * @var Parser - */ - private $parser; + private Parser $parser; /** * @param AMQPChannel $channel @@ -67,7 +43,7 @@ class ConsumerManager * @param Serializer $serializer * @param Parser $parser */ - public function __construct(AMQPChannel $channel, $exchangeName, Serializer $serializer, Parser $parser) + public function __construct(AMQPChannel $channel, string $exchangeName, Serializer $serializer, Parser $parser) { $this->serializer = $serializer; $this->eventDispatcher = new EventDispatcher(); @@ -79,7 +55,7 @@ public function __construct(AMQPChannel $channel, $exchangeName, Serializer $ser $this->parser = $parser; } - public function wait() + public function wait(): void { while (count($this->channel->callbacks)) { $this->channel->wait(null, false, $this->idleTimeout); @@ -91,7 +67,7 @@ public function wait() * * @throws ConsumerException */ - public function registerConsumer($consumer) + public function registerConsumer(object $consumer): void { $type = gettype($consumer); if ($type !== 'object') { @@ -107,7 +83,7 @@ public function registerConsumer($consumer) /** * @param EventDispatcherInterface $eventDispatcher */ - public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) + public function setEventDispatcher(EventDispatcherInterface $eventDispatcher): void { $this->eventDispatcher = $eventDispatcher; } @@ -117,7 +93,7 @@ public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) * * @throws ConsumerException */ - private function registerConsumerContainer(ConsumerContainer $consumerContainer) + private function registerConsumerContainer(ConsumerContainer $consumerContainer): void { $consumerName = $consumerContainer->getConsumerName(); if (isset($this->consumerContainers[$consumerName])) { @@ -131,7 +107,7 @@ private function registerConsumerContainer(ConsumerContainer $consumerContainer) ); } - $this->channel->queue_declare($consumerName, false, true, false, false); + $this->channel->queue_declare($consumerName, false, true, false, false, arguments: ['x-queue-type' => 'quorum']); foreach ($consumerContainer->getBindings() as $binding) { $this->channel->queue_bind($consumerName, $this->exchangeName, $binding); } @@ -148,15 +124,12 @@ private function registerConsumerContainer(ConsumerContainer $consumerContainer) /** * @param ErrorHandlerInterface $errorHandler */ - public function registerErrorHandler(ErrorHandlerInterface $errorHandler) + public function registerErrorHandler(ErrorHandlerInterface $errorHandler): void { $this->errorHandlers->add($errorHandler); } - /** - * @param int $idleTimeout - */ - public function setIdleTimeout($idleTimeout) + public function setIdleTimeout(int $idleTimeout): void { $this->idleTimeout = $idleTimeout; } @@ -164,7 +137,7 @@ public function setIdleTimeout($idleTimeout) /** * @return int */ - public function getIdleTimeout() + public function getIdleTimeout(): int { return $this->idleTimeout; } @@ -175,7 +148,7 @@ public function getIdleTimeout() * * @return mixed|null */ - private function consume(ConsumerContainer $container, AMQPMessage $message) + private function consume(ConsumerContainer $container, AMQPMessage $message): mixed { $event = new ConsumerEvent($message, $container); $this->dispatchEvent($event, ConsumerEvents::PRE_CONSUME); @@ -194,9 +167,9 @@ private function consume(ConsumerContainer $container, AMQPMessage $message) * @throws ConsumerContainerException * @return mixed|null */ - private function invoke(ConsumerContainer $consumerContainer, AMQPMessage $message) + private function invoke(ConsumerContainer $consumerContainer, AMQPMessage $message): mixed { - $payload = $this->serializer->deserialize($message->body, $consumerContainer->getMessageClass(), 'json'); + $payload = $this->serializer->deserialize($message->getBody(), $consumerContainer->getMessageClass(), 'json'); try { $result = $consumerContainer->invoke($payload); diff --git a/tests/Rebuy/Tests/Amqp/Consumer/ConsumerManagerTest.php b/tests/Rebuy/Tests/Amqp/Consumer/ConsumerManagerTest.php index b5f5725..f228f9b 100644 --- a/tests/Rebuy/Tests/Amqp/Consumer/ConsumerManagerTest.php +++ b/tests/Rebuy/Tests/Amqp/Consumer/ConsumerManagerTest.php @@ -62,28 +62,6 @@ protected function setUp(): void $this->manager->setEventDispatcher($this->eventDispatcher->reveal()); } - /** - * @test - */ - public function register_consumer_with_string_parameter_should_throw_exception() - { - $this->expectException(ConsumerException::class); - $this->expectExceptionMessage('Expected argument of type "object", "string" given'); - - $this->manager->registerConsumer('string'); - } - - /** - * @test - */ - public function register_consumer_with_int_parameter_should_throw_exception() - { - $this->expectException(ConsumerException::class); - $this->expectExceptionMessage('Expected argument of type "object", "integer" given'); - - $this->manager->registerConsumer(12); - } - /** * @test */ @@ -99,7 +77,7 @@ public function register_consumer_should_declare_queue() $this->manager->registerConsumer($consumer); - $this->channel->queue_declare('myName', false, true, false, false)->shouldHaveBeenCalled(); + $this->channel->queue_declare('myName', false, true, false, false, false, ['x-queue-type' => 'quorum'])->shouldHaveBeenCalled(); } /** @@ -145,7 +123,7 @@ public function register_consumer_should_bind_queues() $this->channel->basic_qos(null, 1, false)->shouldBeCalled(); $this->channel->basic_consume($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled(); - $this->channel->queue_declare($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled(); + $this->channel->queue_declare($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled(); $this->channel->queue_bind($consumerName, self::EXCHANGE_NAME, $binding1)->shouldBeCalled(); $this->channel->queue_bind($consumerName, self::EXCHANGE_NAME, $binding2)->shouldBeCalled();