Skip to content

Commit

Permalink
Merge pull request #5496 from ProcessMaker/feature/FOUR-11134
Browse files Browse the repository at this point in the history
FOUR-11134 Integration Nayra Service in a Cluster
  • Loading branch information
ryancooley authored Oct 20, 2023
2 parents d1c526b + 3ccfc2f commit bb3ce27
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
36 changes: 34 additions & 2 deletions ProcessMaker/Nayra/MessageBrokers/ServiceKafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Exception;
use Illuminate\Support\Facades\Log;
use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
use Junges\Kafka\Facades\Kafka;
use ProcessMaker\Helpers\DBHelper;
Expand Down Expand Up @@ -39,13 +40,20 @@ public function disconnect()
* @param string $subject
* @param string $collaborationId
* @param mixed $body
* @return void
*/
public function sendMessage(string $subject, string $collaborationId, mixed $body)
{
$producer = Kafka::publishOn($subject)
->withHeaders(['collaborationId' => $collaborationId])
->withBodyKey('body', $body);

// SASL Configuration
if ($this->hasSaslConfig()) {
$producer = $producer->withSasl($this->getSaslConfig());
}

$producer->send();
}

Expand All @@ -70,8 +78,14 @@ public function worker()
$prefix = config('kafka.prefix', '');
$consumer = Kafka::createConsumer([$prefix . self::QUEUE_NAME])
->withOption('heartbeat.interval.ms', $heartbeat)
->withOption('session.timeout.ms', $heartbeat * 10)
->withHandler(function (KafkaConsumerMessage $message) {
->withOption('session.timeout.ms', $heartbeat * 10);

// SASL Configuration
if ($this->hasSaslConfig()) {
$consumer = $consumer->withSasl($this->getSaslConfig());
}

$consumer = $consumer->withHandler(function (KafkaConsumerMessage $message) {
// Get transactions
$transactions = $message->getBody();

Expand All @@ -84,6 +98,24 @@ public function worker()
$consumer->consume();
}

private function hasSaslConfig(): bool
{
return config("kafka.sasl_mechanisms") ? true : false;
}

private function getSaslConfig(): ?Sasl
{
if ($this->hasSaslConfig()) {
return new Sasl(
username: config('kafka.sasl_username'),
password: config('kafka.sasl_password'),
mechanisms: config('kafka.sasl_mechanisms'),
securityProtocol: config('kafka.security_protocol'),
);
}
return null;
}

/**
* Store data
*
Expand Down
8 changes: 8 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@
| The prefix that will be used in the topic names
*/
'prefix' => env('KAFKA_PREFIX', ''),

/*
| SASL configuration
*/
'sasl_password' => env('KAFKA_SASL_PASSWORD'),
'sasl_username' => env('KAFKA_SASL_USERNAME'),
'sasl_mechanisms' => env('KAFKA_SASL_MECHANISMS', 'SCRAM-SHA-256'),
'security_protocol' => env('KAFKA_SECURITY_PROTOCOL', 'SASL_PLAINTEXT'),
];

0 comments on commit bb3ce27

Please sign in to comment.