Skip to content

Commit

Permalink
Support qourum queues
Browse files Browse the repository at this point in the history
Not sure yet how to migrate, but at least start somehow/where.
  • Loading branch information
bjoernhaeuser committed Sep 25, 2024
1 parent ae7e930 commit b5f0a09
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 71 deletions.
67 changes: 20 additions & 47 deletions src/Rebuy/Amqp/Consumer/ConsumerManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,29 @@ class ConsumerManager
{
const DEFAULT_IDLE_TIMEOUT = 900;

/**
* @var ConsumerContainer[]
*/
private $consumerContainers;
private array $consumerContainers;

/**
* @var Serializer
*/
private $serializer;
private Serializer $serializer;

/**
* @var EventDispatcherInterface
*/
private $eventDispatcher;
private EventDispatcherInterface $eventDispatcher;

/**
* @var AMQPChannel
*/
private $channel;
private AMQPChannel $channel;

/**
* @var string
*/
private $exchangeName;
private string $exchangeName;

/**
* @var Collection
*/
private $errorHandlers;
private ArrayCollection $errorHandlers;

/**
* @var int
*/
private $idleTimeout = self::DEFAULT_IDLE_TIMEOUT;
private int $idleTimeout = self::DEFAULT_IDLE_TIMEOUT;

/**
* @var Parser
*/
private $parser;
private Parser $parser;

/**
* @param AMQPChannel $channel
* @param string $exchangeName
* @param Serializer $serializer
* @param Parser $parser
*/
public function __construct(AMQPChannel $channel, $exchangeName, Serializer $serializer, Parser $parser)
public function __construct(AMQPChannel $channel, string $exchangeName, Serializer $serializer, Parser $parser)
{
$this->serializer = $serializer;
$this->eventDispatcher = new EventDispatcher();
Expand All @@ -79,7 +55,7 @@ public function __construct(AMQPChannel $channel, $exchangeName, Serializer $ser
$this->parser = $parser;
}

public function wait()
public function wait(): void
{
while (count($this->channel->callbacks)) {
$this->channel->wait(null, false, $this->idleTimeout);
Expand All @@ -91,7 +67,7 @@ public function wait()
*
* @throws ConsumerException
*/
public function registerConsumer($consumer)
public function registerConsumer(object $consumer): void
{
$type = gettype($consumer);
if ($type !== 'object') {
Expand All @@ -107,7 +83,7 @@ public function registerConsumer($consumer)
/**
* @param EventDispatcherInterface $eventDispatcher
*/
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher): void
{
$this->eventDispatcher = $eventDispatcher;
}
Expand All @@ -117,7 +93,7 @@ public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
*
* @throws ConsumerException
*/
private function registerConsumerContainer(ConsumerContainer $consumerContainer)
private function registerConsumerContainer(ConsumerContainer $consumerContainer): void
{
$consumerName = $consumerContainer->getConsumerName();
if (isset($this->consumerContainers[$consumerName])) {
Expand All @@ -131,7 +107,7 @@ private function registerConsumerContainer(ConsumerContainer $consumerContainer)
);
}

$this->channel->queue_declare($consumerName, false, true, false, false);
$this->channel->queue_declare($consumerName, false, true, false, false, arguments: ['x-queue-type' => 'quorum']);
foreach ($consumerContainer->getBindings() as $binding) {
$this->channel->queue_bind($consumerName, $this->exchangeName, $binding);
}
Expand All @@ -148,23 +124,20 @@ private function registerConsumerContainer(ConsumerContainer $consumerContainer)
/**
* @param ErrorHandlerInterface $errorHandler
*/
public function registerErrorHandler(ErrorHandlerInterface $errorHandler)
public function registerErrorHandler(ErrorHandlerInterface $errorHandler): void
{
$this->errorHandlers->add($errorHandler);
}

/**
* @param int $idleTimeout
*/
public function setIdleTimeout($idleTimeout)
public function setIdleTimeout(int $idleTimeout): void
{
$this->idleTimeout = $idleTimeout;
}

/**
* @return int
*/
public function getIdleTimeout()
public function getIdleTimeout(): int
{
return $this->idleTimeout;
}
Expand All @@ -175,7 +148,7 @@ public function getIdleTimeout()
*
* @return mixed|null
*/
private function consume(ConsumerContainer $container, AMQPMessage $message)
private function consume(ConsumerContainer $container, AMQPMessage $message): mixed
{
$event = new ConsumerEvent($message, $container);
$this->dispatchEvent($event, ConsumerEvents::PRE_CONSUME);
Expand All @@ -194,9 +167,9 @@ private function consume(ConsumerContainer $container, AMQPMessage $message)
* @throws ConsumerContainerException
* @return mixed|null
*/
private function invoke(ConsumerContainer $consumerContainer, AMQPMessage $message)
private function invoke(ConsumerContainer $consumerContainer, AMQPMessage $message): mixed
{
$payload = $this->serializer->deserialize($message->body, $consumerContainer->getMessageClass(), 'json');
$payload = $this->serializer->deserialize($message->getBody(), $consumerContainer->getMessageClass(), 'json');

try {
$result = $consumerContainer->invoke($payload);
Expand Down
26 changes: 2 additions & 24 deletions tests/Rebuy/Tests/Amqp/Consumer/ConsumerManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,6 @@ protected function setUp(): void
$this->manager->setEventDispatcher($this->eventDispatcher->reveal());
}

/**
* @test
*/
public function register_consumer_with_string_parameter_should_throw_exception()
{
$this->expectException(ConsumerException::class);
$this->expectExceptionMessage('Expected argument of type "object", "string" given');

$this->manager->registerConsumer('string');
}

/**
* @test
*/
public function register_consumer_with_int_parameter_should_throw_exception()
{
$this->expectException(ConsumerException::class);
$this->expectExceptionMessage('Expected argument of type "object", "integer" given');

$this->manager->registerConsumer(12);
}

/**
* @test
*/
Expand All @@ -99,7 +77,7 @@ public function register_consumer_should_declare_queue()

$this->manager->registerConsumer($consumer);

$this->channel->queue_declare('myName', false, true, false, false)->shouldHaveBeenCalled();
$this->channel->queue_declare('myName', false, true, false, false, false, ['x-queue-type' => 'quorum'])->shouldHaveBeenCalled();
}

/**
Expand Down Expand Up @@ -145,7 +123,7 @@ public function register_consumer_should_bind_queues()

$this->channel->basic_qos(null, 1, false)->shouldBeCalled();
$this->channel->basic_consume($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled();
$this->channel->queue_declare($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled();
$this->channel->queue_declare($consumerName, Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any(), Argument::any())->shouldBeCalled();
$this->channel->queue_bind($consumerName, self::EXCHANGE_NAME, $binding1)->shouldBeCalled();
$this->channel->queue_bind($consumerName, self::EXCHANGE_NAME, $binding2)->shouldBeCalled();

Expand Down

0 comments on commit b5f0a09

Please sign in to comment.