diff --git a/docs/consuming-messages/5-configuring-consumer-options.md b/docs/consuming-messages/5-configuring-consumer-options.md index 39a3a05c..d2155771 100644 --- a/docs/consuming-messages/5-configuring-consumer-options.md +++ b/docs/consuming-messages/5-configuring-consumer-options.md @@ -35,6 +35,14 @@ kafka consumer: $consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2); ``` +### Configuring the max time when a consumer can process messages +If you want to consume a limited amount of time, you can use the `withMaxTime` method to set the max number of seconds for +kafka consumer to process messages: + +```php +$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxTime(3600); +``` + ### Setting Kafka configuration options To set configuration options, you can use two methods: `withOptions`, passing an array of option and option value or, using the `withOption method and passing two arguments, the option name and the option value. diff --git a/src/Config/Config.php b/src/Config/Config.php index 5e6e10ce..ee11c1bd 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -80,6 +80,7 @@ public function __construct( private bool $stopAfterLastMessage = false, private int $restartInterval = 1000, private array $callbacks = [], + private int $maxTime = 0, ) { $this->batchConfig = $batchConfig ?? new NullBatchConfig(); } @@ -113,6 +114,10 @@ public function getMaxMessages(): int { return $this->maxMessages; } + public function getMaxTime(): int + { + return $this->maxTime; + } public function isAutoCommit(): bool { diff --git a/src/Console/Commands/KafkaConsumer/Options.php b/src/Console/Commands/KafkaConsumer/Options.php index 5d11a506..467afd05 100644 --- a/src/Console/Commands/KafkaConsumer/Options.php +++ b/src/Console/Commands/KafkaConsumer/Options.php @@ -14,6 +14,7 @@ class Options private ?int $commit = 1; private ?string $dlq = null; private int $maxMessages = -1; + private int $maxTime = 0; private ?string $securityProtocol = null; private ?string $saslUsername; private ?string $saslPassword; @@ -70,6 +71,11 @@ public function getMaxMessages(): int return $this->maxMessages >= 1 ? $this->maxMessages : -1; } + public function getMaxTime(): int + { + return $this->maxTime; + } + #[Pure] public function getSasl(): ?Sasl { diff --git a/src/Console/Commands/KafkaConsumerCommand.php b/src/Console/Commands/KafkaConsumerCommand.php index e138454b..6db155cc 100644 --- a/src/Console/Commands/KafkaConsumerCommand.php +++ b/src/Console/Commands/KafkaConsumerCommand.php @@ -18,6 +18,7 @@ class KafkaConsumerCommand extends Command {--commit=1} {--dlq=? : The Dead Letter Queue} {--maxMessage=? : The max number of messages that should be handled} + {--maxTime=0 : The max number of seconds that a consumer should run } {--securityProtocol=?}'; protected $description = 'A Kafka Consumer for Laravel.'; @@ -70,7 +71,8 @@ public function handle() consumer: app($consumer), sasl: $options->getSasl(), dlq: $options->getDlq(), - maxMessages: $options->getMaxMessages() + maxMessages: $options->getMaxMessages(), + maxTime: $options->getMaxTime(), ); /** @var Consumer $consumer */ diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index 51aea68f..858cd6d4 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -18,6 +18,7 @@ use Junges\Kafka\Message\ConsumedMessage; use Junges\Kafka\MessageCounter; use Junges\Kafka\Retryable; +use Junges\Kafka\Support\InfiniteTimer; use Junges\Kafka\Support\Timer; use RdKafka\Conf; use RdKafka\KafkaConsumer; @@ -79,6 +80,7 @@ public function consume(): void { $this->cancelStopConsume(); $this->configureRestartTimer(); + $stopTimer = $this->configureStopTimer(); if ($this->supportAsyncSignals()) { $this->listenForSignals(); @@ -103,7 +105,7 @@ public function consume(): void do { $this->retryable->retry(fn () => $this->doConsume()); $this->checkForRestart(); - } while (! $this->maxMessagesLimitReached() && ! $this->stopRequested); + } while (! $this->maxMessagesLimitReached() && ! $stopTimer->isTimedOut() && ! $this->stopRequested); if ($this->shouldRunStopConsumingCallback()) { $callback = $this->whenStopConsuming; @@ -350,6 +352,18 @@ private function maxMessagesLimitReached(): bool return $this->messageCounter->maxMessagesLimitReached(); } + public function configureStopTimer(): Timer + { + $stopTimer = new Timer(); + if ($this->config->getMaxTime() === 0) { + $stopTimer = new InfiniteTimer(); + } + + $stopTimer->start($this->config->getMaxTime() * 1000); + + return $stopTimer; + } + /** * Handle the message. * diff --git a/src/Consumers/ConsumerBuilder.php b/src/Consumers/ConsumerBuilder.php index b2d492d2..15f0533c 100644 --- a/src/Consumers/ConsumerBuilder.php +++ b/src/Consumers/ConsumerBuilder.php @@ -26,6 +26,7 @@ class ConsumerBuilder implements ConsumerBuilderContract protected ?string $groupId; protected Closure $handler; protected int $maxMessages; + protected int $maxTime = 0; protected int $maxCommitRetries; protected string $brokers; protected array $middlewares; @@ -174,6 +175,16 @@ public function withMaxMessages(int $maxMessages): self return $this; } + /** + * @inheritDoc + */ + public function withMaxTime(int $maxTime): self + { + $this->maxTime = $maxTime; + + return $this; + } + /** * @inheritDoc */ @@ -325,6 +336,7 @@ public function build(): CanConsumeMessages batchConfig: $this->getBatchConfig(), stopAfterLastMessage: $this->stopAfterLastMessage, callbacks: $this->callbacks, + maxTime: $this->maxTime, ); return new Consumer($config, $this->deserializer, $this->committerFactory); diff --git a/src/Contracts/ConsumerBuilder.php b/src/Contracts/ConsumerBuilder.php index 250b0707..75249885 100644 --- a/src/Contracts/ConsumerBuilder.php +++ b/src/Contracts/ConsumerBuilder.php @@ -82,6 +82,14 @@ public function usingCommitterFactory(CommitterFactory $committerFactory): self; */ public function withMaxMessages(int $maxMessages): self; + /** + * Define the max number seconds that a consumer should run + * + * @param int $maxTime + * @return \Junges\Kafka\Consumers\ConsumerBuilder + */ + public function withMaxTime(int $maxTime): self; + /** * Specify the max retries attempts. * diff --git a/src/Support/InfiniteTimer.php b/src/Support/InfiniteTimer.php new file mode 100644 index 00000000..253d470f --- /dev/null +++ b/src/Support/InfiniteTimer.php @@ -0,0 +1,11 @@ +assertEquals(1, $consumer->consumedMessagesCount()); } + + public function testCanStopConsumeIfMaxTimeReached() + { + $message = new Message(); + $message->err = 0; + $message->key = 'key'; + $message->topic_name = 'test'; + $message->payload = '{"body": "message payload"}'; + $message->offset = 0; + $message->partition = 1; + $message->headers = []; + + $message2 = new Message(); + $message2->err = 0; + $message2->key = 'key2'; + $message2->topic_name = 'test2'; + $message2->payload = '{"body": "message payload2"}'; + $message2->offset = 0; + $message2->partition = 1; + $message2->headers = []; + + $this->mockConsumerWithMessage($message, $message2); + $this->mockProducer(); + + $fakeHandler = new CallableConsumer( + function (KafkaConsumerMessage $message) { + sleep(2); + }, + [] + ); + + $config = new Config( + broker: 'broker', + topics: ['test-topic'], + securityProtocol: 'security', + commit: 1, + groupId: 'group', + consumer: $fakeHandler, + sasl: null, + dlq: null, + maxMessages: 2, + maxTime: 1, + ); + + $consumer = new Consumer($config, new JsonDeserializer()); + $consumer->consume(); + + //finaly only one message should be consumed + $this->assertEquals(1, $consumer->consumedMessagesCount()); + } }