Skip to content

Commit

Permalink
Add support for Kafka SASL
Browse files Browse the repository at this point in the history
  • Loading branch information
caleeli committed Oct 18, 2023
1 parent 1ce3837 commit f4551b7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
35 changes: 33 additions & 2 deletions ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace ProcessMaker\Nayra\MessageBrokers;

use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
use Junges\Kafka\Facades\Kafka;
use ProcessMaker\Helpers\DBHelper;
Expand Down Expand Up @@ -43,6 +44,12 @@ public function sendMessage(string $subject, string $collaborationId, mixed $bod
$producer = Kafka::publishOn($subject)
->withHeaders(['collaborationId' => $collaborationId])
->withBodyKey('body', $body);

// SASL Configuration
if ($this->hasSaslConfig()) {
$producer = $producer->withSasl($this->getSaslConfig());
}

$producer->send();
}

Expand All @@ -67,8 +74,14 @@ public function worker()
$prefix = config('kafka.prefix', '');
$consumer = Kafka::createConsumer([$prefix . self::QUEUE_NAME])
->withOption('heartbeat.interval.ms', $heartbeat)
->withOption('session.timeout.ms', $heartbeat * 10)
->withHandler(function (KafkaConsumerMessage $message) {
->withOption('session.timeout.ms', $heartbeat * 10);

// SASL Configuration
if ($this->hasSaslConfig()) {
$consumer = $consumer->withSasl($this->getSaslConfig());
}

$consumer = $consumer->withHandler(function (KafkaConsumerMessage $message) {
// Get transactions
$transactions = $message->getBody();

Expand All @@ -81,6 +94,24 @@ public function worker()
$consumer->consume();
}

private function hasSaslConfig(): bool
{
return config("kafka.sasl_mechanisms") ? true : false;
}

private function getSaslConfig(): ?Sasl
{
if ($this->hasSaslConfig()) {
return new Sasl(
username: config('kafka.sasl_username'),
password: config('kafka.sasl_password'),
mechanisms: config('kafka.sasl_mechanisms'),
securityProtocol: config('kafka.security_protocol'),
);
}
return null;
}

/**
* Store data
*
Expand Down
8 changes: 8 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@
| The prefix that will be used in the topic names
*/
'prefix' => env('KAFKA_PREFIX', ''),

/*
| SASL configuration
*/
'sasl_password' => env('KAFKA_SASL_PASSWORD'),
'sasl_username' => env('KAFKA_SASL_USERNAME'),
'sasl_mechanisms' => env('KAFKA_SASL_MECHANISMS', 'SCRAM-SHA-256'),
'security_protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_PLAINTEXT'),
];

0 comments on commit f4551b7

Please sign in to comment.