From f4551b7c61698fcb310991968ecd8ceffcaa834e Mon Sep 17 00:00:00 2001 From: David Callizaya Date: Wed, 18 Oct 2023 06:39:28 -0400 Subject: [PATCH] Add support for Kafka SASL --- .../Nayra/MessageBrokers/ServiceKafka.php | 35 +++++++++++++++++-- config/kafka.php | 8 +++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php b/ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php index 123708c3f4..db690ed1fa 100644 --- a/ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php +++ b/ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php @@ -2,6 +2,7 @@ namespace ProcessMaker\Nayra\MessageBrokers; +use Junges\Kafka\Config\Sasl; use Junges\Kafka\Contracts\KafkaConsumerMessage; use Junges\Kafka\Facades\Kafka; use ProcessMaker\Helpers\DBHelper; @@ -43,6 +44,12 @@ public function sendMessage(string $subject, string $collaborationId, mixed $bod $producer = Kafka::publishOn($subject) ->withHeaders(['collaborationId' => $collaborationId]) ->withBodyKey('body', $body); + + // SASL Configuration + if ($this->hasSaslConfig()) { + $producer = $producer->withSasl($this->getSaslConfig()); + } + $producer->send(); } @@ -67,8 +74,14 @@ public function worker() $prefix = config('kafka.prefix', ''); $consumer = Kafka::createConsumer([$prefix . self::QUEUE_NAME]) ->withOption('heartbeat.interval.ms', $heartbeat) - ->withOption('session.timeout.ms', $heartbeat * 10) - ->withHandler(function (KafkaConsumerMessage $message) { + ->withOption('session.timeout.ms', $heartbeat * 10); + + // SASL Configuration + if ($this->hasSaslConfig()) { + $consumer = $consumer->withSasl($this->getSaslConfig()); + } + + $consumer = $consumer->withHandler(function (KafkaConsumerMessage $message) { // Get transactions $transactions = $message->getBody(); @@ -81,6 +94,24 @@ public function worker() $consumer->consume(); } + private function hasSaslConfig(): bool + { + return config("kafka.sasl_mechanisms") ? true : false; + } + + private function getSaslConfig(): ?Sasl + { + if ($this->hasSaslConfig()) { + return new Sasl( + username: config('kafka.sasl_username'), + password: config('kafka.sasl_password'), + mechanisms: config('kafka.sasl_mechanisms'), + securityProtocol: config('kafka.security_protocol'), + ); + } + return null; + } + /** * Store data * diff --git a/config/kafka.php b/config/kafka.php index af262b1787..3287de49ae 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -65,4 +65,12 @@ | The prefix that will be used in the topic names */ 'prefix' => env('KAFKA_PREFIX', ''), + + /* + | SASL configuration + */ + 'sasl_password' => env('KAFKA_SASL_PASSWORD'), + 'sasl_username' => env('KAFKA_SASL_USERNAME'), + 'sasl_mechanisms' => env('KAFKA_SASL_MECHANISMS', 'SCRAM-SHA-256'), + 'security_protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_PLAINTEXT'), ];