Skip to content

Commit

Permalink
- pubsub: switch interface to use basic ptr instead since multi-topic…
Browse files Browse the repository at this point in the history
… interfaces use that and it was inconsistent

- pubsub kafka: initial wip code to support topic discovery
  • Loading branch information
LiberatorUSA committed Mar 5, 2024
1 parent 6f4d1a4 commit c9ad6b7
Show file tree
Hide file tree
Showing 27 changed files with 489 additions and 128 deletions.
16 changes: 8 additions & 8 deletions platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClient : public CORE::CTSGNotifier

virtual bool GetSupportedFeatures( CPubSubClientFeatures& features ) const = 0;

virtual CPubSubClientTopicPtr CreateTopicAccess( CPubSubClientTopicConfigPtr topicConfig ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) = 0;
virtual CPubSubClientTopicBasicPtr CreateTopicAccess( CPubSubClientTopicConfigPtr topicConfig ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) = 0;

/**
* Same as the version that takes an entire config except the expectation here is that the topic
* is already configured via a CPubSubClientConfig but not yet instantiated
* This would be the typical case when using global app config defined topics and not programatic topic access
*/
virtual CPubSubClientTopicPtr CreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() );
virtual CPubSubClientTopicBasicPtr CreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() );

virtual CPubSubClientTopicPtr GetTopicAccess( const CString& topicName ) = 0;
virtual CPubSubClientTopicBasicPtr GetTopicAccess( const CString& topicName ) = 0;

virtual CPubSubClientTopicPtr GetOrCreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() );
virtual CPubSubClientTopicBasicPtr GetOrCreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() );

virtual bool GetMultiTopicAccess( CPubSubClientTopicConfigPtr topicConfig ,
PubSubClientTopicSet& topicAccess );
Expand Down Expand Up @@ -317,7 +317,7 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClient : public CORE::CTSGNotifier

virtual bool ConfigureJournal( CPubSubClientConfigPtr clientConfig );

