Skip to content

Commit

Permalink
- variant deserializer: fixed incorrect fixed var size given to varia…
Browse files Browse the repository at this point in the history
…nt set function. this bug only impacted cases where a BSOB type would be deserialized which would almost always fail

- pubsub topiclink: moved more bool settings to link to avoid overhead
- pubsub: New setting "needToTrackInFlightPublishedMsgsForAckDefault" which provides a configurable default vs the historical hardcoded default of false. This also switched the default to true.
- pubsub kafka backend: fixed type not being set on msg index which in turn meant that acks (commits) were broken
  • Loading branch information
LiberatorUSA committed Mar 23, 2024
1 parent 52cef87 commit 91e981d
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 92 deletions.
9 changes: 9 additions & 0 deletions platform/gucefCORE/include/gucefCORE_CVariant.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,14 @@ class GUCEF_CORE_PUBLIC_CPP CVariant
bool IsInitialized( void ) const;
bool IsNULLOrEmpty( void ) const;

/**
* This member function allows you to set the type ID
* In most cases you should not need this as setting/assigning/linking automatically sets the correct type id
* However there can be cases where more direct manipulation of the underlying variant data is needed or occurs
* In such use cases this member function can be used to 'correct' the type to make the class aware of such changes.
*/
void OverrideTypeId( UInt8 typeId );

UInt8 GetTypeId( void ) const;
const char* GetTypeNameC( void ) const;
CString GetTypeName( void ) const;
Expand Down Expand Up @@ -222,6 +230,7 @@ class GUCEF_CORE_PUBLIC_CPP CVariant
* Returns the size of the storage used in bytes by the stored type
*/
UInt32 ByteSize( bool includeNullTerm = true ) const;
static UInt32 ByteSizeOfFixedSizeType( UInt8 varType );

bool operator==( const CVariant& other ) const;
bool operator!=( const CVariant& other ) const;
Expand Down
9 changes: 9 additions & 0 deletions platform/gucefCORE/include/gucefCORE_VariantData.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ TypeNameForGucefTypeId( UInt8 typeId );
GUCEF_CORE_PUBLIC_C UInt8
GucefTypeIdForTypeName( const char* typeName );

/*-------------------------------------------------------------------------*/

/**
* Returns the size of fixed size types based on their type id
* if the type id given is not known or not that of a fixed size type 0 will be returned as the size
*/
GUCEF_CORE_PUBLIC_C UInt32
GucefByteSizeOfFixedSizeType( UInt8 varType );

/*-------------------------------------------------------------------------//
// //
// NAMESPACE //
Expand Down
27 changes: 27 additions & 0 deletions platform/gucefCORE/src/gucefCORE_CVariant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@

#include "gucefCORE_CVariant.h"

#ifndef GUCEF_CORE_LOGGING_H
#include "gucefCORE_Logging.h"
#define GUCEF_CORE_LOGGING_H
#endif /* GUCEF_CORE_LOGGING_H ? */

#ifndef GUCEF_CORE_ESSENTIALS_H
#include "gucef_essentials.h"
#define GUCEF_CORE_ESSENTIALS_H
Expand Down Expand Up @@ -617,6 +622,15 @@ CVariant::IsNULLOrEmpty( void ) const

/*-------------------------------------------------------------------------*/

void
CVariant::OverrideTypeId( UInt8 typeId )
{GUCEF_TRACE;

m_variantData.containedType = typeId;
}

/*-------------------------------------------------------------------------*/

