Skip to content

Commit

Permalink
Add support for pcntl signals (#2)
Browse files Browse the repository at this point in the history
* Add support for pcntl signals

* Remove pcntl_signal_dispatch

* Chaange signals
  • Loading branch information
masterjus authored Oct 25, 2017
1 parent a902d79 commit 1c2d5e7
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 22 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "nuwber/rabbitevents",
"description": "Nuwber broadcasting events",
"description": "Laravel back-to-back broadcasting events. It uses RabbitMQ as the transport.",
"keywords": ["queue", "laravel", "rabbitmq", "events"],
"type": "library",
"authors": [
{
Expand Down
4 changes: 3 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Broadcasting events by using RabbitMQ
# Events broadcasting for Laravel by using RabbitMQ

[![Build Status](https://travis-ci.org/nuwber/rabbitevents.svg?branch=master)](https://travis-ci.org/nuwber/rabbitevents)

Nuwber's broadcasting events provides a simple observer implementation, allowing you to listen for various events that occur in your current and another applications. For example if you need to react to some event fired from another microservice.

Do not confuse this package with Laravel's broadcast. This package was made to communicate in backend-backend way.

Generally, this is compilation of LAravel's [events](https://laravel.com/docs/events) and [queues](https://laravel.com/docs/queues).

Listener classes are typically stored in the `app/Listeners` folder. You may use Laravel's artisan command to generate them as it described in the [official documentation](https://laravel.com/docs/events).
Expand Down
134 changes: 115 additions & 19 deletions src/Console/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Carbon\Carbon;
use Illuminate\Console\Command;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
Expand All @@ -13,6 +14,7 @@
use Nuwber\Events\Job;
use Nuwber\Events\MessageProcessor;
use Nuwber\Events\ProcessingOptions;
use PhpAmqpLib\Exception\AMQPRuntimeException;

class ListenCommand extends Command
{
Expand All @@ -22,6 +24,7 @@ class ListenCommand extends Command
* @var string
*/
protected $signature = 'events:listen
{--memory=128 : The memory limit in megabytes}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';

Expand All @@ -32,6 +35,13 @@ class ListenCommand extends Command
*/
protected $description = 'Listen for system events thrown from other services';

/**
* Indicates if the listener should exit.
*
* @var bool
*/
private $shouldQuit;

/**
* Execute the console command.
*
Expand All @@ -42,23 +52,60 @@ public function handle()
{
$this->listenForEvents();

$this->listenForSignals();

$consumer = $this->makeConsumer();

$options = $this->gatherProcessingOptions();

$processor = $this->makeMessageProcessor($options);
$processor = $this->createProcessor($options);

while (true) {
if ($payload = $consumer->receive($options->timeout)) {
try {
$processor->process($consumer, $payload);
} catch (\Exception $e) {
// Do nothing because we've already fired all necessary events
}
if ($payload = $this->getNextJob($consumer, $options)) {
$processor->process($consumer, $payload);
}
$this->stopIfNecessary($options);
}
}

/**
* Receive next message from queuer
*
* @param PsrConsumer $consumer
* @param $options
* @return \Interop\Queue\PsrMessage|null
*/
protected function getNextJob(PsrConsumer $consumer, $options)
{
try {
return $consumer->receive($options->timeout);
} catch (\Exception $e) {
$this->laravel->make(ExceptionHandler::class)->report($e);

$this->stopListeningIfLostConnection($e);
} catch (\Throwable $e) {
$this->laravel->make(ExceptionHandler::class)->report($e);

$this->stopListeningIfLostConnection($e);
}
}

/**
* @param ProcessingOptions $options
* @return MessageProcessor
*/
protected function createProcessor(ProcessingOptions $options)
{
return new MessageProcessor(
$this->laravel,
$this->laravel->make(PsrContext::class),
$this->laravel->make('events'),
$this->laravel->make('broadcast.events'),
$options,
$this->laravel->make('queue')->getConnectionName()
);
}

/**
* Listen for the queue events in order to update the console output.
*
Expand Down Expand Up @@ -128,7 +175,7 @@ protected function writeStatus(Job $listener, $status, $type)
*/
protected function logFailedJob(JobFailed $event)
{
logger($event->job->getRawBody(), $event->exception->getTrace());
$this->laravel['log']->debug($event->job->getRawBody(), $event->exception->getTrace());
}

/**
Expand All @@ -138,8 +185,7 @@ private function makeConsumer()
{
return $this->laravel->make(ConsumerFactory::class)
->make(
$this->laravel->make('broadcast.events')
->getEvents()
$this->laravel->make('broadcast.events')->getEvents()
);
}

Expand All @@ -151,20 +197,70 @@ private function makeConsumer()
protected function gatherProcessingOptions()
{
return new ProcessingOptions(
$this->option('memory'),
$this->option('timeout'),
$this->option('tries')
);
}

private function makeMessageProcessor(ProcessingOptions $options)
/**
* Enable async signals for the process.
*
* @return void
*/
protected function listenForSignals()
{
return new MessageProcessor(
$this->laravel,
$this->laravel->make(PsrContext::class),
$this->laravel->make('events'),
$this->laravel->make('broadcast.events'),
$options,
$this->laravel->make('queue')->getConnectionName()
);
pcntl_async_signals(true);

foreach ([SIGINT, SIGTERM, SIGALRM] as $signal) {
pcntl_signal($signal, function () {
$this->shouldQuit = true;
});
}
}

/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit;
}

/**
* Stop listening and bail out of the script.
*
* @param int $status
* @return void
*/
public function stop($status = 0)
{
exit($status);
}

/**
* Stop the process if necessary.
*
* @param ProcessingOptions $options
*/
protected function stopIfNecessary(ProcessingOptions $options)
{
if ($this->shouldQuit) {
$this->stop();
}

if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
}
}

protected function stopListeningIfLostConnection($exception)
{
if ($exception instanceof AMQPRuntimeException) {
$this->shouldQuit = true;
}
}
}
7 changes: 6 additions & 1 deletion src/ProcessingOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ class ProcessingOptions
* @var int
*/
public $maxTries;
/**
* @var int
*/
public $memory;

public function __construct($timeout = 60, $maxTries = 0)
public function __construct($memory = 128, $timeout = 60, $maxTries = 0)
{
$this->timeout = $timeout;
$this->memory = $memory;
$this->maxTries = (int)$maxTries;
}
}

0 comments on commit 1c2d5e7

Please sign in to comment.