-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* create AMQP event queue * try run functional test of AMQP * sudo required for travis * not run tests on HHVM * add rabbitmq-server to addons * not run tests on HHVM * change $channel_rpc_timeout * try change $channel_rpc_timeout and $read_write_timeout * verbose phpunit * use dist trusty * remove rabbitmq-server addons * change $read_write_timeout and $heartbeat * try publish before subscribe * disable functional test * disable verbose phpunit * fix memory limit error * try test on HHVM * HHVM not longer supported
- Loading branch information
1 parent
764057f
commit bbb1332
Showing
11 changed files
with
607 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
AMQP queue | ||
========== | ||
|
||
Queues are designed to distribute the load and delay publishing of events or transfer their publishing to separate | ||
processes. | ||
|
||
The queue stores events in [RabbitMQ](https://www.rabbitmq.com/), using the [php-amqplib](https://github.com/php-amqplib/php-amqplib) | ||
library to access it. | ||
|
||
This queue uses a [serializer](https://symfony.com/doc/current/components/serializer.html) to convert event objects | ||
to strings and back while waiting for the transport of objects across the AMQP. The serializer uses the `predis` | ||
format as a default. You can change format if you need. You can make messages more optimal for a RabbitMQ than JSON. | ||
|
||
If the message could not be deserialized, then a critical message is written to the log so that the administrator can | ||
react quickly to the problem and the message is placed again at the end of the queue, so as not to lose it. | ||
|
||
You can use any implementations of [callable type](http://php.net/manual/en/language.types.callable.php) as a queue | ||
subscriber. | ||
|
||
Configure queue: | ||
|
||
```php | ||
use GpsLab\Domain\Event\Queue\Subscribe\AMQPSubscribeEventQueue; | ||
use GpsLab\Domain\Event\Queue\Serializer\SymfonySerializer; | ||
use PhpAmqpLib\Connection\AMQPStreamConnection; | ||
use Symfony\Component\Serializer\Serializer; | ||
|
||
//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // AMQP connection | ||
//$channel = $connection->channel(); | ||
//$symfony_serializer = new Serializer(); // Symfony serializer | ||
//$logger = new Logger(); // PSR-3 logger | ||
$queue_name = 'article_queue'; | ||
$format = 'json'; // default: predis | ||
// you can create another implementation of serializer | ||
$serializer = new SymfonySerializer($symfony_serializer, $format); | ||
$queue = new AMQPSubscribeEventQueue($channel, $serializer, $logger, $queue_name); | ||
``` | ||
|
||
Subscribe to the queue: | ||
|
||
```php | ||
use GpsLab\Domain\Event\Bus\ListenerLocatedEventBus; | ||
use GpsLab\Domain\Event\Listener\Locator\DirectBindingEventListenerLocator; | ||
|
||
$locator = new DirectBindingEventListenerLocator(); | ||
$bus = new ListenerLocatedEventBus($locator); | ||
|
||
$handler = function(ArticleRenamedEvent $event) use ($bus) { | ||
$bus->publish($event); | ||
}; | ||
|
||
$queue->subscribe($handler); | ||
``` | ||
|
||
You can unsubscribe of the queue: | ||
|
||
```php | ||
$queue->unsubscribe($handler); | ||
``` | ||
|
||
Make event and publish it into queue: | ||
|
||
```php | ||
$event = new ArticleRenamedEvent(); | ||
$event->new_name = $new_name; | ||
|
||
$queue->publish($event); | ||
``` | ||
|
||
You can use [QueueEventBus](../bus.md) for publish events in queue. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
<?php | ||
|
||
/** | ||
* GpsLab component. | ||
* | ||
* @author Peter Gribanov <[email protected]> | ||
* @copyright Copyright (c) 2011, Peter Gribanov | ||
* @license http://opensource.org/licenses/MIT | ||
*/ | ||
|
||
namespace GpsLab\Domain\Event\Queue\Subscribe; | ||
|
||
use GpsLab\Domain\Event\Event; | ||
use GpsLab\Domain\Event\Queue\Serializer\Serializer; | ||
use PhpAmqpLib\Channel\AMQPChannel; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
use Psr\Log\LoggerInterface; | ||
|
||
class AMQPSubscribeEventQueue implements SubscribeEventQueue | ||
{ | ||
/** | ||
* @var AMQPChannel | ||
*/ | ||
private $channel; | ||
|
||
/** | ||
* @var Serializer | ||
*/ | ||
private $serializer; | ||
|
||
/** | ||
* @var LoggerInterface | ||
*/ | ||
private $logger; | ||
|
||
/** | ||
* @var callable[] | ||
*/ | ||
private $handlers = []; | ||
|
||
/** | ||
* @var string | ||
*/ | ||
private $queue_name = ''; | ||
|
||
/** | ||
* @var bool | ||
*/ | ||
private $subscribed = false; | ||
|
||
/** | ||
* @var bool | ||
*/ | ||
private $declared = false; | ||
|
||
/** | ||
* @param AMQPChannel $channel | ||
* @param Serializer $serializer | ||
* @param LoggerInterface $logger | ||
* @param string $queue_name | ||
*/ | ||
public function __construct(AMQPChannel $channel, Serializer $serializer, LoggerInterface $logger, $queue_name) | ||
{ | ||
$this->channel = $channel; | ||
$this->serializer = $serializer; | ||
$this->logger = $logger; | ||
$this->queue_name = $queue_name; | ||
} | ||
|
||
/** | ||
* Publish event to queue. | ||
* | ||
* @param Event $event | ||
* | ||
* @return bool | ||
*/ | ||
public function publish(Event $event) | ||
{ | ||
$message = $this->serializer->serialize($event); | ||
$this->declareQueue(); | ||
$this->channel->basic_publish(new AMQPMessage($message), '', $this->queue_name); | ||
|
||
return true; | ||
} | ||
|
||
/** | ||
* Subscribe on event queue. | ||
* | ||
* @throws \ErrorException | ||
* | ||
* @param callable $handler | ||
*/ | ||
public function subscribe(callable $handler) | ||
{ | ||
$this->handlers[] = $handler; | ||
|
||
// laze subscribe | ||
if (!$this->subscribed) { | ||
$this->declareQueue(); | ||
$this->channel->basic_consume( | ||
$this->queue_name, | ||
'', | ||
false, | ||
true, | ||
false, | ||
false, | ||
function (AMQPMessage $message) { | ||
$this->handle($message->body); | ||
} | ||
); | ||
|
||
$this->subscribed = true; | ||
} | ||
|
||
while ($this->channel->is_consuming()) { | ||
$this->channel->wait(); | ||
} | ||
} | ||
|
||
/** | ||
* Unsubscribe on event queue. | ||
* | ||
* @param callable $handler | ||
* | ||
* @return bool | ||
*/ | ||
public function unsubscribe(callable $handler) | ||
{ | ||
$index = array_search($handler, $this->handlers); | ||
|
||
if ($index === false) { | ||
return false; | ||
} | ||
|
||
unset($this->handlers[$index]); | ||
|
||
return true; | ||
} | ||
|
||
private function declareQueue() | ||
{ | ||
// laze declare queue | ||
if (!$this->declared) { | ||
$this->channel->queue_declare($this->queue_name, false, false, false, false); | ||
$this->declared = true; | ||
} | ||
} | ||
|
||
/** | ||
* @param string $message | ||
*/ | ||
private function handle($message) | ||
{ | ||
try { | ||
$event = $this->serializer->deserialize($message); | ||
} catch (\Exception $e) { // catch only deserialize exception | ||
// it's a critical error | ||
// it is necessary to react quickly to it | ||
$this->logger->critical('Failed denormalize a event in the AMQP queue', [$message, $e->getMessage()]); | ||
|
||
// try denormalize in later | ||
$this->declareQueue(); | ||
$this->channel->basic_publish(new AMQPMessage($message), '', $this->queue_name); | ||
|
||
return; // no event for handle | ||
} | ||
|
||
foreach ($this->handlers as $handler) { | ||
call_user_func($handler, $event); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.