Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable "tracing" of callbacks, without any requirements to callback itself, or rewriting commands etc. #45

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected function configure()
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Allowed time in seconds for this process', null)
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals');
}
Expand Down Expand Up @@ -88,6 +89,10 @@ protected function initialize(InputInterface $input, OutputInterface $output)
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
}

if (!is_null($input->getOption('time-limit')) && ctype_digit((string) $input->getOption('time-limit')) && $input->getOption('time-limit') > 0) {
$this->consumer->setTimeLimit($input->getOption('time-limit'));
}

if ($routingKey = $input->getOption('route')) {
$this->consumer->setRoutingKey($routingKey);
}
Expand Down
57 changes: 55 additions & 2 deletions src/Kdyby/RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ class Consumer extends BaseConsumer
*/
protected $memoryLimit;

/**
* @var int $timeLimit
*/
protected $timeLimit;

/**
* @var int Start timestamp
*/
private $startTimestamp;

/**
* Set the memory limit
Expand All @@ -88,10 +96,42 @@ public function getMemoryLimit()
}


/**
* Set the time limit
*
* @param int $timeLimit
*/
public function setTimeLimit($timeLimit)
{
$this->timeLimit = $timeLimit;
}



/**
* Get the time limit
*
* @return int
*/
public function getTimeLimit()
{
return $this->timeLimit;
}



/**
* Registers listener to onStart event.
* @param IConsumerStartListener $listener
*/
public function addConsumerStartListener(IConsumerStartListener $listener) {
$this -> onStart []= [$listener, 'onStartListener'];
}

public function consume($msgAmount)
{
$this->target = $msgAmount;
$this->startTimestamp = time();
$this->setupConsumer();
$this->onStart($this);

Expand Down Expand Up @@ -153,7 +193,7 @@ public function purge()

public function processMessage(AMQPMessage $msg)
{
$this->onConsume($this, $msg);
$this->onConsume($this, $msg, $this->queueOptions['name'], $this->callback);
try {
$processFlag = call_user_func($this->callback, $msg);
$this->handleProcessMessage($msg, $processFlag);
Expand Down Expand Up @@ -198,7 +238,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
$this->consumed++;
$this->maybeStopConsumer();

if ($this->isRamAlmostOverloaded()) {
if ($this->isRamAlmostOverloaded() || $this->isTimeLimitExceeded()) {
$this->stopConsuming();
}
}
Expand All @@ -219,4 +259,17 @@ protected function isRamAlmostOverloaded()
return memory_get_usage(true) >= ($this->getMemoryLimit() * 1024 * 1024);
}


/**
* Checks if consumer running time is greater or equal for time allowed for this process
*
* @return boolean
*/
protected function isTimeLimitExceeded() {
if ($this->getTimeLimit() === NULL) {
return FALSE;
}

return (time() - $this->startTimestamp) >= ($this->getTimeLimit());
}
}
8 changes: 8 additions & 0 deletions src/Kdyby/RabbitMq/DI/RabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ public function loadConfiguration()

public function beforeCompile()
{
$listeners = array_keys($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\IConsumerStartListener'));
if ($listeners) { // setup onstart listener for every consumer if listeners are defined
foreach ($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\Consumer') as $serviceDefinition) {
foreach ($listeners as $listener) {
$serviceDefinition->addSetup('addConsumerStartListener', [$listener]);
}
}
}
unset($this->getContainerBuilder()->parameters[$this->name]);
}

Expand Down
21 changes: 21 additions & 0 deletions src/Kdyby/RabbitMq/IConsumerStartListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace Kdyby\RabbitMq;


/**
* DI registered services implementing this interface will listen to every consumer onStart event.
* @package Kdyby\RabbitMq
* @author Jakub Adamus <[email protected]>
*/
interface IConsumerStartListener
{

/**
*
* @param Consumer $consumer Consumer currently starting
* @return NULL nothing to return
*/
public function onStartListener(Consumer $consumer);

}
12 changes: 11 additions & 1 deletion src/Kdyby/RabbitMq/MultipleConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,23 @@ protected function queueDeclare()



public function stopConsuming()
{
foreach ($this->queues as $name => $options) {
$this->getChannel()->basic_cancel($this->getQueueConsumerTag($name));
}
$this->onStop($this);
}



public function processQueueMessage($queueName, AMQPMessage $msg)
{
if (!isset($this->queues[$queueName])) {
throw new QueueNotFoundException();
}

$this->onConsume($this, $msg);
$this->onConsume($this, $msg, $queueName, $this->queues[$queueName]['callback']);
try {
$processFlag = call_user_func($this->queues[$queueName]['callback'], $msg);
$this->handleProcessMessage($msg, $processFlag);
Expand Down