UInt8
CVariant::GetTypeId( void ) const
{GUCEF_TRACE;
Expand Down Expand Up @@ -1075,6 +1089,15 @@ CVariant::AsVoidPtr( const void* defaultIfNeeded ) const

/*-------------------------------------------------------------------------*/

UInt32
CVariant::ByteSizeOfFixedSizeType( UInt8 varType )
{GUCEF_TRACE;

return GucefByteSizeOfFixedSizeType( varType );
}

/*-------------------------------------------------------------------------*/

UInt32
CVariant::ByteSize( bool includeNullTerm ) const
{GUCEF_TRACE;
Expand Down Expand Up @@ -1491,7 +1514,11 @@ CVariant::Set( const void* data, UInt32 dataSize, UInt8 varType, bool linkOnlyFo
case GUCEF_DATATYPE_BINARY_BSOB:
{
if ( GUCEF_NULL == data || dataSize > sizeof(m_variantData.union_data.bsob_data) )
{
GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "Variant(" + CORE::ToString( this ) +
"):Set: Refusing to set BSOB data with ptr " + ToString( data ) + " and size " + ToString( dataSize ) );
return false;
}
memset( m_variantData.union_data.bsob_data, 0, sizeof( m_variantData.union_data.bsob_data ) );
memcpy( m_variantData.union_data.bsob_data, data, dataSize );
m_variantData.containedType = GUCEF_DATATYPE_BINARY_BSOB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ CVariantBinarySerializer::Deserialize( CVariant& var, UInt32 currentSourceOffset
}
else
{
bool result = var.Set( source.GetConstBufferPtr( currentSourceOffset ), source.GetDataSize() - currentSourceOffset, typeId, linkWherePossible );
bool result = var.Set( source.GetConstBufferPtr( currentSourceOffset ), GucefByteSizeOfFixedSizeType( typeId ), typeId, linkWherePossible );
bytesRead += var.ByteSize();
return result;
}
Expand Down
39 changes: 39 additions & 0 deletions platform/gucefCORE/src/gucefCORE_VariantData.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,45 @@ GucefTypeIdForTypeName( const char* typeName )
return GUCEF_DATATYPE_UNKNOWN;
}

/*-------------------------------------------------------------------------*/

UInt32
GucefByteSizeOfFixedSizeType( UInt8 varType )
{
switch ( varType )
{
case GUCEF_DATATYPE_INT8: return sizeof( Int8 );
case GUCEF_DATATYPE_UINT8: return sizeof( UInt8 );
case GUCEF_DATATYPE_LE_INT16: return sizeof( Int16 );
case GUCEF_DATATYPE_BE_INT16: return sizeof( Int16 );
case GUCEF_DATATYPE_LE_UINT16: return sizeof( UInt16 );
case GUCEF_DATATYPE_BE_UINT16: return sizeof( UInt16 );
case GUCEF_DATATYPE_LE_INT32: return sizeof( Int32 );
case GUCEF_DATATYPE_BE_INT32: return sizeof( Int32 );
case GUCEF_DATATYPE_LE_UINT32: return sizeof( UInt32 );
case GUCEF_DATATYPE_BE_UINT32: return sizeof( UInt32 );
case GUCEF_DATATYPE_LE_INT64: return sizeof( Int64 );
case GUCEF_DATATYPE_BE_INT64: return sizeof( Int64 );
case GUCEF_DATATYPE_LE_UINT64: return sizeof( UInt64 );
case GUCEF_DATATYPE_BE_UINT64: return sizeof( UInt64 );
case GUCEF_DATATYPE_LE_FLOAT32: return sizeof( Float32 );
case GUCEF_DATATYPE_BE_FLOAT32: return sizeof( Float32 );
case GUCEF_DATATYPE_LE_FLOAT64: return sizeof( Float64 );
case GUCEF_DATATYPE_BE_FLOAT64: return sizeof( Float64 );
case GUCEF_DATATYPE_BOOLEAN_INT32: return sizeof( Int32 );
case GUCEF_DATATYPE_LE_TIMESTAMP_IN_SECS_SINCE_UNIX_EPOCH: return sizeof( UInt64 );
case GUCEF_DATATYPE_BE_TIMESTAMP_IN_SECS_SINCE_UNIX_EPOCH: return sizeof( UInt64 );
case GUCEF_DATATYPE_LE_TIMESTAMP_IN_MS_SINCE_UNIX_EPOCH: return sizeof( UInt64 );
case GUCEF_DATATYPE_BE_TIMESTAMP_IN_MS_SINCE_UNIX_EPOCH: return sizeof( UInt64 );

case GUCEF_DATATYPE_BINARY_BSOB: return GUCEF_VARIANT_BSOB_SIZE;

case GUCEF_DATATYPE_NULL:
case GUCEF_DATATYPE_NIL:
default: return 0;
}
}

/*-------------------------------------------------------------------------//
// //
// NAMESPACE //
Expand Down
10 changes: 7 additions & 3 deletions platform/gucefPUBSUB/include/gucefPUBSUB_CPubSubClientSide.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientSide : public CORE::CTaskConsumer
const CORE::CEvent& eventId ,
CORE::CICloneable* eventData );

bool ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSubSideSettings ,
CPubSubClientTopicBasicPtr topic ,
bool reset );
bool ConfigureTopicLink( CPubSubSideChannelSettingsPtr pubSubSideSettings ,
CPubSubClientTopicBasicPtr topic ,
bool reset );

bool ConnectPubSubClientTopic( CPubSubClientTopic& topic ,
const CPubSubClientFeatures& clientFeatures ,
Expand Down Expand Up @@ -381,6 +381,8 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientSide : public CORE::CTaskConsumer

bool AcknowledgeReceipt( CIPubSubMsg::TNoLockSharedPtr& msg );

void SetNeedToTrackInFlightPublishedMsgsForAck( bool isNeeded );

virtual const CString& GetClassTypeName( void ) const GUCEF_VIRTUAL_OVERRIDE;

private:
Expand Down Expand Up @@ -495,6 +497,8 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientSide : public CORE::CTaskConsumer
CPubSubFlowRouter* flowRouter;
CPubSubClientSide* side;
CPubSubClientFeatures clientFeatures;
bool needToTrackInFlightPublishedMsgsForAck;
bool retryFailedPublishAttempts;
TIPubSubBookmarkPersistenceBasicPtr pubsubBookmarkPersistence;
bool awaitingFailureReport;
CORE::UInt64 totalMsgsInFlight;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubSideChannelConfig : public CORE::CIConfigur
bool treatPublishWithoutTargetTopicAsBroadcast;
bool retryFailedPublishAttempts;
bool allowOutOfOrderPublishRetry;
bool needToTrackInFlightPublishedMsgsForAckDefault;
CORE::Int32 maxMsgPublishRetryAttempts;
CORE::Int32 maxMsgPublishRetryTotalTimeInMs;
CORE::Int32 maxPublishedMsgInFlightTimeInMs;
Expand Down
43 changes: 32 additions & 11 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ CPubSubClientSide::TopicLink::TopicLink( void )
, inFlightMsgs()
, publishFailedMsgs()
, publishAckdMsgsMailbox()
, maxASyncPublishMailboxSize( 1000 )
, maxASyncPublishMailboxSizeDuringAFR( 1000 )
, metricFriendlyTopicName()
, metricsPrefix()
, metrics( CPubSubClientSideMetrics::CreateSharedObj() )
Expand All @@ -191,6 +193,8 @@ CPubSubClientSide::TopicLink::TopicLink( void )
, flowRouter( GUCEF_NULL )
, side( GUCEF_NULL )
, clientFeatures()
, needToTrackInFlightPublishedMsgsForAck( true ) // safer default even through its less performant
, retryFailedPublishAttempts( true ) // safer default even through its less performant
, pubsubBookmarkPersistence()
, awaitingFailureReport( false )
, totalMsgsInFlight( 0 )
Expand All @@ -212,6 +216,8 @@ CPubSubClientSide::TopicLink::TopicLink( CPubSubClientTopicBasicPtr t )
, inFlightMsgs()
, publishFailedMsgs()
, publishAckdMsgsMailbox()
, maxASyncPublishMailboxSize( 1000 )
, maxASyncPublishMailboxSizeDuringAFR( 1000 )
, metricFriendlyTopicName()
, metricsPrefix()
, metrics( CPubSubClientSideMetrics::CreateSharedObj() )
Expand All @@ -222,6 +228,8 @@ CPubSubClientSide::TopicLink::TopicLink( CPubSubClientTopicBasicPtr t )
, flowRouter( GUCEF_NULL )
, side( GUCEF_NULL )
, clientFeatures()
, needToTrackInFlightPublishedMsgsForAck( true ) // safer default even through its less performant
, retryFailedPublishAttempts( true ) // safer default even through its less performant
, pubsubBookmarkPersistence()
, awaitingFailureReport( false )
, totalMsgsInFlight( 0 )
Expand Down Expand Up @@ -252,6 +260,15 @@ CPubSubClientSide::TopicLink::GetClassTypeName( void ) const

/*-------------------------------------------------------------------------*/

void
CPubSubClientSide::TopicLink::SetNeedToTrackInFlightPublishedMsgsForAck( bool isNeeded )
{GUCEF_TRACE;

needToTrackInFlightPublishedMsgsForAck = isNeeded;
}

/*-------------------------------------------------------------------------*/

void
CPubSubClientSide::TopicLink::AddInFlightMsg( CORE::UInt64 publishActionId ,
CIPubSubMsg::TNoLockSharedPtr& msg )
Expand Down Expand Up @@ -1043,11 +1060,11 @@ CPubSubClientSide::TopicLink::PublishMsgsSync( const TMsgCollection& msgs )
if ( topic->IsPublishingSupported() )
{
currentPublishActionIds.clear();
if ( topic->Publish( currentPublishActionIds, msgs, sideSettings->needToTrackInFlightPublishedMsgsForAck ) )
if ( topic->Publish( currentPublishActionIds, msgs, needToTrackInFlightPublishedMsgsForAck ) )
{
totalMsgsInFlight += msgs.size();

if ( sideSettings->needToTrackInFlightPublishedMsgsForAck )
if ( needToTrackInFlightPublishedMsgsForAck )
AddInFlightMsgs( currentPublishActionIds, msgs, true );

GUCEF_DEBUG_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "TopicLink(" + CORE::ToString( this ) +
Expand Down Expand Up @@ -1089,6 +1106,8 @@ CPubSubClientSide::TopicLink::ApplySettings( const CPubSubSideChannelSettingsPtr
metricsPrefix = sideSettings->metricsPrefix;
maxASyncPublishMailboxSize = sideSettings->maxASyncPublishMailboxSize;
maxASyncPublishMailboxSizeDuringAFR = sideSettings->maxASyncPublishMailboxSizeDuringAFR;
needToTrackInFlightPublishedMsgsForAck = sideSettings->needToTrackInFlightPublishedMsgsForAck;
retryFailedPublishAttempts = sideSettings->retryFailedPublishAttempts;

if ( sideSettings->maxPublishedMsgInFlightTimeInMs > 0 )
{
Expand Down Expand Up @@ -2460,8 +2479,10 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsPublished( CORE::CNotifier* notif
if ( GUCEF_NULL == eventData || GUCEF_NULL == notifier || GUCEF_NULL == side )
return;

if ( !side->GetSideSettings()->needToTrackInFlightPublishedMsgsForAck )
if ( !needToTrackInFlightPublishedMsgsForAck )
{
GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "TopicLink(" + CORE::ToString( this ) +
"):OnPubSubTopicMsgsPublished: Ignoring msg acks from client because we are not tracking msgs for acks" );
return;
}

Expand Down Expand Up @@ -2715,9 +2736,9 @@ CPubSubClientSide::DisconnectPubSubClient( bool destroyClient )
/*-------------------------------------------------------------------------*/

bool
CPubSubClientSide::ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSubSideSettings ,
CPubSubClientTopicBasicPtr topic ,
bool reset )
CPubSubClientSide::ConfigureTopicLink( CPubSubSideChannelSettingsPtr pubSubSideSettings ,
CPubSubClientTopicBasicPtr topic ,
bool reset )
{GUCEF_TRACE;

if ( pubSubSideSettings.IsNULL() )
Expand Down Expand Up @@ -2745,7 +2766,7 @@ CPubSubClientSide::ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSu
topicConfig = CPubSubClientTopicConfigPtr( GUCEF_NEW CPubSubClientTopicConfig() );
if ( topic->SaveConfig( *topicConfig ) )
{
m_sideSettings->pubsubClientConfig.topics.push_back( topicConfig );
pubSubSideSettings->pubsubClientConfig.topics.push_back( topicConfig );

GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) +
"):ConfigureTopicLink: Obtained a copy of the topic config from the topic itself for topic which has no predefined config. topicName=" + topicName + " SideId=" + m_sideId );
Expand All @@ -2758,7 +2779,7 @@ CPubSubClientSide::ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSu
}

CORE::PulseGeneratorPtr pulseGenerator;
if ( m_sideSettings->useBackendTopicThreadForTopicIfAvailable )
if ( pubSubSideSettings->useBackendTopicThreadForTopicIfAvailable )
pulseGenerator = topic->GetPulseGenerator();
if ( pulseGenerator.IsNULL() )
pulseGenerator = GetPulseGenerator();
Expand Down Expand Up @@ -2821,7 +2842,7 @@ CPubSubClientSide::ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSu

// Create and configure the pub-sub bookmark persistence
// we create a private copy per topic link to minimize potential contention across threads
CPubSubBookmarkPersistenceConfig& pubsubBookmarkPersistenceConfig = m_sideSettings->pubsubBookmarkPersistenceConfig;
CPubSubBookmarkPersistenceConfig& pubsubBookmarkPersistenceConfig = pubSubSideSettings->pubsubBookmarkPersistenceConfig;
TIPubSubBookmarkPersistenceBasicPtr pubsubBookmarkPersistence = CPubSubGlobal::Instance()->GetPubSubBookmarkPersistenceFactory().Create( pubsubBookmarkPersistenceConfig.bookmarkPersistenceType, pubsubBookmarkPersistenceConfig );
if ( pubsubBookmarkPersistence.IsNULL() )
{
Expand All @@ -2835,7 +2856,7 @@ CPubSubClientSide::ConfigureTopicLink( const CPubSubSideChannelSettingsPtr pubSu
topicLink->SetFlowRouter( m_flowRouter );
topicLink->SetParentSide( this );
topicLink->SetTopic( topic );
topicLink->ApplySettings( m_sideSettings );
topicLink->ApplySettings( pubSubSideSettings );
topicLink->SetClientFeatures( m_clientFeatures );
topicLink->SetPubsubBookmarkPersistence( pubsubBookmarkPersistence );
topicLink->SetPubsubBookmarkNamespace( m_bookmarkNamespace );
Expand Down Expand Up @@ -3146,7 +3167,7 @@ CPubSubClientSide::ConnectPubSubClient( bool reset )
// Whether we need to track successfull message handoff (garanteed handling) depends on various factors outside the scope of any one side
// as such we need to ask the overarching infra to come up with a conclusion on this need
// We will cache the outcome as a side local setting to negate locking needs
m_sideSettings->needToTrackInFlightPublishedMsgsForAck = false;
m_sideSettings->needToTrackInFlightPublishedMsgsForAck = m_sideSettings->needToTrackInFlightPublishedMsgsForAckDefault;
if ( GUCEF_NULL != m_flowRouter )
m_sideSettings->needToTrackInFlightPublishedMsgsForAck = m_flowRouter->IsTrackingInFlightPublishedMsgsForAcksNeeded( this );

Expand Down
3 changes: 2 additions & 1 deletion platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1974,7 +1974,8 @@ CPubSubFlowRouter::ConfigureSpillover( CPubSubClientSide* spilloverSide, bool fl
{GUCEF_TRACE;

// pull a copy of the config
CPubSubSideChannelSettingsPtr sideSettings = spilloverSide->GetSideSettings();
CPubSubSideChannelSettingsPtr originalSideSettings = spilloverSide->GetSideSettings();
CPubSubSideChannelSettingsPtr sideSettings = CPubSubSideChannelSettings::CreateSharedObjWithParam( *originalSideSettings );

// (Re)Configure for the intended flow direction
if ( flowIntoSpillover )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ CPubSubMsgBinarySerializer::Deserialize( const CPubSubMsgBinarySerializerOptions

if ( options.msgIdIncluded )
{
// Write the ID using the variable length variant serializer
// Read the ID using the variable length variant serializer
UInt32 varByteSize = 0;
if ( !CORE::CVariantBinarySerializer::Deserialize( msg.GetMsgId(), currentSourceOffset, source, linkWherePossible, varByteSize ) )
return false;
Expand All @@ -433,7 +433,7 @@ CPubSubMsgBinarySerializer::Deserialize( const CPubSubMsgBinarySerializerOptions

if ( options.msgIndexIncluded )
{
// Write the msg index using the variable length variant serializer
// Read the msg index using the variable length variant serializer
UInt32 varByteSize = 0;
if ( !CORE::CVariantBinarySerializer::Deserialize( msg.GetMsgIndex(), currentSourceOffset, source, linkWherePossible, varByteSize ) )
return false;
Expand Down
Loading

0 comments on commit 91e981d

Please sign in to comment.