Skip to content

Commit

Permalink
- core: fixed base64 encode causing memory corruption
Browse files Browse the repository at this point in the history
- various: added clean shutdown mechanics in a few spots to reduce noise when looking for memory leaks after a run
- pubsub kafka backend: due to the plugin pattern we need an extra hack to avoid tripping up the RdKafka code not being initialized yet shutdown being called
- pubsub sqs backend: fixed multiple batch message sends running into max bytes size limit
  • Loading branch information
LiberatorUSA committed Jul 4, 2024
1 parent aff58b1 commit 7a85e6d
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 44 deletions.
38 changes: 38 additions & 0 deletions platform/gucefCORE/include/gucefCORE_CTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier

static const CEvent ThreadPoolCreatedEvent;
static const CEvent ThreadPoolDestructionEvent;
static const CEvent ThreadPoolUnregisteredEvent;
static const CEvent GlobalTaskConsumerFactoryRegisteredEvent;
static const CEvent GlobalTaskConsumerFactoryUnregisteredEvent;
static const CEvent GlobalTaskDataFactoryRegisteredEvent;
static const CEvent GlobalTaskDataFactoryUnregisteredEvent;

typedef TCloneableString ThreadPoolCreatedEventData; /**< name of the thread pool that is created */
typedef TCloneableString ThreadPoolDestructionEventData; /**< name of the thread pool that is being destroyed */
typedef TCloneableString ThreadPoolUnregisteredEventData; /**< name of the thread pool that is unregistered */
typedef TCloneableString GlobalTaskConsumerFactoryRegisteredEventData; /**< name of the task consumer factory which is newly registered */
typedef TCloneableString GlobalTaskConsumerFactoryUnregisteredEventData; /**< name of the task consumer factory which is no longer registered */
typedef TCloneableString GlobalTaskDataFactoryRegisteredEventData; /**< name of the task data factory which is newly registered */
Expand Down Expand Up @@ -120,6 +122,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier
bool UnregisterThreadPool( const CString& threadPoolName );

bool UnregisterThreadPool( ThreadPoolPtr threadPool );

void UnregisterAllThreadPools( void );

/**
* Queues a task for execution as soon as a thread is available
Expand Down Expand Up @@ -188,6 +192,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier

void UnregisterTaskConsumerFactory( const CString& taskType );

void UnregisterAllTaskConsumerFactories( void );

void GetAllRegisteredTaskConsumerFactoryTypes( CORE::CString::StringSet& taskTypes );

void GetRegisteredTaskConsumerFactoryTypes( const CString& threadPoolName ,
Expand All @@ -198,6 +204,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier

void UnregisterTaskDataFactory( const CString& taskType );

void UnregisterAllTaskDataFactories( void );

bool IsTaskOfTaskTypeExecutable( const CString& taskType, const CString& threadPoolName = CString::Empty ) const;

bool IsCustomTaskDataForTaskTypeSerializable( const CString& taskType, const CString& threadPoolName = CString::Empty ) const;
Expand Down Expand Up @@ -287,12 +295,37 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier
CDataNode& domNode ,
const CDataNodeSerializableSettings& serializerSettings ) const;

/**
* Gracefully shuts everything down and unregisters all that was registered
*/
void Shutdown( void );

protected:

virtual void OnPumpedNotify( CNotifier* notifier ,
const CEvent& eventid ,
CICloneable* eventdata = NULL ) GUCEF_VIRTUAL_OVERRIDE;

void OnThreadPoolThreadStarted( CNotifier* notifier ,
const CEvent& eventId ,
CICloneable* eventData );

void OnThreadPoolThreadKilled( CNotifier* notifier ,
const CEvent& eventId ,
CICloneable* eventData );

void OnThreadPoolThreadFinished( CNotifier* notifier ,
const CEvent& eventId ,
CICloneable* eventData );

void OnThreadPoolDestruction( CNotifier* notifier ,
const CEvent& eventId ,
CICloneable* eventData );

void OnAppShutdown( CORE::CNotifier* notifier ,
const CORE::CEvent& eventId ,
CORE::CICloneable* eventData );

private:
friend class CTaskConsumer;

Expand All @@ -311,6 +344,10 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier

void RemoveConsumer( UInt32 taskID );

void RegisterThreadPoolEventHandlers( CThreadPool* threadPool );

void RegisterEventHandlers( void );

CTaskManager( const CTaskManager& src ); /**< not implemented */
CTaskManager& operator=( const CTaskManager& src ); /**< not implemented */

Expand All @@ -320,6 +357,7 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier
typedef CTAbstractFactory< CString, CTaskConsumer, MT::CMutex > TAbstractTaskConsumerFactory;
typedef CTAbstractFactory< CString, CIDataNodeSerializableTaskData, MT::CMutex > TAbstractTaskDataFactory;
typedef CTaskConsumer::TTaskIdGenerator TTaskIdGenerator;
typedef CTEventHandlerFunctor< CTaskManager > TEventCallback;

TTaskIdGenerator m_taskIdGenerator;
TAbstractTaskConsumerFactory m_consumerFactory;
Expand Down
2 changes: 2 additions & 0 deletions platform/gucefCORE/include/gucefCORE_CThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class GUCEF_CORE_PUBLIC_CPP CThreadPool : public CTSGNotifier ,
{
public:

static const CString ClassTypeName;

static const CEvent ThreadKilledEvent;
static const CEvent ThreadStartedEvent;
static const CEvent ThreadPausedEvent;
Expand Down
7 changes: 4 additions & 3 deletions platform/gucefCORE/src/dvcppstringutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,15 @@ Base64Encode( const void* byteBuffer, UInt32 bufferSize )

CString result;
UInt32 base64StrLength = ( ( bufferSize + 2 ) / 3 ) * 4;
char* str = result.Reserve( base64StrLength+1, (Int32) base64StrLength );
char* base64StringBuffer = result.Reserve( base64StrLength+1, (Int32) base64StrLength );
char* str = base64StringBuffer;

// Base64 encoding can really add up wrt size requirements
// We need to sanity check that we were able to allocate enough memory
if ( GUCEF_NULL == str || base64StrLength+1 != result.ByteSize() )
return CString::Empty;

memset( str, '=', base64StrLength );
memset( base64StringBuffer, '=', base64StrLength );
const char* charByteBuffer = (const char*) byteBuffer;

int i = 0;
Expand Down Expand Up @@ -433,7 +434,7 @@ Base64Encode( const void* byteBuffer, UInt32 bufferSize )

}

str[ base64StrLength ] = '\0';
base64StringBuffer[ base64StrLength ] = '\0';
result.DetermineLength();

return result;
Expand Down
Loading

0 comments on commit 7a85e6d

Please sign in to comment.