From 64ba449032314b3b22693f73eab1b982d4e522e2 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Sun, 3 Jul 2016 16:57:39 +0200 Subject: [PATCH 01/16] Allow to set default routing key for producer. Producers currently had method "setRoutingKey" (from AMQPMember), but this was not used at all. This commit allows to use this parameter as default routing key, instead of need to send it always to publish method. Also routingKey parameter for producer introduced in config section. In most use cases, there is application forced to publish with "magic constant" routingKey, which has to be same as routingKeys defined in consumers (for Multiple Consumers). By allowing to set default routingKey is easy to move from simple Consumer (one per queue) to MultipleConsumer. Also this commit brings more abstraction, since routingKey in simple cases could remain only in config.neon and there is no need for routingKey in app code. --- docs/en/index.md | 9 +++++ src/Kdyby/RabbitMq/DI/RabbitMqExtension.php | 2 ++ src/Kdyby/RabbitMq/Producer.php | 38 ++++++++++++--------- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/docs/en/index.md b/docs/en/index.md index 33e7700e..d09272b8 100644 --- a/docs/en/index.md +++ b/docs/en/index.md @@ -147,6 +147,15 @@ Besides the message itself, the `Kdyby\RabbitMq\Producer::publish()` method also The array of additional properties allows you to alter the properties with which an `PhpAmqpLib\Message\AMQPMessage` object gets constructed by default. This way, for example, you can change the application headers. +You are able to set default routing key in producer. You can provide it by `setRoutingKey` method or in configuration like bellow. Default routing key will be used for calls of `publish` method without second parrameter, or when second parameter is set to `NULL`. Be aware of setting second parameter to empty string, which is considered as "send without routing key". +```yaml + ... + producers: + uploadPicture: + routingKey: iphone.upload + ... +``` + You can use `setContentType` and `setDeliveryMode` methods in order to set the message content type and the message delivery mode respectively. Default values are `text/plain` for content type and `2` for delivery mode. diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 5ac570a4..842ce27b 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -65,6 +65,7 @@ class RabbitMqExtension extends Nette\DI\CompilerExtension 'queue' => [], 'contentType' => 'text/plain', 'deliveryMode' => 2, + 'routingKey' => '', 'autoSetupFabric' => NULL, // inherits from `rabbitmq: autoSetupFabric:` ]; @@ -284,6 +285,7 @@ protected function loadProducers($producers) ->setClass('Kdyby\RabbitMq\IProducer') ->addSetup('setContentType', [$config['contentType']]) ->addSetup('setDeliveryMode', [$config['deliveryMode']]) + ->addSetup('setRoutingKey', array($config['routingKey'])) ->addTag(self::TAG_PRODUCER); if (!empty($config['exchange'])) { diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index 8588f3a3..445c13a4 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -49,21 +49,25 @@ protected function getBasicProperties() } - - /** - * Publishes the message and merges additional properties with basic properties - * - * @param string $msgBody - * @param string $routingKey - * @param array $additionalProperties - */ - public function publish($msgBody, $routingKey = '', $additionalProperties = []) - { - if ($this->autoSetupFabric) { - $this->setupFabric(); - } - - $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties)); - $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); - } + /** + * Publishes the message and merges additional properties with basic properties + * + * @param string $msgBody + * @param string $routingKey IF not provided or set to null, used default routingKey from configuration of this producer + * @param array $additionalProperties + */ + public function publish($msgBody, $routingKey = '', $additionalProperties = array()) + { + if ($this->autoSetupFabric) { + $this->setupFabric(); + } + + if ($this -> routingKey && (func_num_args() <= 1 || $routingKey === NULL)) { + // routingKey parameter not provided or set to NULL, use default + $routingKey = $this -> routingKey ?: ''; + } + + $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties)); + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); + } } From 325480cc2466b2bf87466e8966a774ae66a7ab16 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 13:56:02 +0200 Subject: [PATCH 02/16] Update RabbitMqExtension.php --- src/Kdyby/RabbitMq/DI/RabbitMqExtension.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 842ce27b..ab231648 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -285,7 +285,7 @@ protected function loadProducers($producers) ->setClass('Kdyby\RabbitMq\IProducer') ->addSetup('setContentType', [$config['contentType']]) ->addSetup('setDeliveryMode', [$config['deliveryMode']]) - ->addSetup('setRoutingKey', array($config['routingKey'])) + ->addSetup('setRoutingKey', array($config['routingKey'])) ->addTag(self::TAG_PRODUCER); if (!empty($config['exchange'])) { From 5c68f072f69955eb5b67204a697ca6eab38ebd92 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 13:57:23 +0200 Subject: [PATCH 03/16] Update RabbitMqExtension.php --- src/Kdyby/RabbitMq/DI/RabbitMqExtension.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index ab231648..99798c3d 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -65,7 +65,7 @@ class RabbitMqExtension extends Nette\DI\CompilerExtension 'queue' => [], 'contentType' => 'text/plain', 'deliveryMode' => 2, - 'routingKey' => '', + 'routingKey' => '', 'autoSetupFabric' => NULL, // inherits from `rabbitmq: autoSetupFabric:` ]; @@ -285,7 +285,7 @@ protected function loadProducers($producers) ->setClass('Kdyby\RabbitMq\IProducer') ->addSetup('setContentType', [$config['contentType']]) ->addSetup('setDeliveryMode', [$config['deliveryMode']]) - ->addSetup('setRoutingKey', array($config['routingKey'])) + ->addSetup('setRoutingKey', [$config['routingKey']]) ->addTag(self::TAG_PRODUCER); if (!empty($config['exchange'])) { From 5634b1d86ab76d7f6684b25bcf9c0e5e2527cff1 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 13:57:53 +0200 Subject: [PATCH 04/16] Update Producer.php --- src/Kdyby/RabbitMq/Producer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index 445c13a4..cb5cc71f 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -56,7 +56,7 @@ protected function getBasicProperties() * @param string $routingKey IF not provided or set to null, used default routingKey from configuration of this producer * @param array $additionalProperties */ - public function publish($msgBody, $routingKey = '', $additionalProperties = array()) + public function publish($msgBody, $routingKey = '', $additionalProperties = []) { if ($this->autoSetupFabric) { $this->setupFabric(); From 01a5158f22b84f240849f9929733484802fbf790 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 15:00:48 +0200 Subject: [PATCH 05/16] Update Producer.php --- src/Kdyby/RabbitMq/Producer.php | 42 ++++++++++++++++----------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index cb5cc71f..8f899135 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -48,26 +48,24 @@ protected function getBasicProperties() return ['content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode]; } - - /** - * Publishes the message and merges additional properties with basic properties - * - * @param string $msgBody - * @param string $routingKey IF not provided or set to null, used default routingKey from configuration of this producer - * @param array $additionalProperties - */ - public function publish($msgBody, $routingKey = '', $additionalProperties = []) - { - if ($this->autoSetupFabric) { - $this->setupFabric(); - } - - if ($this -> routingKey && (func_num_args() <= 1 || $routingKey === NULL)) { - // routingKey parameter not provided or set to NULL, use default - $routingKey = $this -> routingKey ?: ''; - } - - $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties)); - $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); - } + /** + * Publishes the message and merges additional properties with basic properties + * + * @param string $msgBody + * @param string $routingKey IF not provided or set to null, used default routingKey from configuration of this producer + * @param array $additionalProperties + */ + public function publish($msgBody, $routingKey = '', $additionalProperties = []) + { + if ($this->autoSetupFabric) { + $this->setupFabric(); + } + if ($this -> routingKey && (func_num_args() <= 1 || $routingKey === NULL)) { + // routingKey parameter not provided or set to NULL, use default + $routingKey = $this -> routingKey; + } + + $msg = new AMQPMessage((string) $msgBody, array_merge($this->getBasicProperties(), $additionalProperties)); + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); + } } From 439933741139c8282916132a62580a540a9ba0b5 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 15:03:36 +0200 Subject: [PATCH 06/16] Update index.md --- docs/en/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/en/index.md b/docs/en/index.md index d09272b8..a40ecf12 100644 --- a/docs/en/index.md +++ b/docs/en/index.md @@ -148,6 +148,7 @@ The array of additional properties allows you to alter the properties with which This way, for example, you can change the application headers. You are able to set default routing key in producer. You can provide it by `setRoutingKey` method or in configuration like bellow. Default routing key will be used for calls of `publish` method without second parrameter, or when second parameter is set to `NULL`. Be aware of setting second parameter to empty string, which is considered as "send without routing key". + ```yaml ... producers: From 3cf0b218750494ec59aba11870763e48261cc328 Mon Sep 17 00:00:00 2001 From: kratkyzobak Date: Mon, 4 Jul 2016 15:19:51 +0200 Subject: [PATCH 07/16] Update index.md --- docs/en/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/index.md b/docs/en/index.md index a40ecf12..3cfaef0a 100644 --- a/docs/en/index.md +++ b/docs/en/index.md @@ -147,7 +147,7 @@ Besides the message itself, the `Kdyby\RabbitMq\Producer::publish()` method also The array of additional properties allows you to alter the properties with which an `PhpAmqpLib\Message\AMQPMessage` object gets constructed by default. This way, for example, you can change the application headers. -You are able to set default routing key in producer. You can provide it by `setRoutingKey` method or in configuration like bellow. Default routing key will be used for calls of `publish` method without second parrameter, or when second parameter is set to `NULL`. Be aware of setting second parameter to empty string, which is considered as "send without routing key". +You can set default routing key in producer context. You can provide it by `setRoutingKey` method or in configuration like bellow. Default routing key will be used in `publish` method calls with second argument ommited or set to `NULL`. Be aware, that setting second argument to empty string will lead to send empty string as routing key. ```yaml ... From dac3d267d223d11ceae138c10e650af52c594a31 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 14:35:10 +0200 Subject: [PATCH 08/16] Merge --- src/Kdyby/RabbitMq/Producer.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index 38ded739..49d1e8ce 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -60,10 +60,6 @@ public function publish($msgBody, $routingKey = '', $additionalProperties = []) if ($this->autoSetupFabric) { $this->setupFabric(); } - if ($this -> routingKey && (func_num_args() <= 1 || $routingKey === NULL)) { - // routingKey parameter not provided or set to NULL, use default - $routingKey = $this -> routingKey; - } if ($routingKey === '' || $routingKey === NULL) { // empty string or NULL $routingKey = $this->routingKey; From 46ffb4b404a1d77ff615ac6bf2306f9964dc760b Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 15:13:14 +0200 Subject: [PATCH 09/16] Add "global" consumer onStart listener. --- src/Kdyby/RabbitMq/DI/RabbitMqExtension.php | 8 +++++++ src/Kdyby/RabbitMq/IConsumerStartListener.php | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 src/Kdyby/RabbitMq/IConsumerStartListener.php diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 99798c3d..d84d39ab 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -222,6 +222,14 @@ public function loadConfiguration() public function beforeCompile() { + $listeners = array_keys($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\IConsumerStartListener')); + if ($listeners) { // setup onstart listener for every consumer if listeners are defined + foreach ($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\Consumer') as $serviceDefinition) { + foreach ($listeners as $listener) { + $serviceDefinition->addSetup('addConsumerStartListener', $listener); + } + } + } unset($this->getContainerBuilder()->parameters[$this->name]); } diff --git a/src/Kdyby/RabbitMq/IConsumerStartListener.php b/src/Kdyby/RabbitMq/IConsumerStartListener.php new file mode 100644 index 00000000..18ad52a2 --- /dev/null +++ b/src/Kdyby/RabbitMq/IConsumerStartListener.php @@ -0,0 +1,21 @@ + + */ +interface IConsumerStartListener +{ + + /** + * + * @param Consumer $consumer Consumer currently starting + * @return NULL nothing to return + */ + public function onStartListener(Consumer $consumer); + +} From 7aa723c7f870eeea41c3056826c1980533ecacba Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 17:34:57 +0200 Subject: [PATCH 10/16] Add "global" consumer onStart listener (fix addSetup call) --- src/Kdyby/RabbitMq/DI/RabbitMqExtension.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index d84d39ab..cc84a4ab 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -226,7 +226,7 @@ public function beforeCompile() if ($listeners) { // setup onstart listener for every consumer if listeners are defined foreach ($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\Consumer') as $serviceDefinition) { foreach ($listeners as $listener) { - $serviceDefinition->addSetup('addConsumerStartListener', $listener); + $serviceDefinition->addSetup('addConsumerStartListener', [$listener]); } } } From 6bc50a4b35f562c355ea6dd0fd43c1c0ebccb9bd Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 17:40:54 +0200 Subject: [PATCH 11/16] Add "global" consumer onStart listener and addConsumerStartListener to Consumer class --- src/Kdyby/RabbitMq/Consumer.php | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/Kdyby/RabbitMq/Consumer.php b/src/Kdyby/RabbitMq/Consumer.php index 8fe5aa94..98252784 100644 --- a/src/Kdyby/RabbitMq/Consumer.php +++ b/src/Kdyby/RabbitMq/Consumer.php @@ -88,6 +88,13 @@ public function getMemoryLimit() } + /** + * Registers listener to onStart event. + * @param IConsumerStartListener $listener + */ + public function addConsumerStartListener(IConsumerStartListener $listener) { + $this -> onStart []= [$listener, 'onStartListener']; + } public function consume($msgAmount) { From 5494f0d78263b956ab94c7258b905186e1821d08 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 17:42:47 +0200 Subject: [PATCH 12/16] Add queue and callback info to onConsume method (mostly for MultipleConsumer recognition). --- src/Kdyby/RabbitMq/Consumer.php | 2 +- src/Kdyby/RabbitMq/MultipleConsumer.php | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Kdyby/RabbitMq/Consumer.php b/src/Kdyby/RabbitMq/Consumer.php index 98252784..8621bd8f 100644 --- a/src/Kdyby/RabbitMq/Consumer.php +++ b/src/Kdyby/RabbitMq/Consumer.php @@ -160,7 +160,7 @@ public function purge() public function processMessage(AMQPMessage $msg) { - $this->onConsume($this, $msg); + $this->onConsume($this, $msg, $this->queueOptions['name'], $this->callback); try { $processFlag = call_user_func($this->callback, $msg); $this->handleProcessMessage($msg, $processFlag); diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index ba0f6481..c5fa420e 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -95,10 +95,11 @@ public function processQueueMessage($queueName, AMQPMessage $msg) throw new QueueNotFoundException(); } - $this->onConsume($this, $msg); + $this->onConsume($this, $msg, $queueName, $this->queues[$queueName]['callback']); try { $processFlag = call_user_func($this->queues[$queueName]['callback'], $msg); $this->handleProcessMessage($msg, $processFlag); + $this->handleProcessMessage($msg, $processFlag); } catch (TerminateException $e) { $this->handleProcessMessage($msg, $e->getResponse()); From f0d9c3d3ddbbf27ec0565c92e092cdbe43e2bc8a Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 18:45:08 +0200 Subject: [PATCH 13/16] Merge upstream --- src/Kdyby/RabbitMq/Producer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Kdyby/RabbitMq/Producer.php b/src/Kdyby/RabbitMq/Producer.php index 49d1e8ce..ed7ee654 100644 --- a/src/Kdyby/RabbitMq/Producer.php +++ b/src/Kdyby/RabbitMq/Producer.php @@ -48,6 +48,8 @@ protected function getBasicProperties() return ['content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode]; } + + /** * Publishes the message and merges additional properties with basic properties * From 22376c206e5ef78727ca04d030d4d4766e09af06 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 18:45:49 +0200 Subject: [PATCH 14/16] Merge upstream --- src/Kdyby/RabbitMq/MultipleConsumer.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index c5fa420e..f3d57862 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -99,7 +99,6 @@ public function processQueueMessage($queueName, AMQPMessage $msg) try { $processFlag = call_user_func($this->queues[$queueName]['callback'], $msg); $this->handleProcessMessage($msg, $processFlag); - $this->handleProcessMessage($msg, $processFlag); } catch (TerminateException $e) { $this->handleProcessMessage($msg, $e->getResponse()); From 984116414dffa5e6c17ef1301f65ea2736be02b7 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 22:27:16 +0200 Subject: [PATCH 15/16] Fix stopConsuming for MultipleConsumer --- src/Kdyby/RabbitMq/MultipleConsumer.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index f3d57862..0c83aff0 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -89,6 +89,16 @@ protected function queueDeclare() + public function stopConsuming() + { + foreach ($this->queues as $name => $options) { + $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name)); + } + $this->onStop($this); + } + + + public function processQueueMessage($queueName, AMQPMessage $msg) { if (!isset($this->queues[$queueName])) { From bbd98ba2a3939fa5f93180daee2609ef9178e2e5 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Thu, 7 Jul 2016 22:29:27 +0200 Subject: [PATCH 16/16] Add support for limit time of consuming --- .../RabbitMq/Command/BaseConsumerCommand.php | 5 ++ src/Kdyby/RabbitMq/Consumer.php | 48 ++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php index b63978fd..f65a95c9 100644 --- a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php @@ -43,6 +43,7 @@ protected function configure() ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) + ->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Allowed time in seconds for this process', null) ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals'); } @@ -88,6 +89,10 @@ protected function initialize(InputInterface $input, OutputInterface $output) $this->consumer->setMemoryLimit($input->getOption('memory-limit')); } + if (!is_null($input->getOption('time-limit')) && ctype_digit((string) $input->getOption('time-limit')) && $input->getOption('time-limit') > 0) { + $this->consumer->setTimeLimit($input->getOption('time-limit')); + } + if ($routingKey = $input->getOption('route')) { $this->consumer->setRoutingKey($routingKey); } diff --git a/src/Kdyby/RabbitMq/Consumer.php b/src/Kdyby/RabbitMq/Consumer.php index 8621bd8f..ece18ee3 100644 --- a/src/Kdyby/RabbitMq/Consumer.php +++ b/src/Kdyby/RabbitMq/Consumer.php @@ -63,7 +63,15 @@ class Consumer extends BaseConsumer */ protected $memoryLimit; + /** + * @var int $timeLimit + */ + protected $timeLimit; + /** + * @var int Start timestamp + */ + private $startTimestamp; /** * Set the memory limit @@ -88,6 +96,30 @@ public function getMemoryLimit() } + /** + * Set the time limit + * + * @param int $timeLimit + */ + public function setTimeLimit($timeLimit) + { + $this->timeLimit = $timeLimit; + } + + + + /** + * Get the time limit + * + * @return int + */ + public function getTimeLimit() + { + return $this->timeLimit; + } + + + /** * Registers listener to onStart event. * @param IConsumerStartListener $listener @@ -99,6 +131,7 @@ public function addConsumerStartListener(IConsumerStartListener $listener) { public function consume($msgAmount) { $this->target = $msgAmount; + $this->startTimestamp = time(); $this->setupConsumer(); $this->onStart($this); @@ -205,7 +238,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) $this->consumed++; $this->maybeStopConsumer(); - if ($this->isRamAlmostOverloaded()) { + if ($this->isRamAlmostOverloaded() || $this->isTimeLimitExceeded()) { $this->stopConsuming(); } } @@ -226,4 +259,17 @@ protected function isRamAlmostOverloaded() return memory_get_usage(true) >= ($this->getMemoryLimit() * 1024 * 1024); } + + /** + * Checks if consumer running time is greater or equal for time allowed for this process + * + * @return boolean + */ + protected function isTimeLimitExceeded() { + if ($this->getTimeLimit() === NULL) { + return FALSE; + } + + return (time() - $this->startTimestamp) >= ($this->getTimeLimit()); + } }