Skip to content

Commit

Permalink
- pubsub router: add write lock for lazy init of topic routes
Browse files Browse the repository at this point in the history
- pubsub storage: fix topic not respecting per-topic pulseGenerator override on creation
  • Loading branch information
LiberatorUSA committed Feb 20, 2024
1 parent 44251be commit 0b05915
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 20 deletions.
7 changes: 4 additions & 3 deletions platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubFlowRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubFlowRouter : public CORE::CTSGNotifier
CORE::PulseGeneratorPtr destPulseGenerator );
bool MatchTopicRouteConfig( const CPubSubFlowRouteTopicConfigPtr topicRouteConfig );
bool MatchAllTopicRouteConfigs( void );

CRouteTopicLinks* GetTargetTopicLinks( CPubSubClientTopic* fromTopic ,
bool& errorOccured );

CRouteInfo( void );
CRouteInfo( const CRouteInfo& src );
Expand Down Expand Up @@ -317,6 +314,10 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubFlowRouter : public CORE::CTSGNotifier
void DetermineFirstActiveRoute( CRouteInfo& routeInfo, bool attemptConnect );

void DetermineActiveRouteDestination( CRouteInfo& routeInfo, bool attemptConnect );

CRouteTopicLinks* GetTargetTopicLinks( CPubSubClientTopic* fromTopic ,
CRouteInfo& routeInfo ,
bool& errorOccured );

private:

Expand Down
25 changes: 14 additions & 11 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,37 +254,40 @@ CPubSubFlowRouter::CRouteInfo::SwitchAllTopicLinksActiveTopic( RouteType activeS
/*-------------------------------------------------------------------------*/

CPubSubFlowRouter::CRouteTopicLinks*
CPubSubFlowRouter::CRouteInfo::GetTargetTopicLinks( CPubSubClientTopic* fromTopic ,
bool& errorOccured )
CPubSubFlowRouter::GetTargetTopicLinks( CPubSubClientTopic* fromTopic ,
CRouteInfo& routeInfo ,
bool& errorOccured )
{GUCEF_TRACE;

errorOccured = false;

TTopicRawPtrToRouteTopicLinksRawPtrMap::iterator t = fromSideTopicLinks.find( fromTopic );
if ( t != fromSideTopicLinks.end() )
TTopicRawPtrToRouteTopicLinksRawPtrMap::iterator t = routeInfo.fromSideTopicLinks.find( fromTopic );
if ( t != routeInfo.fromSideTopicLinks.end() )
{
return (*t).second;
}
else
{
// Check if we need to worry about auto matching topics
if ( fromTopic != GUCEF_NULL &&
!routeConfig.IsNULL() &&
routeConfig->IsAnyAutoTopicMatchingNeeded() )
{
!routeInfo.routeConfig.IsNULL() &&
routeInfo.routeConfig->IsAnyAutoTopicMatchingNeeded() )
{
// We should always have a target topic considering we are auto matching them and not relying on an explicit map
// Due to topic discovery and auto generation this scenario can happen.
// We will 'lazy init' the mapping now.

GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "RouteInfo:GetTargetTopicLinks: topic links not yet found, will lazy init" );

MT::CScopeWriterLock lock( m_lock );

CORE::PulseGeneratorPtr nullPulseGenerator;
if ( MatchTopicRouteConfig( fromTopic, nullPulseGenerator ) )
if ( routeInfo.MatchTopicRouteConfig( fromTopic, nullPulseGenerator ) )
{
// Now lets try again after lazy matching the topic route configs

t = fromSideTopicLinks.find( fromTopic );
if ( t != fromSideTopicLinks.end() )
t = routeInfo.fromSideTopicLinks.find( fromTopic );
if ( t != routeInfo.fromSideTopicLinks.end() )
{
return (*t).second;
}
Expand Down Expand Up @@ -1640,7 +1643,7 @@ CPubSubFlowRouter::PublishMsgs( CPubSubClientSide* fromSide
// Doing so would mess up sequencing and intermix new and older messages. We should just wait for the spillover egress to finish
if ( !routeInfo.IsSpilloverEgressActive() )
{
CRouteTopicLinks* topicLinks = routeInfo.GetTargetTopicLinks( fromTopic, errorOccured );
CRouteTopicLinks* topicLinks = GetTargetTopicLinks( fromTopic, routeInfo, errorOccured );

switch ( routeType )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class PUBSUBPLUGIN_STORAGE_PLUGIN_PRIVATE_CPP CStoragePubSubClientTopic : public
static bool SyncBookmarkToBookmarkInfo( const PUBSUB::CPubSubBookmark& bookmark ,
CStorageBookmarkInfo& info );

void RegisterEventHandlers( void );
void RegisterEventHandlers( CORE::PulseGeneratorPtr pulseGenerator );

bool SetupToSubscribe( PUBSUB::CPubSubClientTopicConfig& config );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ CStoragePubSubClientTopic::CStoragePubSubClientTopic( CStoragePubSubClient* clie
, m_storageDeserializationFailures( 0 )
{GUCEF_TRACE;

GUCEF_ASSERT( GUCEF_NULL != client );

// Check if an override pulse generator was provided
if ( pulseGenerator.IsNULL() && GUCEF_NULL != client )
{
pulseGenerator = client->GetConfig().pulseGenerator;
SetPulseGenerator( pulseGenerator );
}

m_publishSuccessActionEventData.LinkTo( &m_publishSuccessActionIds );
m_publishFailureActionEventData.LinkTo( &m_publishFailureActionIds );

Expand All @@ -499,7 +508,7 @@ CStoragePubSubClientTopic::CStoragePubSubClientTopic( CStoragePubSubClient* clie
m_noAckRetransmitTimer = GUCEF_NEW CORE::CTimer( pulseGenerator, GUCEF_DEFAULT_NOACK_RETRANSMIT_CHECK_CYCLETIME_IN_MS );
}

RegisterEventHandlers();
RegisterEventHandlers( pulseGenerator );
}

/*-------------------------------------------------------------------------*/
Expand Down Expand Up @@ -557,7 +566,7 @@ CStoragePubSubClientTopic::GetClient( void )
/*-------------------------------------------------------------------------*/

void
CStoragePubSubClientTopic::RegisterEventHandlers( void )
CStoragePubSubClientTopic::RegisterEventHandlers( CORE::PulseGeneratorPtr pulseGenerator )
{GUCEF_TRACE;

if ( GUCEF_NULL != m_reconnectTimer )
Expand Down Expand Up @@ -605,9 +614,9 @@ CStoragePubSubClientTopic::RegisterEventHandlers( void )
callback2 );

TEventCallback callback3( this, &CStoragePubSubClientTopic::OnPulseCycle );
SubscribeTo( m_client->GetConfig().pulseGenerator.GetPointerAlways() ,
CORE::CPulseGenerator::PulseEvent ,
callback3 );
SubscribeTo( pulseGenerator.GetPointerAlways() ,
CORE::CPulseGenerator::PulseEvent ,
callback3 );
}

/*-------------------------------------------------------------------------*/
Expand Down

0 comments on commit 0b05915

Please sign in to comment.