virtual bool ConfigureJournal( CPubSubClientTopicPtr topic ,
virtual bool ConfigureJournal( CPubSubClientTopicBasicPtr topic ,
CPubSubClientTopicConfigPtr topicConfig );

private:
Expand Down
14 changes: 7 additions & 7 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ CPubSubClient::SetPulseGenerator( CORE::PulseGeneratorPtr newPulseGenerator )

/*-------------------------------------------------------------------------*/

CPubSubClientTopicPtr
CPubSubClientTopicBasicPtr
CPubSubClient::GetOrCreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator )
{GUCEF_TRACE;

MT::CObjectScopeLock lock( this );

CPubSubClientTopicPtr topicAccess = GetTopicAccess( topicName );
CPubSubClientTopicBasicPtr topicAccess = GetTopicAccess( topicName );
if ( topicAccess.IsNULL() )
{
topicAccess = CreateTopicAccess( topicName, pulseGenerator );
Expand All @@ -235,7 +235,7 @@ CPubSubClient::GetOrCreateTopicAccess( const CString& topicName ,

/*-------------------------------------------------------------------------*/

CPubSubClientTopicPtr
CPubSubClientTopicBasicPtr
CPubSubClient::CreateTopicAccess( const CString& topicName ,
CORE::PulseGeneratorPtr pulseGenerator )
{GUCEF_TRACE;
Expand All @@ -245,7 +245,7 @@ CPubSubClient::CreateTopicAccess( const CString& topicName ,
CPubSubClientTopicConfigPtr topicConfig = GetOrCreateTopicConfig( topicName );
if ( !topicConfig.IsNULL() )
{
CPubSubClientTopicPtr topicAccess = CreateTopicAccess( topicConfig, pulseGenerator );
CPubSubClientTopicBasicPtr topicAccess = CreateTopicAccess( topicConfig, pulseGenerator );
return topicAccess;
}
return CPubSubClientTopicPtr();
Expand Down Expand Up @@ -275,7 +275,7 @@ CPubSubClient::GetMultiTopicAccess( const CString& topicName ,
// As such it redirects to the basic GetTopicAccess()
// Backends should override this if they support pattern matching access

CPubSubClientTopicPtr tAccess = GetTopicAccess( topicName );
CPubSubClientTopicBasicPtr tAccess = GetTopicAccess( topicName );
if ( !tAccess.IsNULL() )
{
topicAccess.insert( tAccess );
Expand Down Expand Up @@ -353,7 +353,7 @@ CPubSubClient::CreateMultiTopicAccess( CPubSubClientTopicConfigPtr topicConfig ,
// As such it redirects to the basic CreateTopicAccess()
// Backends should override this if they support pattern matching access

CPubSubClientTopicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator );
CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator );
if ( !tAccess.IsNULL() )
{
topicAccess.insert( tAccess );
Expand Down Expand Up @@ -568,7 +568,7 @@ CPubSubClient::ConfigureJournal( CPubSubClientConfigPtr clientConfig )
/*-------------------------------------------------------------------------*/

bool
CPubSubClient::ConfigureJournal( CPubSubClientTopicPtr topic ,
CPubSubClient::ConfigureJournal( CPubSubClientTopicBasicPtr topic ,
CPubSubClientTopicConfigPtr topicConfig )
{GUCEF_TRACE;

Expand Down
4 changes: 2 additions & 2 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ CPubSubClientSide::OnTopicAccessCreated( CORE::CNotifier* notifier ,
return;

CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData );
CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
if ( !topicAccess.IsNULL() )
{
MT::CScopeWriterLock lock( m_rwdataLock );
Expand Down Expand Up @@ -706,7 +706,7 @@ CPubSubClientSide::OnTopicAccessDestroyed( CORE::CNotifier* notifier ,
}

CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData );
CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
if ( !topicAccess.IsNULL() )
{
MT::CScopeWriterLock lock( m_rwdataLock );
Expand Down
98 changes: 60 additions & 38 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,49 +381,62 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi
if ( !topicAccess.IsNULL() )
{
const CORE::CString& topicName = topicAccess->GetTopicName();
if ( routeConfig->preferFromTopicThreadForDestination && fromPulseGenerator.IsNULL() )
{
fromPulseGenerator = topicAccess->GetPulseGenerator();
if ( fromPulseGenerator.IsNULL() )
fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator();

if ( !fromPulseGenerator.IsNULL() )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, will be using pulse generator for thread " +
CORE::ToString( fromPulseGenerator->GetPulseDriverThreadId() ) );
}
else
if ( topicName.HasChar( '*' ) < 0 )
{
if ( routeConfig->preferFromTopicThreadForDestination && fromPulseGenerator.IsNULL() )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, however no suitable pulse generator is available" );
fromPulseGenerator = topicAccess->GetPulseGenerator();
if ( fromPulseGenerator.IsNULL() )
fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator();

if ( !fromPulseGenerator.IsNULL() )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, will be using pulse generator for thread " +
CORE::ToString( fromPulseGenerator->GetPulseDriverThreadId() ) );
}
else
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Preference given to use 'from' thread for driving destinations as well, however no suitable pulse generator is available" );
}
}
}

CPubSubFlowRouteTopicConfigPtr autoTopicRouteConfig = CPubSubFlowRouteTopicConfig::CreateSharedObj();
autoTopicRouteConfig->fromSideTopicName = topicName;
CPubSubFlowRouteTopicConfigPtr autoTopicRouteConfig = CPubSubFlowRouteTopicConfig::CreateSharedObj();
autoTopicRouteConfig->fromSideTopicName = topicName;

if ( routeConfig->toSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->toSideTopicName = topicName;
else
autoTopicRouteConfig->toSideTopicName = topicRouteConfig->toSideTopicName;
if ( routeConfig->toSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->toSideTopicName = topicName;
else
autoTopicRouteConfig->toSideTopicName = topicRouteConfig->toSideTopicName;

if ( routeConfig->failoverSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->failoverSideTopicName = topicName;
else
autoTopicRouteConfig->failoverSideTopicName = topicRouteConfig->failoverSideTopicName;
if ( routeConfig->failoverSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->failoverSideTopicName = topicName;
else
autoTopicRouteConfig->failoverSideTopicName = topicRouteConfig->failoverSideTopicName;

if ( routeConfig->spilloverSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->spilloverSideTopicName = topicName;
else
autoTopicRouteConfig->spilloverSideTopicName = topicRouteConfig->spilloverSideTopicName;
if ( routeConfig->spilloverSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->spilloverSideTopicName = topicName;
else
autoTopicRouteConfig->spilloverSideTopicName = topicRouteConfig->spilloverSideTopicName;

if ( routeConfig->deadLetterSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->deadLetterSideTopicName = topicName;
else
autoTopicRouteConfig->deadLetterSideTopicName = topicRouteConfig->deadLetterSideTopicName;
if ( routeConfig->deadLetterSideTopicsAutoMatchFromSide )
autoTopicRouteConfig->deadLetterSideTopicName = topicName;
else
autoTopicRouteConfig->deadLetterSideTopicName = topicRouteConfig->deadLetterSideTopicName;

totalSuccess = MatchTopicRouteConfig( autoTopicRouteConfig ,
fromPulseGenerator ,
destPulseGenerator ) && totalSuccess;
totalSuccess = MatchTopicRouteConfig( autoTopicRouteConfig ,
fromPulseGenerator ,
destPulseGenerator ) && totalSuccess;
}
else
{
// Something went wrong. We should not get back a topic with a topic mame which still
// has wildcards in it. That will confuse this code. The backend should resolve to non wildcard containing names
totalSuccess = false;
GUCEF_ERROR_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: glob pattern topic \"" +
topicRouteConfig->fromSideTopicName + "\" for side \"" + fromSide->GetSideId() + "\" was matched by the backend to topic \"" +
topicName + "\" which still contains a wildcard. This is not supported." );
}
}
++i;
}
Expand Down Expand Up @@ -456,7 +469,8 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi

if ( fromPulseGenerator.IsNULL() && routeConfig->preferFromTopicThreadForDestination )
{
fromPulseGenerator = topicLinks.fromTopic->GetPulseGenerator();
if ( GUCEF_NULL != topicLinks.fromTopic )
fromPulseGenerator = topicLinks.fromTopic->GetPulseGenerator();
if ( fromPulseGenerator.IsNULL() )
fromPulseGenerator = fromSideClient->GetDefaultTopicPulseGenerator();

Expand All @@ -477,8 +491,16 @@ CPubSubFlowRouter::CRouteInfo::MatchTopicRouteConfig( const CPubSubFlowRouteTopi

if ( topicLinks.fromTopic != oldFromTopic )
{
fromSideTopicLinks.erase( oldFromTopic );
fromSideTopicLinks[ topicLinks.fromTopic ] = &topicLinks;
if ( GUCEF_NULL != oldFromTopic )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Erasing map entry for old topic implementation: " + CORE::ToString( oldFromTopic ) );
fromSideTopicLinks.erase( oldFromTopic );
}
if ( GUCEF_NULL != topicLinks.fromTopic )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouter:RouteInfo:MatchTopicRouteConfig: Adding map entry for new topic implementation: " + CORE::ToString( topicLinks.fromTopic ) );
fromSideTopicLinks[ topicLinks.fromTopic ] = &topicLinks;
}
}

if ( GUCEF_NULL != toSide && !topicRouteConfig->toSideTopicName.IsNULLOrEmpty() )
Expand Down Expand Up @@ -2779,7 +2801,7 @@ CPubSubFlowRouter::OnSidePubSubClientTopicCreation( CORE::CNotifier* notifier
return;

CORE::CString topicName = *static_cast< CPubSubClient::TopicAccessCreatedEventData* >( eventData );
CPubSubClientTopicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
CPubSubClientTopicBasicPtr topicAccess = pubsubClient->GetTopicAccess( topicName );
if ( !topicAccess.IsNULL() )
{
// create the topic association but dont connect yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB

virtual PUBSUB::CPubSubClientTopicConfigPtr GetDefaultTopicConfig( void ) GUCEF_VIRTUAL_OVERRIDE;

virtual PUBSUB::CPubSubClientTopicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE;
virtual PUBSUB::CPubSubClientTopicBasicPtr CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig ,
CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE;

virtual PUBSUB::CPubSubClientTopicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE;
virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE;

virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ CAwsSqsPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& featur

/*-------------------------------------------------------------------------*/

PUBSUB::CPubSubClientTopicPtr
PUBSUB::CPubSubClientTopicBasicPtr
CAwsSqsPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig ,
CORE::PulseGeneratorPtr pulseGenerator )
{GUCEF_TRACE;
Expand Down Expand Up @@ -211,7 +211,7 @@ CAwsSqsPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topi

/*-------------------------------------------------------------------------*/

PUBSUB::CPubSubClientTopicPtr
PUBSUB::CPubSubClientTopicBasicPtr
CAwsSqsPubSubClient::GetTopicAccess( const CORE::CString& topicName )
{GUCEF_TRACE;

Expand All @@ -222,7 +222,7 @@ CAwsSqsPubSubClient::GetTopicAccess( const CORE::CString& topicName )
{
return (*i).second;
}
return PUBSUB::CPubSubClientTopicPtr();
return PUBSUB::CPubSubClientTopicBasicPtr();
}

/*-------------------------------------------------------------------------*/
Expand Down
Loading

0 comments on commit c9ad6b7

Please sign in to comment.