Skip to content

Commit

Permalink
Add the --timeout option (#86)
Browse files Browse the repository at this point in the history
* Add the --timeout option

Co-authored-by: Eugene Kirdzei <[email protected]>
  • Loading branch information
masterjus and eugene-nuwber authored Mar 7, 2022
1 parent 2bd4664 commit dc86c99
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 140 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The logging configuration part was moved from a connection to the first level of

### `\Illuminate\Queue` is not the requirement anymore

If you've been using this package on your fork or extension you should add this package in your `composer.json` as a requirement.
To avoid confusion from `Illuminate\Queue` component, the dependency from this package was removed. If you've been using this package on your fork or extension you should add this package in your `composer.json` as a requirement.
In terms of this avoidance, the `Job` class was renamed to `Handler`. If you are listening to Events from the previous version please replace them to new ones. Now the list of events is: `ListenerHandling`, `ListenerHandled`, `ListenerHandleFailed`, `ListenerHandlerExceptionOccurred` and `MessageProcessingFailed`.

## RabbitEvents Publisher<a name="publisher"></a>

Expand All @@ -130,6 +131,16 @@ You could start listening to an event only by using `rabbitevents:listen` comman

If your listener crashes, then managers will rerun your listener and all messages sent to a queue will be handled in the same order as they were sent. There is the known problem: as queues are separated and you have messages that affect the same entity there's no guarantee that all actions will be done in an expected order. To avoid such problems you can send message time as a part of the payload and handle it internally in your listeners.


#### Options<a name="listen-options"></a>
- **--service=**. When a queue starts the name of the service becomes a part of a queue name: `service:event.name`. By default, service is the APP_NAME from your `.env`. You could override the first part of a queue name by this option.
- **--connection=**. The name of connection specified in the `config/rabbitevents.php` config file. Default: `rabbitmq`.
- **--memory=128**. The memory limit in megabytes. The RabbitEvents have restarting a worker if limit exceeded.
- **--timeout=60**. The number of seconds a massage could be handled.
- **--tries=1**. Number of times to attempt to handle a Message before logging it failed.
- **--sleep=5**. Sleep time in seconds before handling failed message next time.
- **--quiet**. No console output

### Command `rabbitevents:list` <a name='command-list'></a>

To get the list of all registered events please use the command `rabbitevents:list`.
Expand Down
25 changes: 19 additions & 6 deletions src/RabbitEvents/Foundation/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Illuminate\Support\Carbon;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use RabbitEvents\Foundation\Exceptions\ConnectionLostException;

/**
* @mixin AmqpConsumer
Expand All @@ -23,20 +26,30 @@ public function __call(string $method, ?array $args)

/**
* Receives a Message from the queue and returns Message object
*
* @param int $timeout
* @return ?Message
*/
public function nextMessage(int $timeout = 0): ?Message
{
if (!$amqpMessage = $this->amqpConsumer->receive($timeout)) {
if (!$amqpMessage = $this->receiveMessage($timeout)) {
return null;
}

$amqpMessage->setTimestamp(Carbon::now()->timestamp);
// Set timestamp only if this message was not released before
if (!$amqpMessage->getTimestamp()) {
$amqpMessage->setTimestamp(Carbon::now()->getTimestamp());
}

return Message::createFromAmqpMessage($amqpMessage, $this->context->getTransport())
->setAmqpMessage($amqpMessage);
->setAmqpMessage($amqpMessage)
->increaseAttempts();
}

protected function receiveMessage(int $timeout = 0): ?AmqpMessage
{
try {
return $this->amqpConsumer->receive($timeout);
} catch (AMQPRuntimeException $exception) {
throw new ConnectionLostException($exception);
}
}

public function acknowledge(Message $message): void
Expand Down
13 changes: 13 additions & 0 deletions src/RabbitEvents/Foundation/Exceptions/ConnectionLostException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace RabbitEvents\Foundation\Exceptions;

use Throwable;

class ConnectionLostException extends \RuntimeException
{
public function __construct(?Throwable $previous = null)
{
parent::__construct('Connection lost', 0, $previous);
}
}
22 changes: 13 additions & 9 deletions src/RabbitEvents/Listener/Commands/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
use Illuminate\Support\Arr;
use RabbitEvents\Foundation\Context;
use RabbitEvents\Foundation\Support\QueueName;
use RabbitEvents\Listener\Events\HandlerExceptionOccurred;
use RabbitEvents\Listener\Events\ListenerHandlerExceptionOccurred;
use RabbitEvents\Listener\Events\ListenerHandleFailed;
use RabbitEvents\Listener\Events\ListenerHandled;
use RabbitEvents\Listener\Events\ListenerHandling;
use RabbitEvents\Listener\Events\MessageProcessingFailed;
use RabbitEvents\Listener\Events\MessageProcessed;
use RabbitEvents\Listener\Events\MessageProcessing;
use RabbitEvents\Listener\Message\HandlerFactory;
use RabbitEvents\Listener\Message\Processor;
use RabbitEvents\Listener\Message\ProcessingOptions;
Expand Down Expand Up @@ -51,16 +52,17 @@ class ListenCommand extends Command
* @param Context $context
* @param Worker $worker
* @throws \Throwable
* @retur ?int
*/
public function handle(Context $context, Worker $worker): void
public function handle(Context $context, Worker $worker)
{
$options = $this->gatherProcessingOptions();

$this->registerLogWriters($options->connectionName);

$this->listenForEvents();

$worker->work(
return $worker->work(
new Processor(new HandlerFactory($this->laravel), $this->laravel['events']),
$context->createConsumer(
new QueueName($options->service, $this->argument('event')),
Expand All @@ -73,7 +75,7 @@ public function handle(Context $context, Worker $worker): void
/**
* Gather all the queue worker options as a single object.
*
* @return \RabbitEvents\Listener\Message\ProcessingOptions
* @return ProcessingOptions
*/
protected function gatherProcessingOptions(): ProcessingOptions
{
Expand All @@ -82,6 +84,7 @@ protected function gatherProcessingOptions(): ProcessingOptions
$this->option('connection') ?: $this->laravel['config']['rabbitevents.default'],
(int) $this->option('memory'),
(int) $this->option('tries'),
(int) $this->option('timeout'),
(int) $this->option('sleep')
);
}
Expand All @@ -99,10 +102,11 @@ protected function listenForEvents(): void
}
};

$this->laravel['events']->listen(MessageProcessing::class, $callback);
$this->laravel['events']->listen(MessageProcessed::class, $callback);
$this->laravel['events']->listen(ListenerHandling::class, $callback);
$this->laravel['events']->listen(ListenerHandled::class, $callback);
$this->laravel['events']->listen(ListenerHandleFailed::class, $callback);
$this->laravel['events']->listen(ListenerHandlerExceptionOccurred::class, $callback);
$this->laravel['events']->listen(MessageProcessingFailed::class, $callback);
$this->laravel['events']->listen(HandlerExceptionOccurred::class, $callback);
}

/**
Expand Down
18 changes: 10 additions & 8 deletions src/RabbitEvents/Listener/Commands/Log/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

namespace RabbitEvents\Listener\Commands\Log;

use RabbitEvents\Listener\Events\HandlerExceptionOccurred;
use RabbitEvents\Listener\Events\MessageProcessed;
use RabbitEvents\Listener\Events\MessageProcessing;
use RabbitEvents\Listener\Events\ListenerHandlerExceptionOccurred;
use RabbitEvents\Listener\Events\ListenerHandled;
use RabbitEvents\Listener\Events\ListenerHandling;
use RabbitEvents\Listener\Events\ListenerHandleFailed;
use RabbitEvents\Listener\Events\MessageProcessingFailed;

abstract class Writer
Expand All @@ -15,20 +16,21 @@ abstract class Writer
public const STATUS_FAILED = 'Failed';

/**
* @param HandlerExceptionOccurred | MessageProcessing | MessageProcessed | MessageProcessingFailed $event
* @param ListenerHandlerExceptionOccurred | ListenerHandling | ListenerHandled | ListenerHandleFailed $event
*/
abstract public function log($event): void;

/**
* @param HandlerExceptionOccurred | MessageProcessing | MessageProcessed | MessageProcessingFailed $event
* @param ListenerHandlerExceptionOccurred | ListenerHandling | ListenerHandled | ListenerHandleFailed $event
* @return string
*/
protected function getStatus($event): string
{
return match (get_class($event)) {
MessageProcessing::class => self::STATUS_PROCESSING,
MessageProcessed::class => self::STATUS_PROCESSED,
HandlerExceptionOccurred::class => self::STATUS_EXCEPTION,
ListenerHandling::class => self::STATUS_PROCESSING,
ListenerHandled::class => self::STATUS_PROCESSED,
ListenerHandlerExceptionOccurred::class => self::STATUS_EXCEPTION,
ListenerHandleFailed::class => self::STATUS_FAILED,
MessageProcessingFailed::class => self::STATUS_FAILED,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use RabbitEvents\Listener\Message\Handler;

class HandlerExceptionOccurred
class ListenerHandleFailed
{
public function __construct(public Handler $handler, public \Throwable $exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use RabbitEvents\Listener\Message\Handler;

class MessageProcessed
class ListenerHandled
{
public function __construct(public Handler $handler)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace RabbitEvents\Listener\Events;

use RabbitEvents\Listener\Message\Handler;

class ListenerHandlerExceptionOccurred
{
public function __construct(public Handler $handler, public \Throwable $exception)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use RabbitEvents\Listener\Message\Handler;

class MessageProcessing
class ListenerHandling
{
public function __construct(public Handler $handler)
{
Expand Down
6 changes: 2 additions & 4 deletions src/RabbitEvents/Listener/Events/MessageProcessingFailed.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
<?php

declare(strict_types=1);

namespace RabbitEvents\Listener\Events;

use RabbitEvents\Listener\Message\Handler;
use RabbitEvents\Foundation\Message;

class MessageProcessingFailed
{
public function __construct(public Handler $handler, public \Throwable $exception)
public function __construct(public Message $message, public \Throwable $exception)
{
}
}
1 change: 1 addition & 0 deletions src/RabbitEvents/Listener/Message/ProcessingOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public function __construct(
public string $connectionName,
public int $memory = 128,
public int $maxTries = 0,
public int $timeout = 60,
public int $sleep = 5
) {
}
Expand Down
Loading

0 comments on commit dc86c99

Please sign in to comment.