Skip to content

Commit

Permalink
- pubsub kafka backend: Added macro support for group id and group in…
Browse files Browse the repository at this point in the history
…stance id
  • Loading branch information
LiberatorUSA committed May 18, 2024
1 parent 4cd0a33 commit 83ae6ce
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
9 changes: 9 additions & 0 deletions platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientTopic.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientTopic : public CORE::CTSGNotifier
virtual bool SetJournal( CIPubSubJournalBasicPtr journal );

virtual CIPubSubJournalBasicPtr GetJournal( void ) const;

/**
* Helper function that can be used to resolve some common macros in strings
* The generic implementation at this level supports the following macros
* - {topicName}
* - {clientType}
*/
virtual bool TryResolveMacrosInString( const CORE::CString& testString ,
CORE::CString& resultString ) const;
};

/*-------------------------------------------------------------------------*/
Expand Down
23 changes: 23 additions & 0 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
#define GUCEF_CORE_LOGGING_H
#endif /* GUCEF_CORE_LOGGING_H ? */

#ifndef GUCEF_PUBSUB_CPUBSUBCLIENT_H
#include "gucefPUBSUB_CPubSubClient.h"
#define GUCEF_PUBSUB_CPUBSUBCLIENT_H
#endif /* GUCEF_PUBSUB_CPUBSUBCLIENT_H ? */

#include "gucefPUBSUB_CPubSubClientTopic.h"

/*-------------------------------------------------------------------------//
Expand Down Expand Up @@ -112,6 +117,24 @@ CPubSubClientTopic::~CPubSubClientTopic()

/*-------------------------------------------------------------------------*/

bool
CPubSubClientTopic::TryResolveMacrosInString( const CORE::CString& testString ,
CORE::CString& resultString ) const
{GUCEF_TRACE;

resultString = testString.ReplaceSubstr( "{topicName}", GetTopicName() );

const CPubSubClient* client = GetClient();
if ( GUCEF_NULL != client )
{
resultString = resultString.ReplaceSubstr( "{clientType}", client->GetType() );
}

return true;
}

/*-------------------------------------------------------------------------*/

bool
CPubSubClientTopic::AcknowledgeReceipt( const CIPubSubMsg& msg )
{GUCEF_TRACE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,21 @@ CKafkaPubSubClientTopic::SetupBasedOnConfig( void )
{
if ( !m_config.consumerGroupName.IsNULLOrEmpty() )
{
if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.id", m_config.consumerGroupName, errStr ) )
CORE::CString consumerGroupName;
if ( !TryResolveMacrosInString( m_config.consumerGroupName, consumerGroupName ) )
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to resolve macros in consumerGroupName" );
}

if ( RdKafka::Conf::CONF_OK == m_kafkaConsumerConf->set( "group.id", consumerGroupName, errStr ) )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic:LoadConfig: Set Kafka group.id to \"" +
consumerGroupName + "\" for topic " + m_config.topicName );
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer group id to \"" +
m_config.consumerGroupName + "\", error message: " + errStr );
consumerGroupName + "\", error message: " + errStr );
++m_kafkaErrorReplies;
return false;
}
Expand All @@ -731,7 +742,18 @@ CKafkaPubSubClientTopic::SetupBasedOnConfig( void )
{
if ( !m_config.consumerName.IsNULLOrEmpty() )
{
if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.instance.id", m_config.consumerName, errStr ) )
CORE::CString consumerName;
if ( !TryResolveMacrosInString( m_config.consumerName, consumerName ) )
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to resolve macros in consumerName" );
}

if ( RdKafka::Conf::CONF_OK == m_kafkaConsumerConf->set( "group.instance.id", consumerName, errStr ) )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic:LoadConfig: Set Kafka group.instance.id to \"" +
consumerName + "\" for topic " + m_config.topicName );
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer instance id to \"" +
m_config.consumerName + "\", error message: " + errStr );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"preferDedicatedConnection": false,
"topicName": "ExampleKafkaTopicName",
"consumerGroupName": "pubsubcapture",
"consumerName": "$HOSTNAME$",
"consumerName": "$HOSTNAME$-{topicName}",
"CustomConfig": {
"addProducerHostnameAsKafkaMsgHeader": false,
"consumerModeStartOffset": "stored",
Expand All @@ -70,7 +70,8 @@
"KafkaConsumerGlobalConfig": {
"auto.offset.reset": "largest",
"consume.callback.max.messages": "0",
"log_level": "7"
"log_level": "7",
"enable.auto.commit": "false"
}
}
},
Expand Down

0 comments on commit 83ae6ce

Please sign in to comment.