From 83ae6ce01056f4505eaf84ed395ebd07b0964fbe Mon Sep 17 00:00:00 2001 From: Dinand Vanvelzen Date: Sat, 18 May 2024 13:03:32 -0500 Subject: [PATCH] - pubsub kafka backend: Added macro support for group id and group instance id --- .../include/gucefPUBSUB_CPubSubClientTopic.h | 9 ++++++ .../src/gucefPUBSUB_CPubSubClientTopic.cpp | 23 +++++++++++++++ ...subpluginKAFKA_CKafkaPubSubClientTopic.cpp | 28 +++++++++++++++++-- .../channel_templates/kafka2storage.json | 5 ++-- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientTopic.h b/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientTopic.h index aa60f8b7a..9d66fb7f0 100644 --- a/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientTopic.h +++ b/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientTopic.h @@ -310,6 +310,15 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientTopic : public CORE::CTSGNotifier virtual bool SetJournal( CIPubSubJournalBasicPtr journal ); virtual CIPubSubJournalBasicPtr GetJournal( void ) const; + + /** + * Helper function that can be used to resolve some common macros in strings + * The generic implementation at this level supports the following macros + * - {topicName} + * - {clientType} + */ + virtual bool TryResolveMacrosInString( const CORE::CString& testString , + CORE::CString& resultString ) const; }; /*-------------------------------------------------------------------------*/ diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp index 13a79194d..a3ad56a88 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp @@ -32,6 +32,11 @@ #define GUCEF_CORE_LOGGING_H #endif /* GUCEF_CORE_LOGGING_H ? */ +#ifndef GUCEF_PUBSUB_CPUBSUBCLIENT_H +#include "gucefPUBSUB_CPubSubClient.h" +#define GUCEF_PUBSUB_CPUBSUBCLIENT_H +#endif /* GUCEF_PUBSUB_CPUBSUBCLIENT_H ? */ + #include "gucefPUBSUB_CPubSubClientTopic.h" /*-------------------------------------------------------------------------// @@ -112,6 +117,24 @@ CPubSubClientTopic::~CPubSubClientTopic() /*-------------------------------------------------------------------------*/ +bool +CPubSubClientTopic::TryResolveMacrosInString( const CORE::CString& testString , + CORE::CString& resultString ) const +{GUCEF_TRACE; + + resultString = testString.ReplaceSubstr( "{topicName}", GetTopicName() ); + + const CPubSubClient* client = GetClient(); + if ( GUCEF_NULL != client ) + { + resultString = resultString.ReplaceSubstr( "{clientType}", client->GetType() ); + } + + return true; +} + +/*-------------------------------------------------------------------------*/ + bool CPubSubClientTopic::AcknowledgeReceipt( const CIPubSubMsg& msg ) {GUCEF_TRACE; diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp index ecab9e3f3..8facc5b81 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp @@ -708,10 +708,21 @@ CKafkaPubSubClientTopic::SetupBasedOnConfig( void ) { if ( !m_config.consumerGroupName.IsNULLOrEmpty() ) { - if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.id", m_config.consumerGroupName, errStr ) ) + CORE::CString consumerGroupName; + if ( !TryResolveMacrosInString( m_config.consumerGroupName, consumerGroupName ) ) + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to resolve macros in consumerGroupName" ); + } + + if ( RdKafka::Conf::CONF_OK == m_kafkaConsumerConf->set( "group.id", consumerGroupName, errStr ) ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic:LoadConfig: Set Kafka group.id to \"" + + consumerGroupName + "\" for topic " + m_config.topicName ); + } + else { GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer group id to \"" + - m_config.consumerGroupName + "\", error message: " + errStr ); + consumerGroupName + "\", error message: " + errStr ); ++m_kafkaErrorReplies; return false; } @@ -731,7 +742,18 @@ CKafkaPubSubClientTopic::SetupBasedOnConfig( void ) { if ( !m_config.consumerName.IsNULLOrEmpty() ) { - if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.instance.id", m_config.consumerName, errStr ) ) + CORE::CString consumerName; + if ( !TryResolveMacrosInString( m_config.consumerName, consumerName ) ) + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to resolve macros in consumerName" ); + } + + if ( RdKafka::Conf::CONF_OK == m_kafkaConsumerConf->set( "group.instance.id", consumerName, errStr ) ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic:LoadConfig: Set Kafka group.instance.id to \"" + + consumerName + "\" for topic " + m_config.topicName ); + } + else { GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer instance id to \"" + m_config.consumerName + "\", error message: " + errStr ); diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json b/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json index 8a4a6f065..a7c7e2ff3 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json @@ -44,7 +44,7 @@ "preferDedicatedConnection": false, "topicName": "ExampleKafkaTopicName", "consumerGroupName": "pubsubcapture", - "consumerName": "$HOSTNAME$", + "consumerName": "$HOSTNAME$-{topicName}", "CustomConfig": { "addProducerHostnameAsKafkaMsgHeader": false, "consumerModeStartOffset": "stored", @@ -70,7 +70,8 @@ "KafkaConsumerGlobalConfig": { "auto.offset.reset": "largest", "consume.callback.max.messages": "0", - "log_level": "7" + "log_level": "7", + "enable.auto.commit": "false" } } },