Skip to content

Commit

Permalink
- pubsub sqs backend wip towards initial usable version
Browse files Browse the repository at this point in the history
- pubsub example configs: fixed side id property naming in route section: the "Id" postfix is required
- core task manager: allow unregistering of threadpools for more explicit early cleanup
- core config store: fixed issue that could cause boot loop when bootstrap module auto register a non-bootstrap global configurable
  • Loading branch information
LiberatorUSA committed Jun 2, 2024
1 parent 9c213b7 commit 6f68a8a
Show file tree
Hide file tree
Showing 28 changed files with 1,226 additions and 127 deletions.
2 changes: 1 addition & 1 deletion dependencies/librdkafka/src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ static void rd_kafka_global_srand(void) {
* @returns the current number of active librdkafka instances
*/
static int rd_kafka_global_cnt_get(void) {
int r;
int r = 0;
mtx_lock(&rd_kafka_global_lock);
r = rd_kafka_global_cnt;
mtx_unlock(&rd_kafka_global_lock);
Expand Down
2 changes: 2 additions & 0 deletions dependencies/librdkafka/src/tinycthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ void mtx_destroy(mtx_t *mtx)
if (!mtx->mTimed)
{
DeleteCriticalSection(&(mtx->mHandle.cs));
memset( &(mtx->mHandle.cs), 0, sizeof mtx->mHandle.cs );
}
else
{
CloseHandle(mtx->mHandle.mut);
mtx->mHandle.mut = NULL;
}
#else
pthread_mutex_destroy(mtx);
Expand Down
1 change: 1 addition & 0 deletions platform/gucefCORE/include/dvcppstringutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ inline CString ToString( const CUtf8String::StringVector& el ) { CUtf8String out
inline CString ToString( const CAsciiString::StringVector& el ) { CAsciiString out; return CUtf8String( out.Combine( el, ',' ) ); }
inline CString ToString( const CUtf8String::StringSet& el ) { CUtf8String out; return out.Combine( el, ',' ); }
inline CString ToString( const CAsciiString::StringSet& el ) { CAsciiString out; return CUtf8String( out.Combine( el, ',' ) ); }
inline CString ToString( const CUtf8String::StringMap& el ) { CUtf8String out; return out.Combine( el, '=', ',' ); }
#endif

inline CString ToString( UInt8 value ) { return UInt8ToString( value ); }
Expand Down
4 changes: 4 additions & 0 deletions platform/gucefCORE/include/gucefCORE_CAsciiString.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class GUCEF_CORE_PUBLIC_CPP CAsciiString

char operator[]( const UInt32 index ) const;

char& operator[]( const UInt32 index );

bool IsNULLOrEmpty( void ) const;

operator std::string() const;
Expand Down Expand Up @@ -298,6 +300,8 @@ class GUCEF_CORE_PUBLIC_CPP CAsciiString

CAsciiString RemoveChar( const char charToRemove ) const;

bool HasRepeatingChar( const char charToCheck ) const;

CAsciiString CompactRepeatingChar( const char charToCompact ) const;

StringVector ParseElements( char seperator ,
Expand Down
4 changes: 4 additions & 0 deletions platform/gucefCORE/include/gucefCORE_CTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier
ThreadPoolPtr GetOrCreateThreadPool( const CString& threadPoolName ,
bool createIfNotExists = true );

bool UnregisterThreadPool( const CString& threadPoolName );

bool UnregisterThreadPool( ThreadPoolPtr threadPool );

/**
* Queues a task for execution as soon as a thread is available
* to execute it.
Expand Down
1 change: 1 addition & 0 deletions platform/gucefCORE/include/gucefCORE_CUtf8String.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ class GUCEF_CORE_PUBLIC_CPP CUtf8String

CUtf8String Combine( const StringVector& elements, Int32 seperator ) const;
CUtf8String Combine( const StringSet& elements, Int32 seperator ) const;
CUtf8String Combine( const StringMap& elements, Int32 valueSeperator, Int32 kvSeperator ) const;

void Clear( void );

Expand Down
1 change: 1 addition & 0 deletions platform/gucefCORE/include/gucefCORE_CVariant.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class GUCEF_CORE_PUBLIC_CPP CVariant

bool IsInteger( void ) const;
bool IsFloat( void ) const;
bool IsNumber( void ) const;
bool IsString( void ) const;
bool IsBoolean( void ) const;
bool IsBinary( void ) const;
Expand Down
5 changes: 3 additions & 2 deletions platform/gucefCORE/src/CConfigStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,10 @@ CConfigStore::ApplyConfigData( const CDataNode& loadedConfig ,
errorOccured = true;
GUCEF_ERROR_LOG( LOGLEVEL_IMPORTANT, "ConfigStore:ApplyConfigData: Loading of config failed for a late-addition configureable with type name \"" + (*i)->GetClassTypeName() + "\"" );
}
m_configureables.insert( (*i) );
m_newConfigureables.erase( (*i) );
}

m_configureables.insert( (*i) );
m_newConfigureables.erase( (*i) );
++i;
}
}
Expand Down
35 changes: 35 additions & 0 deletions platform/gucefCORE/src/gucefCORE_CAsciiString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,20 @@ CAsciiString::operator[]( const UInt32 index ) const

/*-------------------------------------------------------------------------*/

char&
CAsciiString::operator[]( const UInt32 index )
{GUCEF_TRACE;

static char outOfBoundsChar = '\0';
if ( index <= m_length )
return m_string[ index ];

GUCEF_ASSERT_ALWAYS;
return outOfBoundsChar;
}

/*-------------------------------------------------------------------------*/

bool
CAsciiString::operator<( const CAsciiString& other ) const
{GUCEF_TRACE;
Expand Down Expand Up @@ -1140,6 +1154,27 @@ CAsciiString::RemoveChar( const char charToRemove ) const

/*-------------------------------------------------------------------------*/

bool
CAsciiString::HasRepeatingChar( const char charToCheck ) const
{GUCEF_TRACE;

for ( UInt32 i=0; i<m_length; ++i )
{
if ( m_string[ i ] == charToCheck )
{
if ( ( i+1 < m_length ) &&
( m_string[ i+1 ] == charToCheck ) )
{
return true;
}
}
}

return false;
}

/*-------------------------------------------------------------------------*/

CAsciiString
CAsciiString::CompactRepeatingChar( const char charToCompact ) const
{GUCEF_TRACE;
Expand Down
29 changes: 29 additions & 0 deletions platform/gucefCORE/src/gucefCORE_CTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,35 @@ CTaskManager::GetOrCreateThreadPool( const CString& threadPoolName ,

/*-------------------------------------------------------------------------*/

bool
CTaskManager::UnregisterThreadPool( const CString& threadPoolName )
{GUCEF_TRACE;

MT::CObjectScopeLock lock( this, GUCEF_MT_LONG_LOCK_TIMEOUT );

ThreadPoolMap::const_iterator i = m_threadPools.find( threadPoolName );
if ( i != m_threadPools.end() )
{
m_threadPools.erase( i );
GUCEF_SYSTEM_LOG( LOGLEVEL_NORMAL, "TaskManager:UnregisterThreadPool: Thread pool with name \"" + threadPoolName + "\" has been unregistered" );
}

return true;
}

/*-------------------------------------------------------------------------*/

bool
CTaskManager::UnregisterThreadPool( ThreadPoolPtr threadPool )
{GUCEF_TRACE;

if ( threadPool.IsNULL() )
return true; // nothing to do

return UnregisterThreadPool( threadPool->GetThreadPoolName() );
}
/*-------------------------------------------------------------------------*/

TTaskStatus
CTaskManager::QueueTask( const CString& threadPoolName ,
const CString& taskType ,
Expand Down
89 changes: 89 additions & 0 deletions platform/gucefCORE/src/gucefCORE_CUtf8String.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,95 @@ CUtf8String::Combine( const StringSet& elements, Int32 seperator ) const

/*-------------------------------------------------------------------------*/

CUtf8String
CUtf8String::Combine( const StringMap& elements, Int32 valueSeperator, Int32 kvSeperator ) const
{GUCEF_TRACE;

if ( elements.empty() && 0 == m_length )
return CUtf8String();

// Determine storage size needed

size_t valueSepCpSize = utf8codepointsize( valueSeperator );
size_t kvSepCpSize = utf8codepointsize( kvSeperator );
size_t totalExtraCpSize = ( valueSepCpSize * elements.size() ) + ( kvSepCpSize * elements.size() );

UInt32 bufferSize = (UInt32) totalExtraCpSize;
if ( m_byteSize > 0 )
bufferSize += m_byteSize + (UInt32) kvSepCpSize;

StringMap::const_iterator i = elements.begin();
while ( i != elements.end() )
{
const CUtf8String& key = (*i).first;
const CUtf8String& value = (*i).second;

UInt32 keySize = key.IsNULLOrEmpty() ? 0 : key.m_byteSize-1;
UInt32 valueSize = value.IsNULLOrEmpty() ? 0 : value.m_byteSize-1;
bufferSize += keySize + valueSize;

++i;
}

// Allocate total storage

CUtf8String comboStr;
if ( GUCEF_NULL == comboStr.Reserve( bufferSize ) )
return CUtf8String::Empty;

// Write contents
char* cpPos = comboStr.m_string;
size_t bufferSpaceLeft = bufferSize;
if ( GUCEF_NULL != m_string && 0 < m_length )
{
memcpy( comboStr.m_string, m_string, m_byteSize );
bufferSpaceLeft -= m_byteSize;

cpPos = (char*) utf8catcodepoint( cpPos, kvSeperator, bufferSpaceLeft );
bufferSpaceLeft -= kvSepCpSize;
}
else
{
comboStr.m_string[ 0 ] = '\0';
}
i = elements.begin();
while ( i != elements.end() )
{
const CUtf8String& key = (*i).first;
const CUtf8String& value = (*i).second;

UInt32 keySize = key.IsNULLOrEmpty() ? 0 : key.m_byteSize-1;
UInt32 valueSize = value.IsNULLOrEmpty() ? 0 : value.m_byteSize-1;

memcpy( cpPos, key.m_string, keySize );
cpPos += keySize;
bufferSpaceLeft -= keySize;

cpPos = (char*) utf8catcodepoint( cpPos, valueSeperator, bufferSpaceLeft );
bufferSpaceLeft -= valueSepCpSize;

memcpy( cpPos, value.m_string, valueSize );
cpPos += valueSize;
bufferSpaceLeft -= valueSize;

++i;
if ( i != elements.end() )
{
cpPos = (char*) utf8catcodepoint( cpPos, kvSeperator, bufferSpaceLeft );
bufferSpaceLeft -= kvSepCpSize;
}

*cpPos = '\0';
}

// Determine new string length codepoint wise
comboStr.m_length = (UInt32) utf8len_s( comboStr.m_string, comboStr.m_byteSize );
comboStr.m_string[ comboStr.m_byteSize-1 ] = '\0';
return comboStr;
}

/*-------------------------------------------------------------------------*/

bool
CUtf8String::IsFormattingValid( void ) const
{GUCEF_TRACE;
Expand Down
10 changes: 10 additions & 0 deletions platform/gucefCORE/src/gucefCORE_CVariant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,16 @@ CVariant::IsFloat( void ) const

/*-------------------------------------------------------------------------*/

bool
CVariant::IsNumber( void ) const
{GUCEF_TRACE;

return IsInteger() || IsFloat() ||
m_variantData.containedType == GUCEF_DATATYPE_NUMERIC;
}

/*-------------------------------------------------------------------------*/

bool
CVariant::IsString( void ) const
{GUCEF_TRACE;
Expand Down
18 changes: 15 additions & 3 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouterConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,27 @@ CPubSubFlowRouterConfig::LoadConfig( const CORE::CDataNode& cfg )
CPubSubFlowRouteConfigPtr route = CPubSubFlowRouteConfig::CreateSharedObj();
if ( route->LoadConfig( *routeNode ) )
{
routes.push_back( route );
if ( route->fromSideId.IsNULLOrEmpty() &&
route->toSideId.IsNULLOrEmpty() &&
route->spilloverBufferSideId.IsNULLOrEmpty() &&
route->deadLetterSideId.IsNULLOrEmpty() &&
route->failoverSideId.IsNULLOrEmpty() )
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::ToString( this ) +
"):LoadConfig: route node has no side ids specified and is thus invalid. It will be ignored" );
}
else
{
routes.push_back( route );
}
}
++i;
}
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::PointerToString( this ) +
"):LoadConfig: Missing 'routes' config is malformed" );
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::ToString( this ) +
"):LoadConfig: Missing 'routes' thus the config is malformed" );
return false;
}

Expand Down
Loading

0 comments on commit 6f68a8a

Please sign in to comment.