Nette extension for RabbitMQ (using composer package jakubkulhan/bunny)
composer require gamee/nette-rabbitmq
config.neon:
extensions:
rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension
services:
- TestConsumer
rabbitmq:
connections:
default:
user: guest
password: guest
host: localhost
port: 5672
queues:
testQueue:
connection: default
# force queue declare on first queue operation during request
# autoCreate: true
exchanges:
testExchange:
connection: default
type: fanout
queueBindings:
testQueue:
routingKey: testRoutingKey
# force exchange declare on first exchange operation during request
# autoCreate: true
producers:
testProducer:
exchange: testExchange
# queue: testQueue
contentType: application/json
deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT
consumers:
testConsumer:
queue: testQueue
callback: [@TestConsumer, consume]
qos:
prefetchSize: 0
prefetchCount: 5
Since v3.0, all queues and exchanges are by default declared on demand using the console command:
php index.php rabbitmq:declareQueuesAndExchanges
It's intended to be a part of the deploy process to make sure all the queues and exchanges are prepared for use.
If you need to override this behavior (for example only declare queues that are used during a request and nothing else),
just add the autoCreate: true
parameter to queue or exchange of your choice.
You may also want to declare the queues and exchanges via rabbitmq management interface or a script but if you fail to
do so, don't run the declare console command and don't specify autoCreate: true
, exceptions will be thrown
when accessing undeclared queues/exchanges.
services.neon:
services:
- TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))
TestQueue.php:
<?php
declare(strict_types=1);
use Gamee\RabbitMQ\Producer\Producer;
final class TestQueue
{
/**
* @var Producer
*/
private $testProducer;
public function __construct(Producer $testProducer)
{
$this->testProducer = $testProducer;
}
public function publish(string $message): void
{
$json = json_encode(['message' => $message]);
$headers = [];
$this->testProducer->publish($json, $headers);
}
}
Your consumer callback has to return a confirmation that particular message has been acknowledges (or different states - unack, reject).
TestConsumer.php
<?php
declare(strict_types=1);
use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;
final class TestConsumer implements IConsumer
{
public function consume(Message $message): int
{
$messageData = json_decode($message->content);
$headers = $message->headers;
/**
* @todo Some logic here...
*/
return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
}
}
There are two consumer commands prepared. rabbitmq:consumer
wiil consume messages for specified amount of time (in seconds). Following command wiil be consuming messages for one hour:
php index.php rabbitmq:consumer testConsumer 3600
rabbitmq:staticConsumer
will consume particular amount of messages. Following example will consume just 20 messages:
php index.php rabbitmq:staticConsumer testConsumer 20