diff --git a/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClient.h b/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClient.h index 300eb023d..501292ec3 100644 --- a/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClient.h +++ b/platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClient.h @@ -121,21 +121,21 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClient : public CORE::CTSGNotifier virtual bool GetSupportedFeatures( CPubSubClientFeatures& features ) const = 0; - virtual CPubSubClientTopicPtr CreateTopicAccess( CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) = 0; + virtual CPubSubClientTopicBasicPtr CreateTopicAccess( CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) = 0; /** * Same as the version that takes an entire config except the expectation here is that the topic * is already configured via a CPubSubClientConfig but not yet instantiated * This would be the typical case when using global app config defined topics and not programatic topic access */ - virtual CPubSubClientTopicPtr CreateTopicAccess( const CString& topicName , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ); + virtual CPubSubClientTopicBasicPtr CreateTopicAccess( const CString& topicName , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ); - virtual CPubSubClientTopicPtr GetTopicAccess( const CString& topicName ) = 0; + virtual CPubSubClientTopicBasicPtr GetTopicAccess( const CString& topicName ) = 0; - virtual CPubSubClientTopicPtr GetOrCreateTopicAccess( const CString& topicName , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ); + virtual CPubSubClientTopicBasicPtr GetOrCreateTopicAccess( const CString& topicName , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ); virtual bool GetMultiTopicAccess( CPubSubClientTopicConfigPtr topicConfig , PubSubClientTopicSet& topicAccess ); @@ -317,7 +317,7 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClient : public CORE::CTSGNotifier virtual bool ConfigureJournal( CPubSubClientConfigPtr clientConfig ); - virtual bool ConfigureJournal( CPubSubClientTopicPtr topic , + virtual bool ConfigureJournal( CPubSubClientTopicBasicPtr topic , CPubSubClientTopicConfigPtr topicConfig ); private: diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClient.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClient.cpp index 5575fbea9..a75572269 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClient.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClient.cpp @@ -218,14 +218,14 @@ CPubSubClient::SetPulseGenerator( CORE::PulseGeneratorPtr newPulseGenerator ) /*-------------------------------------------------------------------------*/ -CPubSubClientTopicPtr +CPubSubClientTopicBasicPtr CPubSubClient::GetOrCreateTopicAccess( const CString& topicName , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; MT::CObjectScopeLock lock( this ); - CPubSubClientTopicPtr topicAccess = GetTopicAccess( topicName ); + CPubSubClientTopicBasicPtr topicAccess = GetTopicAccess( topicName ); if ( topicAccess.IsNULL() ) { topicAccess = CreateTopicAccess( topicName, pulseGenerator ); @@ -235,7 +235,7 @@ CPubSubClient::GetOrCreateTopicAccess( const CString& topicName , /*-------------------------------------------------------------------------*/ -CPubSubClientTopicPtr +CPubSubClientTopicBasicPtr CPubSubClient::CreateTopicAccess( const CString& topicName , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -245,7 +245,7 @@ CPubSubClient::CreateTopicAccess( const CString& topicName , CPubSubClientTopicConfigPtr topicConfig = GetOrCreateTopicConfig( topicName ); if ( !topicConfig.IsNULL() ) { - CPubSubClientTopicPtr topicAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + CPubSubClientTopicBasicPtr topicAccess = CreateTopicAccess( topicConfig, pulseGenerator ); return topicAccess; } return CPubSubClientTopicPtr(); @@ -275,7 +275,7 @@ CPubSubClient::GetMultiTopicAccess( const CString& topicName , // As such it redirects to the basic GetTopicAccess() // Backends should override this if they support pattern matching access - CPubSubClientTopicPtr tAccess = GetTopicAccess( topicName ); + CPubSubClientTopicBasicPtr tAccess = GetTopicAccess( topicName ); if ( !tAccess.IsNULL() ) { topicAccess.insert( tAccess ); @@ -353,7 +353,7 @@ CPubSubClient::CreateMultiTopicAccess( CPubSubClientTopicConfigPtr topicConfig , // As such it redirects to the basic CreateTopicAccess() // Backends should override this if they support pattern matching access - CPubSubClientTopicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); if ( !tAccess.IsNULL() ) { topicAccess.insert( tAccess ); @@ -568,7 +568,7 @@ CPubSubClient::ConfigureJournal( CPubSubClientConfigPtr clientConfig ) /*-------------------------------------------------------------------------*/ bool -CPubSubClient::ConfigureJournal( CPubSubClientTopicPtr topic , +CPubSubClient::ConfigureJournal( CPubSubClientTopicBasicPtr topic , CPubSubClientTopicConfigPtr topicConfig ) {GUCEF_TRACE; diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp index 80b826ef3..14b8298cf 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp @@ -653,7 +653,7 @@ CPubSubClientSide::OnTopicAccessCreated( CORE::CNotifier* notifier , return; CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData ); - CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); + CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); if ( !topicAccess.IsNULL() ) { MT::CScopeWriterLock lock( m_rwdataLock ); @@ -706,7 +706,7 @@ CPubSubClientSide::OnTopicAccessDestroyed( CORE::CNotifier* notifier , } CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData ); - CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); + CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); if ( !topicAccess.IsNULL() ) { MT::CScopeWriterLock lock( m_rwdataLock ); diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp index 3d3e5a6bd..38fc213f2 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp @@ -381,49 +381,62 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi if ( !topicAccess.IsNULL() ) { const CORE::CString& topicName = topicAccess->GetTopicName(); - if ( routeConfig->preferFromTopicThreadForDestination && fromPulseGenerator.IsNULL() ) - { - fromPulseGenerator = topicAccess->GetPulseGenerator(); - if ( fromPulseGenerator.IsNULL() ) - fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator(); - if ( !fromPulseGenerator.IsNULL() ) - { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, will be using pulse generator for thread " + - CORE::ToString( fromPulseGenerator->GetPulseDriverThreadId() ) ); - } - else + if ( topicName.HasChar( '*' ) < 0 ) + { + if ( routeConfig->preferFromTopicThreadForDestination && fromPulseGenerator.IsNULL() ) { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, however no suitable pulse generator is available" ); + fromPulseGenerator = topicAccess->GetPulseGenerator(); + if ( fromPulseGenerator.IsNULL() ) + fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator(); + + if ( !fromPulseGenerator.IsNULL() ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, will be using pulse generator for thread " + + CORE::ToString( fromPulseGenerator->GetPulseDriverThreadId() ) ); + } + else + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, however no suitable pulse generator is available" ); + } } - } - CPubSubFlowRouteTopicConfigPtr autoTopicRouteConfig = CPubSubFlowRouteTopicConfig::CreateSharedObj(); - autoTopicRouteConfig->fromSideTopicName = topicName; + CPubSubFlowRouteTopicConfigPtr autoTopicRouteConfig = CPubSubFlowRouteTopicConfig::CreateSharedObj(); + autoTopicRouteConfig->fromSideTopicName = topicName; - if ( routeConfig->toSideTopicsAutoMatchFromSide ) - autoTopicRouteConfig->toSideTopicName = topicName; - else - autoTopicRouteConfig->toSideTopicName = topicRouteConfig->toSideTopicName; + if ( routeConfig->toSideTopicsAutoMatchFromSide ) + autoTopicRouteConfig->toSideTopicName = topicName; + else + autoTopicRouteConfig->toSideTopicName = topicRouteConfig->toSideTopicName; - if ( routeConfig->failoverSideTopicsAutoMatchFromSide ) - autoTopicRouteConfig->failoverSideTopicName = topicName; - else - autoTopicRouteConfig->failoverSideTopicName = topicRouteConfig->failoverSideTopicName; + if ( routeConfig->failoverSideTopicsAutoMatchFromSide ) + autoTopicRouteConfig->failoverSideTopicName = topicName; + else + autoTopicRouteConfig->failoverSideTopicName = topicRouteConfig->failoverSideTopicName; - if ( routeConfig->spilloverSideTopicsAutoMatchFromSide ) - autoTopicRouteConfig->spilloverSideTopicName = topicName; - else - autoTopicRouteConfig->spilloverSideTopicName = topicRouteConfig->spilloverSideTopicName; + if ( routeConfig->spilloverSideTopicsAutoMatchFromSide ) + autoTopicRouteConfig->spilloverSideTopicName = topicName; + else + autoTopicRouteConfig->spilloverSideTopicName = topicRouteConfig->spilloverSideTopicName; - if ( routeConfig->deadLetterSideTopicsAutoMatchFromSide ) - autoTopicRouteConfig->deadLetterSideTopicName = topicName; - else - autoTopicRouteConfig->deadLetterSideTopicName = topicRouteConfig->deadLetterSideTopicName; + if ( routeConfig->deadLetterSideTopicsAutoMatchFromSide ) + autoTopicRouteConfig->deadLetterSideTopicName = topicName; + else + autoTopicRouteConfig->deadLetterSideTopicName = topicRouteConfig->deadLetterSideTopicName; - totalSuccess = MatchTopicRouteConfig( autoTopicRouteConfig , - fromPulseGenerator , - destPulseGenerator ) && totalSuccess; + totalSuccess = MatchTopicRouteConfig( autoTopicRouteConfig , + fromPulseGenerator , + destPulseGenerator ) && totalSuccess; + } + else + { + // Something went wrong. We should not get back a topic with a topic mame which still + // has wildcards in it. That will confuse this code. The backend should resolve to non wildcard containing names + totalSuccess = false; + GUCEF_ERROR_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: glob pattern topic \"" + + topicRouteConfig->fromSideTopicName + "\" for side \"" + fromSide->GetSideId() + "\" was matched by the backend to topic \"" + + topicName + "\" which still contains a wildcard. This is not supported." ); + } } ++i; } @@ -456,7 +469,8 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi if ( fromPulseGenerator.IsNULL() && routeConfig->preferFromTopicThreadForDestination ) { - fromPulseGenerator = topicLinks.fromTopic->GetPulseGenerator(); + if ( GUCEF_NULL != topicLinks.fromTopic ) + fromPulseGenerator = topicLinks.fromTopic->GetPulseGenerator(); if ( fromPulseGenerator.IsNULL() ) fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator(); @@ -477,8 +491,16 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi if ( topicLinks.fromTopic != oldFromTopic ) { - fromSideTopicLinks.erase( oldFromTopic ); - fromSideTopicLinks[ topicLinks.fromTopic ] = &topicLinks; + if ( GUCEF_NULL != oldFromTopic ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Erasing map entry for old topic implementation: " + CORE::ToString( oldFromTopic ) ); + fromSideTopicLinks.erase( oldFromTopic ); + } + if ( GUCEF_NULL != topicLinks.fromTopic ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Adding map entry for new topic implementation: " + CORE::ToString( topicLinks.fromTopic ) ); + fromSideTopicLinks[ topicLinks.fromTopic ] = &topicLinks; + } } if ( GUCEF_NULL != toSide && !topicRouteConfig->toSideTopicName.IsNULLOrEmpty() ) @@ -2779,7 +2801,7 @@ CPubSubFlowRouter::OnSidePubSubClientTopicCreation( CORE::CNotifier* notifier return; CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData ); - CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); + CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName ); if ( !topicAccess.IsNULL() ) { // create the topic association but dont connect yet diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h index f10630508..e6e9a3aa2 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h @@ -87,10 +87,10 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp index bbdf15488..60d9505e7 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp @@ -178,7 +178,7 @@ CAwsSqsPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& featur /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CAwsSqsPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -211,7 +211,7 @@ CAwsSqsPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topi /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CAwsSqsPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -222,7 +222,7 @@ CAwsSqsPubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/include/pubsubpluginKAFKA_CKafkaPubSubClient.h b/plugins/PUBSUB/pubsubpluginKAFKA/include/pubsubpluginKAFKA_CKafkaPubSubClient.h index 18fc928bd..51893cd85 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/include/pubsubpluginKAFKA_CKafkaPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginKAFKA/include/pubsubpluginKAFKA_CKafkaPubSubClient.h @@ -77,6 +77,8 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClient : public PUBSUB:: static const CORE::CString TypeName; + typedef std::map< CKafkaPubSubClientTopicConfigPtr , CORE::CString::StringSet > TTopicConfigPtrToStringSetMap; + CKafkaPubSubClient( const PUBSUB::CPubSubClientConfig& config ); virtual ~CKafkaPubSubClient() GUCEF_VIRTUAL_OVERRIDE; @@ -89,10 +91,29 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClient : public PUBSUB:: virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + + virtual bool GetMultiTopicAccess( const CORE::CString& topicName , + PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; + + virtual bool GetMultiTopicAccess( const CORE::CString::StringSet& topicNames , + PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual bool CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + + bool AutoCreateMultiTopicAccess( CKafkaPubSubClientTopicConfigPtr templateTopicConfig , + const CORE::CString::StringSet& topicNameList , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ); + + bool AutoCreateMultiTopicAccess( const TTopicConfigPtrToStringSetMap& topicsToCreate , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ); virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; @@ -174,15 +195,20 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClient : public PUBSUB:: bool SetupBasedOnConfig( void ); + bool SetupKafkaMetaDataAccess( void ); + private: typedef CORE::CTEventHandlerFunctor< CKafkaPubSubClient > TEventCallback; typedef std::map< CORE::CString, CKafkaPubSubClientTopicPtr > TTopicMap; + RdKafka::Conf* m_kafkaProducerConf; + RdKafka::Producer* m_kafkaProducer; CKafkaPubSubClientConfig m_config; CORE::CTimer* m_metricsTimer; TTopicMap m_topicMap; CORE::ThreadPoolPtr m_threadPool; + CORE::UInt32 m_kafkaErrorReplies; mutable bool m_isHealthy; MT::CMutex m_lock; }; diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp index 5a30e07c8..2e0cf0b96 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp @@ -72,9 +72,12 @@ const CORE::CString CKafkaPubSubClient::TypeName = "Kafka"; CKafkaPubSubClient::CKafkaPubSubClient( const PUBSUB::CPubSubClientConfig& config ) : PUBSUB::CPubSubClient() + , m_kafkaProducerConf( GUCEF_NULL ) + , m_kafkaProducer( GUCEF_NULL ) , m_config() , m_metricsTimer( GUCEF_NULL ) , m_topicMap() + , m_kafkaErrorReplies( 0 ) , m_isHealthy( true ) , m_lock() {GUCEF_TRACE; @@ -115,6 +118,11 @@ CKafkaPubSubClient::Clear( void ) ++i; } m_topicMap.clear(); + + GUCEF_DELETE m_kafkaProducer; + m_kafkaProducer = GUCEF_NULL; + GUCEF_DELETE m_kafkaProducerConf; + m_kafkaProducerConf = GUCEF_NULL; } /*-------------------------------------------------------------------------*/ @@ -166,8 +174,8 @@ CKafkaPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& feature features.supportsTopicIndexBasedBookmark = true; // Offsets (index) is the native Kafka "bookmark"method and thus preferred features.supportsMsgDateTimeBasedBookmark = true; // We support this via code that converts the DateTime to offsets features.supportsDerivingBookmarkFromMsg = true; // Supported via a BSOD on the message's index field currently - features.supportsDiscoveryOfAvailableTopics = false; // @TODO: not implemented yet - features.supportsGlobPatternTopicNames = false; + features.supportsDiscoveryOfAvailableTopics = true; // Implemented via RdKafka producer metadata api + features.supportsGlobPatternTopicNames = true; features.supportsSubscriptionMsgArrivalDelayRequests = true; // We support a backoff of the consume() event processing features.supportsSubscriptionEndOfDataEvent = false; // @TODO: needs work return true; @@ -175,25 +183,49 @@ CKafkaPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& feature /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CKafkaPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; - CKafkaPubSubClientTopicPtr topicAccess; + if ( topicConfig.IsNULL() ) + return PUBSUB::CPubSubClientTopicPtr(); + + PUBSUB::CPubSubClientTopicBasicPtr topicAccess; { MT::CScopeMutex lock( m_lock ); - topicAccess = ( GUCEF_NEW CKafkaPubSubClientTopic( this ) )->CreateSharedPtr(); - if ( topicAccess->LoadConfig( *topicConfig ) ) + // Check to see if this logical/conceptual 'topic' represents multiple pattern matched Kafka topics + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicConfig->topicName.HasChar( '*' ) > -1 ) { - m_topicMap[ topicConfig->topicName ] = topicAccess; - RegisterTopicEventHandlers( topicAccess ); + PubSubClientTopicSet allTopicAccess; + if ( CreateMultiTopicAccess( topicConfig, allTopicAccess, pulseGenerator ) && !allTopicAccess.empty() ) + { + // Caller should really use the CreateMultiTopicAccess() variant + topicAccess = *(allTopicAccess.begin()); + } } else { - topicAccess->Shutdown(); - topicAccess.Unlink(); + CKafkaPubSubClientTopicPtr kafkaTopicAccess = ( GUCEF_NEW CKafkaPubSubClientTopic( this ) )->CreateSharedPtr(); + if ( kafkaTopicAccess->LoadConfig( *topicConfig ) ) + { + topicAccess = kafkaTopicAccess; + m_topicMap[ topicConfig->topicName ] = kafkaTopicAccess; + + ConfigureJournal( topicAccess, topicConfig ); + PUBSUB::CIPubSubJournalBasicPtr journal = topicAccess->GetJournal(); + if ( !journal.IsNULL() && topicConfig->journalConfig.useJournal ) + journal->AddTopicCreatedJournalEntry(); + + RegisterTopicEventHandlers( kafkaTopicAccess ); + } + else + { + kafkaTopicAccess->Shutdown(); + kafkaTopicAccess.Unlink(); + } } } @@ -208,7 +240,7 @@ CKafkaPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topic /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CKafkaPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -219,7 +251,7 @@ CKafkaPubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ @@ -240,6 +272,189 @@ CKafkaPubSubClient::GetAllCreatedTopicAccess( PubSubClientTopicSet& topicAccess /*-------------------------------------------------------------------------*/ +bool +CKafkaPubSubClient::GetMultiTopicAccess( const CORE::CString& topicName , + PubSubClientTopicSet& topicAccess ) +{GUCEF_TRACE; + + // Check to see if this logical/conceptual 'topic' name represents multiple pattern matched Redis Streams + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicName.HasChar( '*' ) > -1 ) + { + // We create the actual topic objects from the wildcard glob pattern topic which is used + // as a template. As such we need to match the pattern again to find the various topics that could have been spawned + bool matchesFound = false; + TTopicMap::iterator i = m_topicMap.begin(); + while ( i != m_topicMap.end() ) + { + if ( (*i).first.WildcardEquals( topicName, '*', true ) ) + { + topicAccess.insert( (*i).second ); + matchesFound = true; + } + ++i; + } + return matchesFound; + } + else + { + TTopicMap::iterator i = m_topicMap.find( topicName ); + if ( i != m_topicMap.end() ) + { + topicAccess.insert( (*i).second ); + return true; + } + return false; + } +} + +/*-------------------------------------------------------------------------*/ + +bool +CKafkaPubSubClient::GetMultiTopicAccess( const CORE::CString::StringSet& topicNames , + PubSubClientTopicSet& topicAccess ) +{GUCEF_TRACE; + + bool totalSuccess = true; + CORE::CString::StringSet::const_iterator i = topicNames.begin(); + while ( i != topicNames.end() ) + { + totalSuccess = GetMultiTopicAccess( (*i), topicAccess ) && totalSuccess; + ++i; + } + return totalSuccess; +} + +/*-------------------------------------------------------------------------*/ + +bool +CKafkaPubSubClient::AutoCreateMultiTopicAccess( const TTopicConfigPtrToStringSetMap& topicsToCreate , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + CORE::UInt32 newTopicAccessCount = 0; + bool totalSuccess = true; + { + MT::CObjectScopeLock lock( this ); + + TTopicConfigPtrToStringSetMap::const_iterator m = topicsToCreate.begin(); + while ( m != topicsToCreate.end() ) + { + PUBSUB::CPubSubClientTopicConfigPtr templateTopicConfig( ((*m).first) ); + if ( !templateTopicConfig.IsNULL() ) + { + const CORE::CString::StringSet& topicNameList = (*m).second; + + CORE::CString::StringSet::const_iterator i = topicNameList.begin(); + while ( i != topicNameList.end() ) + { + CKafkaPubSubClientTopicConfigPtr topicConfig = CKafkaPubSubClientTopicConfig::CreateSharedObj(); + topicConfig->LoadConfig( *templateTopicConfig.GetPointerAlways() ); + topicConfig->topicName = (*i); + + CKafkaPubSubClientTopicPtr tAccess; + { + MT::CObjectScopeLock lock( this ); + + tAccess = ( GUCEF_NEW CKafkaPubSubClientTopic( this ) )->CreateSharedPtr(); + if ( tAccess->LoadConfig( *topicConfig ) ) + { + m_topicMap[ topicConfig->topicName ] = tAccess; + topicAccess.insert( tAccess ); + m_config.topics.push_back( topicConfig ); + ++newTopicAccessCount; + + ConfigureJournal( tAccess, topicConfig ); + PUBSUB::CIPubSubJournalBasicPtr journal = tAccess->GetJournal(); + if ( !journal.IsNULL() && topicConfig->journalConfig.useJournal ) + journal->AddTopicCreatedJournalEntry(); + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created topic \"" + + topicConfig->topicName + "\" based on template config \"" + templateTopicConfig->topicName + "\"" ); + } + else + { + tAccess.Unlink(); + totalSuccess = false; + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Failed to load config for topic \"" + + topicConfig->topicName + "\" based on template config \"" + templateTopicConfig->topicName + "\"" ); + } + } + ++i; + } + } + ++m; + } + } + + if ( newTopicAccessCount > 0 ) + { + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created " + + CORE::ToString( newTopicAccessCount ) + " topics based on template configs" ); + + TopicsAccessAutoCreatedEventData eData( topicAccess ); + NotifyObservers( TopicsAccessAutoCreatedEvent, &eData ); + } + + return totalSuccess; +} + +/*-------------------------------------------------------------------------*/ + +bool +CKafkaPubSubClient::AutoCreateMultiTopicAccess( CKafkaPubSubClientTopicConfigPtr templateTopicConfig , + const CORE::CString::StringSet& topicNameList , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + TTopicConfigPtrToStringSetMap topicToCreate; + topicToCreate[ templateTopicConfig ] = topicNameList; + return AutoCreateMultiTopicAccess( topicToCreate, topicAccess, pulseGenerator ); +} + +/*-------------------------------------------------------------------------*/ + +bool +CKafkaPubSubClient::CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicConfig->topicName.HasChar( '*' ) > -1 ) + { + CORE::CString::StringSet topicNameList; + CORE::CString::StringSet globPatternFilters; + + globPatternFilters.insert( topicConfig->topicName ); + + if ( BeginTopicDiscovery( globPatternFilters ) ) + { + //m_streamIndexingTimer->SetEnabled( true ); + + if ( !topicNameList.empty() ) + return AutoCreateMultiTopicAccess( topicConfig, topicNameList, topicAccess, pulseGenerator ); + return true; // Since its pattern based potential creation at a later time also counts as success + } + return false; + } + else + { + PUBSUB::CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + if ( !tAccess.IsNULL() ) + { + topicAccess.insert( tAccess ); + return true; + } + } + return false; +} + +/*-------------------------------------------------------------------------*/ + void CKafkaPubSubClient::DestroyTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -320,7 +535,28 @@ bool CKafkaPubSubClient::BeginTopicDiscovery( const CORE::CString::StringSet& globPatternFilters ) {GUCEF_TRACE; - return false; + try + { + RdKafka::Metadata* metaDataInfo = GUCEF_NULL; + RdKafka::ErrorCode errorCode = m_kafkaProducer->metadata( true, NULL, &metaDataInfo, 10000 ); + if ( RdKafka::ErrorCode::ERR_NO_ERROR == errorCode ) + { + + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClient:BeginTopicDiscovery: Failed to obtain metadata from Kafka" ); + } + + delete metaDataInfo; + metaDataInfo = GUCEF_NULL; + + return true; + } + catch ( const std::exception& ) + { + return false; + } } /*-------------------------------------------------------------------------*/ @@ -366,6 +602,59 @@ CKafkaPubSubClient::GetType( void ) const /*-------------------------------------------------------------------------*/ +bool +CKafkaPubSubClient::SetupKafkaMetaDataAccess( void ) +{GUCEF_TRACE; + + if ( m_config.desiredFeatures.supportsDiscoveryOfAvailableTopics ) + { + // The library wants the addresses as a csv list on its config obj + // we convert and prep as such + CORE::CString csvKafkaBrokerList; + PUBSUB::CPubSubClientConfig::THostAddressVector::const_iterator h = m_config.remoteAddresses.begin(); + while ( h != m_config.remoteAddresses.end() ) + { + // The RdKafka library will re-resolve DNSs on reconnects so we should feed it DNSs + csvKafkaBrokerList += (*h).HostnameAndPortAsString() + ','; + ++h; + } + + std::string errStr; + + RdKafka::Conf* kafkaConf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + kafkaConf->set( "metadata.broker.list", csvKafkaBrokerList, errStr ); + + CKafkaPubSubClientConfig::StringMap::const_iterator m = m_config.kafkaProducerGlobalConfigSettings.begin(); + while ( m != m_config.kafkaProducerGlobalConfigSettings.end() ) + { + if (RdKafka::Conf::CONF_OK != kafkaConf->set((*m).first, (*m).second, errStr ) ) + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClient:LoadConfig: Failed to set Kafka Producer global config entry \"" + + (*m).first + "\"=\"" + (*m).second + "\", error message: " + errStr ); + ++m_kafkaErrorReplies; + return false; + } + ++m; + } + GUCEF_DELETE m_kafkaProducerConf; + m_kafkaProducerConf = kafkaConf; + + RdKafka::Producer* producer = RdKafka::Producer::create( m_kafkaProducerConf, errStr ); + if ( producer == GUCEF_NULL ) + { + GUCEF_ERROR_LOG(CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClient:LoadConfig: Failed to create Kafka producer, error message: " + errStr ); + ++m_kafkaErrorReplies; + return false; + } + m_kafkaProducer = producer; + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient:LoadConfig: Successfully created Kafka producer" ); + } + + return true; +} + +/*-------------------------------------------------------------------------*/ + bool CKafkaPubSubClient::SetupBasedOnConfig( void ) {GUCEF_TRACE; @@ -394,6 +683,8 @@ CKafkaPubSubClient::SetupBasedOnConfig( void ) m_config.metricsPrefix += "kafka."; + SetupKafkaMetaDataAccess(); + return true; } diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp index 5d46e73e5..384ab7df4 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClientTopic.cpp @@ -667,15 +667,24 @@ CKafkaPubSubClientTopic::SetupBasedOnConfig( void ) // Check for the mandatory group ID property. // This may have already been set by the generic custom property setting that occured above std::string confValue; - m_kafkaConsumerConf->get( "group.id", confValue ); - if ( confValue.empty() ) + RdKafka::Conf::ConfResult configResult = m_kafkaConsumerConf->get( "group.id", confValue ); + if ( RdKafka::Conf::ConfResult::CONF_OK != configResult || + confValue.empty() || + confValue.size() == 0 ) { - if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.id", m_config.consumerGroupName, errStr ) ) + if ( !m_config.consumerGroupName.IsNULLOrEmpty() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer group id to \"" + - m_config.consumerGroupName + "\", error message: " + errStr ); - ++m_kafkaErrorReplies; - return false; + if ( RdKafka::Conf::CONF_OK != m_kafkaConsumerConf->set( "group.id", m_config.consumerGroupName, errStr ) ) + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: Failed to set Kafka consumer group id to \"" + + m_config.consumerGroupName + "\", error message: " + errStr ); + ++m_kafkaErrorReplies; + return false; + } + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "KafkaPubSubClientTopic:LoadConfig: consumerGroupName is mandatory but not configured. check the config" ); } } diff --git a/plugins/PUBSUB/pubsubpluginMSMQ/include/pubsubpluginMSMQ_CMsmqPubSubClient.h b/plugins/PUBSUB/pubsubpluginMSMQ/include/pubsubpluginMSMQ_CMsmqPubSubClient.h index dc33f9387..1679ed8d5 100644 --- a/plugins/PUBSUB/pubsubpluginMSMQ/include/pubsubpluginMSMQ_CMsmqPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginMSMQ/include/pubsubpluginMSMQ_CMsmqPubSubClient.h @@ -89,10 +89,10 @@ class PUBSUBPLUGIN_MSMQ_PLUGIN_PRIVATE_CPP CMsmqPubSubClient : public PUBSUB::CP virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginMSMQ/src/pubsubpluginMSMQ_CMsmqPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginMSMQ/src/pubsubpluginMSMQ_CMsmqPubSubClient.cpp index 46c2fd2d5..febe463f2 100644 --- a/plugins/PUBSUB/pubsubpluginMSMQ/src/pubsubpluginMSMQ_CMsmqPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginMSMQ/src/pubsubpluginMSMQ_CMsmqPubSubClient.cpp @@ -207,7 +207,7 @@ CMsmqPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& features /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CMsmqPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -240,7 +240,7 @@ CMsmqPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicC /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CMsmqPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -251,7 +251,7 @@ CMsmqPubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.h b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.h index c3ab42e3e..ec3241333 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.h @@ -102,10 +102,10 @@ class PUBSUBPLUGIN_REDISCLUSTER_PLUGIN_PRIVATE_CPP CRedisClusterPubSubClient : p virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual bool GetMultiTopicAccess( const CORE::CString& topicName , PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp index 54d8aceaf..ce3b2aa7f 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp @@ -242,7 +242,7 @@ CRedisClusterPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CRedisClusterPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -255,7 +255,7 @@ CRedisClusterPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPt if ( CreateMultiTopicAccess( topicConfig, allTopicAccess, pulseGenerator ) && !allTopicAccess.empty() ) { // Caller should really use the CreateMultiTopicAccess() variant - PUBSUB::CPubSubClientTopicPtr tAccess( *(allTopicAccess.begin()) ); + PUBSUB::CPubSubClientTopicBasicPtr tAccess = *allTopicAccess.begin(); return tAccess; } return PUBSUB::CPubSubClientTopicPtr(); @@ -278,6 +278,8 @@ CRedisClusterPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPt GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClient(" + CORE::PointerToString( this ) + "):CreateTopicAccess: created topic access for topic \"" + topicConfig->topicName + "\"" ); + lock.EarlyUnlock(); + TopicAccessCreatedEventData eData( topicConfig->topicName ); NotifyObservers( TopicAccessCreatedEvent, &eData ); } @@ -292,7 +294,7 @@ CRedisClusterPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPt /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CRedisClusterPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -303,7 +305,7 @@ CRedisClusterPubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ @@ -491,7 +493,7 @@ CRedisClusterPubSubClient::CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicCon } else { - PUBSUB::CPubSubClientTopicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + PUBSUB::CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); if ( !tAccess.IsNULL() ) { topicAccess.insert( tAccess ); diff --git a/plugins/PUBSUB/pubsubpluginSTORAGE/include/pubsubpluginSTORAGE_CStoragePubSubClient.h b/plugins/PUBSUB/pubsubpluginSTORAGE/include/pubsubpluginSTORAGE_CStoragePubSubClient.h index d4c5b55eb..bd92c2406 100644 --- a/plugins/PUBSUB/pubsubpluginSTORAGE/include/pubsubpluginSTORAGE_CStoragePubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginSTORAGE/include/pubsubpluginSTORAGE_CStoragePubSubClient.h @@ -90,8 +90,8 @@ class PUBSUBPLUGIN_STORAGE_PLUGIN_PRIVATE_CPP CStoragePubSubClient : public PUBS virtual PUBSUB::CPubSubClientTopicConfigPtr GetOrCreateTopicConfig( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; virtual bool GetMultiTopicAccess( const CORE::CString& topicName , PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; @@ -114,7 +114,7 @@ class PUBSUBPLUGIN_STORAGE_PLUGIN_PRIVATE_CPP CStoragePubSubClient : public PUBS void AutoDestroyTopicAccess( const CORE::CString::StringSet& topicNames ); - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginSTORAGE/src/pubsubpluginSTORAGE_CStoragePubSubClient.cpp b/plugins/PUBSUB/pubsubpluginSTORAGE/src/pubsubpluginSTORAGE_CStoragePubSubClient.cpp index 1013035a5..b2bafe53f 100644 --- a/plugins/PUBSUB/pubsubpluginSTORAGE/src/pubsubpluginSTORAGE_CStoragePubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginSTORAGE/src/pubsubpluginSTORAGE_CStoragePubSubClient.cpp @@ -318,7 +318,7 @@ CStoragePubSubClient::GetMultiTopicAccess( const CORE::CString::StringSet& topic /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CStoragePubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -530,7 +530,7 @@ CStoragePubSubClient::CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicConfigPt { GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "StoragePubSubClient:CreateMultiTopicAccess: Interpreting topic name litterally: " + topicConfig->topicName ); - PUBSUB::CPubSubClientTopicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + PUBSUB::CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); if ( !tAccess.IsNULL() ) { topicAccess.insert( tAccess ); @@ -602,7 +602,7 @@ CStoragePubSubClient::AutoDestroyTopicAccess( const CORE::CString::StringSet& to /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CStoragePubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -613,7 +613,7 @@ CStoragePubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ diff --git a/plugins/PUBSUB/pubsubpluginTEST/include/pubsubpluginTEST_CTestPubSubClient.h b/plugins/PUBSUB/pubsubpluginTEST/include/pubsubpluginTEST_CTestPubSubClient.h index 6e3c89cdf..3fc65e4b3 100644 --- a/plugins/PUBSUB/pubsubpluginTEST/include/pubsubpluginTEST_CTestPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginTEST/include/pubsubpluginTEST_CTestPubSubClient.h @@ -97,8 +97,8 @@ class PUBSUBPLUGIN_TEST_PLUGIN_PRIVATE_CPP CTestPubSubClient : public STORAGE::C */ virtual bool LoadConfig( const PUBSUB::CPubSubClientConfig& cfg ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; CTestPubSubClientConfig& GetConfig( void ); diff --git a/plugins/PUBSUB/pubsubpluginTEST/src/pubsubpluginTEST_CTestPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginTEST/src/pubsubpluginTEST_CTestPubSubClient.cpp index c4fbe26ab..22b0cde2f 100644 --- a/plugins/PUBSUB/pubsubpluginTEST/src/pubsubpluginTEST_CTestPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginTEST/src/pubsubpluginTEST_CTestPubSubClient.cpp @@ -102,7 +102,7 @@ CTestPubSubClient::GetConfig( void ) /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CTestPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; diff --git a/plugins/PUBSUB/pubsubpluginUDP/include/pubsubpluginUDP_CUdpPubSubClient.h b/plugins/PUBSUB/pubsubpluginUDP/include/pubsubpluginUDP_CUdpPubSubClient.h index 852c77077..c9b12f82c 100644 --- a/plugins/PUBSUB/pubsubpluginUDP/include/pubsubpluginUDP_CUdpPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginUDP/include/pubsubpluginUDP_CUdpPubSubClient.h @@ -93,10 +93,10 @@ class PUBSUBPLUGIN_UDP_PLUGIN_PRIVATE_CPP CUdpPubSubClient : public PUBSUB::CPub virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginUDP/src/pubsubpluginUDP_CUdpPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginUDP/src/pubsubpluginUDP_CUdpPubSubClient.cpp index bb8b8bbf5..3e8825041 100644 --- a/plugins/PUBSUB/pubsubpluginUDP/src/pubsubpluginUDP_CUdpPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginUDP/src/pubsubpluginUDP_CUdpPubSubClient.cpp @@ -188,7 +188,7 @@ CUdpPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& features /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CUdpPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -230,7 +230,7 @@ CUdpPubSubClient::GetDefaultTopicConfig( void ) /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CUdpPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; diff --git a/plugins/PUBSUB/pubsubpluginWEB/include/pubsubpluginWEB_CWebPubSubClient.h b/plugins/PUBSUB/pubsubpluginWEB/include/pubsubpluginWEB_CWebPubSubClient.h index d8f9204a5..ef675a92f 100644 --- a/plugins/PUBSUB/pubsubpluginWEB/include/pubsubpluginWEB_CWebPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginWEB/include/pubsubpluginWEB_CWebPubSubClient.h @@ -89,10 +89,10 @@ class PUBSUBPLUGIN_WEB_PLUGIN_PRIVATE_CPP CWebPubSubClient : public PUBSUB::CPub virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , - CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; - virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; diff --git a/plugins/PUBSUB/pubsubpluginWEB/src/pubsubpluginWEB_CWebPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginWEB/src/pubsubpluginWEB_CWebPubSubClient.cpp index e2e5da156..79e91c4ee 100644 --- a/plugins/PUBSUB/pubsubpluginWEB/src/pubsubpluginWEB_CWebPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginWEB/src/pubsubpluginWEB_CWebPubSubClient.cpp @@ -183,7 +183,7 @@ CWebPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& features /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CWebPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; @@ -225,7 +225,7 @@ CWebPubSubClient::GetDefaultTopicConfig( void ) /*-------------------------------------------------------------------------*/ -PUBSUB::CPubSubClientTopicPtr +PUBSUB::CPubSubClientTopicBasicPtr CWebPubSubClient::GetTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; @@ -236,7 +236,7 @@ CWebPubSubClient::GetTopicAccess( const CORE::CString& topicName ) { return (*i).second; } - return PUBSUB::CPubSubClientTopicPtr(); + return PUBSUB::CPubSubClientTopicBasicPtr(); } /*-------------------------------------------------------------------------*/ diff --git a/projects/CMake/RunCMake_Shared_VS2022_Win64_ProcessMetrics.bat b/projects/CMake/RunCMake_Shared_VS2022_Win64_ProcessMetrics.bat new file mode 100644 index 000000000..267dfee1f --- /dev/null +++ b/projects/CMake/RunCMake_Shared_VS2022_Win64_ProcessMetrics.bat @@ -0,0 +1,9 @@ +CALL CMakeCommon.bat + +ECHO *** Set VS2022 specifics and run CMake *** + +SET MAINCMAKE=%SRCROOTDIR%\projects\CMake\targets\GUCEF_exe_ProcessMetrics +SET VS22_OUTPUTDIR="%OUTPUTDIR%\VS2022_x64_ProcessMetrics" + +CMake.exe -DBUILD_SHARED_LIBS=ON -G"Visual Studio 17 2022" -A x64 -H"%MAINCMAKE%" -B%VS22_OUTPUTDIR% +PAUSE \ No newline at end of file diff --git a/tools/ProcessMetrics/include/ProcessMetrics.h b/tools/ProcessMetrics/include/ProcessMetrics.h index b622f8095..bfea4f0ec 100644 --- a/tools/ProcessMetrics/include/ProcessMetrics.h +++ b/tools/ProcessMetrics/include/ProcessMetrics.h @@ -281,7 +281,7 @@ class ProcessMetrics : public CORE::CObservingNotifier CORE::CTimer m_metricsTimer; CORE::CTimer m_procIndexTimer; PUBSUB::CPubSubClientFactory::TProductPtr m_pubSubClient; - PUBSUB::CPubSubClientTopicPtr m_thresholdNotificationPublishTopic; + PUBSUB::CPubSubClientTopicBasicPtr m_thresholdNotificationPublishTopic; PUBSUB::CPubSubClientFeatures m_pubSubFeatures; CORE::CString m_thresholdNotificationPrimaryPayloadCodecType; bool m_gatherMemStats; diff --git a/tools/ProcessMetrics/src/ProcessMetrics.cpp b/tools/ProcessMetrics/src/ProcessMetrics.cpp index 81c97ea55..90f9d62f3 100644 --- a/tools/ProcessMetrics/src/ProcessMetrics.cpp +++ b/tools/ProcessMetrics/src/ProcessMetrics.cpp @@ -487,7 +487,7 @@ ProcessMetrics::ProcessMetrics( void ) , m_metricsTimer() , m_procIndexTimer() , m_pubSubClient() - , m_thresholdNotificationPublishTopic( GUCEF_NULL ) + , m_thresholdNotificationPublishTopic() , m_pubSubFeatures() , m_thresholdNotificationPrimaryPayloadCodecType() , m_gatherMemStats( true ) diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json b/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json index f9c6a4993..a98a3a530 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/kafka2storage.json @@ -43,6 +43,8 @@ "needSubscribeSupport": true, "preferDedicatedConnection": false, "topicName": "ExampleKafkaTopicName", + "consumerGroupName": "pubsubcapture", + "consumerName": "", "CustomConfig": { "addProducerHostnameAsKafkaMsgHeader": false, "consumerModeStartOffset": "", diff --git a/tools/pubsub2storage/include/pubsub2storage.h b/tools/pubsub2storage/include/pubsub2storage.h index 7c97f801c..f57c37aa3 100644 --- a/tools/pubsub2storage/include/pubsub2storage.h +++ b/tools/pubsub2storage/include/pubsub2storage.h @@ -254,12 +254,12 @@ class CPubSubClientChannel : public CORE::CTaskConsumer typedef std::map< CORE::UInt64, PUBSUB::CIPubSubMsg::TNoLockSharedPtr > TUInt64ToIPubSubMsgNoLockSharedPtrMap; - PUBSUB::CPubSubClientTopicPtr topic; /**< the actual backend topic access object */ + PUBSUB::CPubSubClientTopicBasicPtr topic; /**< the actual backend topic access object */ PUBSUB::CPubSubClientTopic::TPublishActionIdVector currentPublishActionIds; /**< temp placeholder to help prevent allocations per invocation */ TUInt64ToIPubSubMsgNoLockSharedPtrMap inFlightMsgs; TopicLink( void ); - TopicLink( PUBSUB::CPubSubClientTopicPtr t ); + TopicLink( PUBSUB::CPubSubClientTopicBasicPtr t ); void AddInFlightMsgs( const PUBSUB::CPubSubClientTopic::TPublishActionIdVector& publishActionIds , const PUBSUB::CPubSubClientTopic::TIPubSubMsgSPtrVector& msgs ); diff --git a/tools/pubsub2storage/src/pubsub2storage.cpp b/tools/pubsub2storage/src/pubsub2storage.cpp index 9cad8b142..6b12b0267 100644 --- a/tools/pubsub2storage/src/pubsub2storage.cpp +++ b/tools/pubsub2storage/src/pubsub2storage.cpp @@ -337,7 +337,7 @@ CPubSubClientChannel::~CPubSubClientChannel() /*-------------------------------------------------------------------------*/ CPubSubClientChannel::TopicLink::TopicLink( void ) - : topic( GUCEF_NULL ) + : topic() , currentPublishActionIds() , inFlightMsgs() {GUCEF_TRACE; @@ -346,7 +346,7 @@ CPubSubClientChannel::TopicLink::TopicLink( void ) /*-------------------------------------------------------------------------*/ -CPubSubClientChannel::TopicLink::TopicLink( PUBSUB::CPubSubClientTopicPtr t ) +CPubSubClientChannel::TopicLink::TopicLink( PUBSUB::CPubSubClientTopicBasicPtr t ) : topic( t ) , currentPublishActionIds() , inFlightMsgs() @@ -632,7 +632,7 @@ CPubSubClientChannel::ConnectPubSubClient( bool reset ) ChannelSettings::TTopicConfigVector::iterator i = m_channelSettings.pubsubClientConfig.topics.begin(); while ( i != m_channelSettings.pubsubClientConfig.topics.end() ) { - PUBSUB::CPubSubClientTopicPtr topic = m_pubsubClient->CreateTopicAccess( (*i) ); + PUBSUB::CPubSubClientTopicBasicPtr topic = m_pubsubClient->CreateTopicAccess( (*i) ); if ( topic.IsNULL() ) { if ( !(*i)->isOptional ) @@ -657,7 +657,7 @@ CPubSubClientChannel::ConnectPubSubClient( bool reset ) while ( t != m_topics.end() ) { TopicLink& topicLink = (*t); - PUBSUB::CPubSubClientTopicPtr topic = topicLink.topic; + PUBSUB::CPubSubClientTopicBasicPtr topic = topicLink.topic; if ( topic->InitializeConnectivity( reset ) ) { @@ -842,7 +842,7 @@ CPubSubClientChannel::TransmitNextPubSubMsgBuffer( void ) while ( i != m_topics.end() ) { TopicLink& topicLink = (*i); - PUBSUB::CPubSubClientTopicPtr topic = topicLink.topic; + PUBSUB::CPubSubClientTopicBasicPtr topic = topicLink.topic; if ( GUCEF_NULL != topic ) {