Skip to content

Commit

Permalink
Add logic to stop a consumer after a number of seconds (#233)
Browse files Browse the repository at this point in the history
* add logic to stop a consumer after a number of seconds

* Update src/Consumers/Consumer.php

Co-authored-by: Mateus Junges <[email protected]>

* Update src/Consumers/Consumer.php

Co-authored-by: Mateus Junges <[email protected]>

* Update src/Config/Config.php

Co-authored-by: Mateus Junges <[email protected]>

* add withMaxTime method on Kafka facade

---------

Co-authored-by: mihai.leu <[email protected]>
Co-authored-by: Mateus Junges <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent ba641b2 commit 75bec28
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 2 deletions.
8 changes: 8 additions & 0 deletions docs/consuming-messages/5-configuring-consumer-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ kafka consumer:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2);
```

### Configuring the max time when a consumer can process messages
If you want to consume a limited amount of time, you can use the `withMaxTime` method to set the max number of seconds for
kafka consumer to process messages:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxTime(3600);
```

### Setting Kafka configuration options
To set configuration options, you can use two methods: `withOptions`, passing an array of option and option value or, using the `withOption method and
passing two arguments, the option name and the option value.
Expand Down
5 changes: 5 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public function __construct(
private bool $stopAfterLastMessage = false,
private int $restartInterval = 1000,
private array $callbacks = [],
private int $maxTime = 0,
) {
$this->batchConfig = $batchConfig ?? new NullBatchConfig();
}
Expand Down Expand Up @@ -113,6 +114,10 @@ public function getMaxMessages(): int
{
return $this->maxMessages;
}
public function getMaxTime(): int
{
return $this->maxTime;
}

public function isAutoCommit(): bool
{
Expand Down
6 changes: 6 additions & 0 deletions src/Console/Commands/KafkaConsumer/Options.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Options
private ?int $commit = 1;
private ?string $dlq = null;
private int $maxMessages = -1;
private int $maxTime = 0;
private ?string $securityProtocol = null;
private ?string $saslUsername;
private ?string $saslPassword;
Expand Down Expand Up @@ -70,6 +71,11 @@ public function getMaxMessages(): int
return $this->maxMessages >= 1 ? $this->maxMessages : -1;
}

public function getMaxTime(): int
{
return $this->maxTime;
}

#[Pure]
public function getSasl(): ?Sasl
{
Expand Down
4 changes: 3 additions & 1 deletion src/Console/Commands/KafkaConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class KafkaConsumerCommand extends Command
{--commit=1}
{--dlq=? : The Dead Letter Queue}
{--maxMessage=? : The max number of messages that should be handled}
{--maxTime=0 : The max number of seconds that a consumer should run }
{--securityProtocol=?}';

protected $description = 'A Kafka Consumer for Laravel.';
Expand Down Expand Up @@ -70,7 +71,8 @@ public function handle()
consumer: app($consumer),
sasl: $options->getSasl(),
dlq: $options->getDlq(),
maxMessages: $options->getMaxMessages()
maxMessages: $options->getMaxMessages(),
maxTime: $options->getMaxTime(),
);

/** @var Consumer $consumer */
Expand Down
16 changes: 15 additions & 1 deletion src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Junges\Kafka\Message\ConsumedMessage;
use Junges\Kafka\MessageCounter;
use Junges\Kafka\Retryable;
use Junges\Kafka\Support\InfiniteTimer;
use Junges\Kafka\Support\Timer;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
Expand Down Expand Up @@ -79,6 +80,7 @@ public function consume(): void
{
$this->cancelStopConsume();
$this->configureRestartTimer();
$stopTimer = $this->configureStopTimer();

if ($this->supportAsyncSignals()) {
$this->listenForSignals();
Expand All @@ -103,7 +105,7 @@ public function consume(): void
do {
$this->retryable->retry(fn () => $this->doConsume());
$this->checkForRestart();
} while (! $this->maxMessagesLimitReached() && ! $this->stopRequested);
} while (! $this->maxMessagesLimitReached() && ! $stopTimer->isTimedOut() && ! $this->stopRequested);

if ($this->shouldRunStopConsumingCallback()) {
$callback = $this->whenStopConsuming;
Expand Down Expand Up @@ -350,6 +352,18 @@ private function maxMessagesLimitReached(): bool
return $this->messageCounter->maxMessagesLimitReached();
}

public function configureStopTimer(): Timer
{
$stopTimer = new Timer();
if ($this->config->getMaxTime() === 0) {
$stopTimer = new InfiniteTimer();
}

$stopTimer->start($this->config->getMaxTime() * 1000);

return $stopTimer;
}

/**
* Handle the message.
*
Expand Down
12 changes: 12 additions & 0 deletions src/Consumers/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ConsumerBuilder implements ConsumerBuilderContract
protected ?string $groupId;
protected Closure $handler;
protected int $maxMessages;
protected int $maxTime = 0;
protected int $maxCommitRetries;
protected string $brokers;
protected array $middlewares;
Expand Down Expand Up @@ -174,6 +175,16 @@ public function withMaxMessages(int $maxMessages): self
return $this;
}

/**
* @inheritDoc
*/
public function withMaxTime(int $maxTime): self
{
$this->maxTime = $maxTime;

return $this;
}

/**
* @inheritDoc
*/
Expand Down Expand Up @@ -325,6 +336,7 @@ public function build(): CanConsumeMessages
batchConfig: $this->getBatchConfig(),
stopAfterLastMessage: $this->stopAfterLastMessage,
callbacks: $this->callbacks,
maxTime: $this->maxTime,
);

return new Consumer($config, $this->deserializer, $this->committerFactory);
Expand Down
8 changes: 8 additions & 0 deletions src/Contracts/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public function usingCommitterFactory(CommitterFactory $committerFactory): self;
*/
public function withMaxMessages(int $maxMessages): self;

/**
* Define the max number seconds that a consumer should run
*
* @param int $maxTime
* @return \Junges\Kafka\Consumers\ConsumerBuilder
*/
public function withMaxTime(int $maxTime): self;

/**
* Specify the max retries attempts.
*
Expand Down
11 changes: 11 additions & 0 deletions src/Support/InfiniteTimer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace Junges\Kafka\Support;

class InfiniteTimer extends Timer
{
public function isTimedOut(): bool
{
return false;
}
}
50 changes: 50 additions & 0 deletions tests/Consumers/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,54 @@ function (KafkaConsumerMessage $message) {
//finaly only one message should be consumed
$this->assertEquals(1, $consumer->consumedMessagesCount());
}

public function testCanStopConsumeIfMaxTimeReached()
{
$message = new Message();
$message->err = 0;
$message->key = 'key';
$message->topic_name = 'test';
$message->payload = '{"body": "message payload"}';
$message->offset = 0;
$message->partition = 1;
$message->headers = [];

$message2 = new Message();
$message2->err = 0;
$message2->key = 'key2';
$message2->topic_name = 'test2';
$message2->payload = '{"body": "message payload2"}';
$message2->offset = 0;
$message2->partition = 1;
$message2->headers = [];

$this->mockConsumerWithMessage($message, $message2);
$this->mockProducer();

$fakeHandler = new CallableConsumer(
function (KafkaConsumerMessage $message) {
sleep(2);
},
[]
);

$config = new Config(
broker: 'broker',
topics: ['test-topic'],
securityProtocol: 'security',
commit: 1,
groupId: 'group',
consumer: $fakeHandler,
sasl: null,
dlq: null,
maxMessages: 2,
maxTime: 1,
);

$consumer = new Consumer($config, new JsonDeserializer());
$consumer->consume();

//finaly only one message should be consumed
$this->assertEquals(1, $consumer->consumedMessagesCount());
}
}

0 comments on commit 75bec28

Please sign in to comment.