diff --git a/platform/gucefCORE/include/gucefCORE_CTaskManager.h b/platform/gucefCORE/include/gucefCORE_CTaskManager.h index a9fe3a5f8..e0395937e 100644 --- a/platform/gucefCORE/include/gucefCORE_CTaskManager.h +++ b/platform/gucefCORE/include/gucefCORE_CTaskManager.h @@ -83,6 +83,7 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier static const CEvent ThreadPoolCreatedEvent; static const CEvent ThreadPoolDestructionEvent; + static const CEvent ThreadPoolUnregisteredEvent; static const CEvent GlobalTaskConsumerFactoryRegisteredEvent; static const CEvent GlobalTaskConsumerFactoryUnregisteredEvent; static const CEvent GlobalTaskDataFactoryRegisteredEvent; @@ -90,6 +91,7 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier typedef TCloneableString ThreadPoolCreatedEventData; /**< name of the thread pool that is created */ typedef TCloneableString ThreadPoolDestructionEventData; /**< name of the thread pool that is being destroyed */ + typedef TCloneableString ThreadPoolUnregisteredEventData; /**< name of the thread pool that is unregistered */ typedef TCloneableString GlobalTaskConsumerFactoryRegisteredEventData; /**< name of the task consumer factory which is newly registered */ typedef TCloneableString GlobalTaskConsumerFactoryUnregisteredEventData; /**< name of the task consumer factory which is no longer registered */ typedef TCloneableString GlobalTaskDataFactoryRegisteredEventData; /**< name of the task data factory which is newly registered */ @@ -120,6 +122,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier bool UnregisterThreadPool( const CString& threadPoolName ); bool UnregisterThreadPool( ThreadPoolPtr threadPool ); + + void UnregisterAllThreadPools( void ); /** * Queues a task for execution as soon as a thread is available @@ -188,6 +192,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier void UnregisterTaskConsumerFactory( const CString& taskType ); + void UnregisterAllTaskConsumerFactories( void ); + void GetAllRegisteredTaskConsumerFactoryTypes( CORE::CString::StringSet& taskTypes ); void GetRegisteredTaskConsumerFactoryTypes( const CString& threadPoolName , @@ -198,6 +204,8 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier void UnregisterTaskDataFactory( const CString& taskType ); + void UnregisterAllTaskDataFactories( void ); + bool IsTaskOfTaskTypeExecutable( const CString& taskType, const CString& threadPoolName = CString::Empty ) const; bool IsCustomTaskDataForTaskTypeSerializable( const CString& taskType, const CString& threadPoolName = CString::Empty ) const; @@ -287,12 +295,37 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier CDataNode& domNode , const CDataNodeSerializableSettings& serializerSettings ) const; + /** + * Gracefully shuts everything down and unregisters all that was registered + */ + void Shutdown( void ); + protected: virtual void OnPumpedNotify( CNotifier* notifier , const CEvent& eventid , CICloneable* eventdata = NULL ) GUCEF_VIRTUAL_OVERRIDE; + void OnThreadPoolThreadStarted( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ); + + void OnThreadPoolThreadKilled( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ); + + void OnThreadPoolThreadFinished( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ); + + void OnThreadPoolDestruction( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ); + + void OnAppShutdown( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ); + private: friend class CTaskConsumer; @@ -311,6 +344,10 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier void RemoveConsumer( UInt32 taskID ); + void RegisterThreadPoolEventHandlers( CThreadPool* threadPool ); + + void RegisterEventHandlers( void ); + CTaskManager( const CTaskManager& src ); /**< not implemented */ CTaskManager& operator=( const CTaskManager& src ); /**< not implemented */ @@ -320,6 +357,7 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier typedef CTAbstractFactory< CString, CTaskConsumer, MT::CMutex > TAbstractTaskConsumerFactory; typedef CTAbstractFactory< CString, CIDataNodeSerializableTaskData, MT::CMutex > TAbstractTaskDataFactory; typedef CTaskConsumer::TTaskIdGenerator TTaskIdGenerator; + typedef CTEventHandlerFunctor< CTaskManager > TEventCallback; TTaskIdGenerator m_taskIdGenerator; TAbstractTaskConsumerFactory m_consumerFactory; diff --git a/platform/gucefCORE/include/gucefCORE_CThreadPool.h b/platform/gucefCORE/include/gucefCORE_CThreadPool.h index ccf0f1c1b..3d2b8b349 100644 --- a/platform/gucefCORE/include/gucefCORE_CThreadPool.h +++ b/platform/gucefCORE/include/gucefCORE_CThreadPool.h @@ -113,6 +113,8 @@ class GUCEF_CORE_PUBLIC_CPP CThreadPool : public CTSGNotifier , { public: + static const CString ClassTypeName; + static const CEvent ThreadKilledEvent; static const CEvent ThreadStartedEvent; static const CEvent ThreadPausedEvent; diff --git a/platform/gucefCORE/src/dvcppstringutils.cpp b/platform/gucefCORE/src/dvcppstringutils.cpp index 269c09105..b0e4dbc47 100644 --- a/platform/gucefCORE/src/dvcppstringutils.cpp +++ b/platform/gucefCORE/src/dvcppstringutils.cpp @@ -375,14 +375,15 @@ Base64Encode( const void* byteBuffer, UInt32 bufferSize ) CString result; UInt32 base64StrLength = ( ( bufferSize + 2 ) / 3 ) * 4; - char* str = result.Reserve( base64StrLength+1, (Int32) base64StrLength ); + char* base64StringBuffer = result.Reserve( base64StrLength+1, (Int32) base64StrLength ); + char* str = base64StringBuffer; // Base64 encoding can really add up wrt size requirements // We need to sanity check that we were able to allocate enough memory if ( GUCEF_NULL == str || base64StrLength+1 != result.ByteSize() ) return CString::Empty; - memset( str, '=', base64StrLength ); + memset( base64StringBuffer, '=', base64StrLength ); const char* charByteBuffer = (const char*) byteBuffer; int i = 0; @@ -433,7 +434,7 @@ Base64Encode( const void* byteBuffer, UInt32 bufferSize ) } - str[ base64StrLength ] = '\0'; + base64StringBuffer[ base64StrLength ] = '\0'; result.DetermineLength(); return result; diff --git a/platform/gucefCORE/src/gucefCORE_CTaskManager.cpp b/platform/gucefCORE/src/gucefCORE_CTaskManager.cpp index cf0a0b283..4bd822fc4 100644 --- a/platform/gucefCORE/src/gucefCORE_CTaskManager.cpp +++ b/platform/gucefCORE/src/gucefCORE_CTaskManager.cpp @@ -97,6 +97,7 @@ namespace CORE { const CEvent CTaskManager::ThreadPoolCreatedEvent = "GUCEF::CORE::CTaskManager::ThreadPoolCreatedEvent"; const CEvent CTaskManager::ThreadPoolDestructionEvent = "GUCEF::CORE::CTaskManager::ThreadPoolDestructionEvent"; +const CEvent CTaskManager::ThreadPoolUnregisteredEvent = "GUCEF::CORE::CTaskManager::ThreadPoolUnregisteredEvent"; const CEvent CTaskManager::GlobalTaskConsumerFactoryRegisteredEvent = "GUCEF::CORE::CTaskManager::GlobalTaskConsumerFactoryRegisteredEvent"; const CEvent CTaskManager::GlobalTaskConsumerFactoryUnregisteredEvent = "GUCEF::CORE::CTaskManager::GlobalTaskConsumerFactoryUnregisteredEvent"; const CEvent CTaskManager::GlobalTaskDataFactoryRegisteredEvent = "GUCEF::CORE::CTaskManager::GlobalTaskDataFactoryRegisteredEvent"; @@ -117,6 +118,7 @@ CTaskManager::RegisterEvents( void ) ThreadPoolCreatedEvent.Initialize(); ThreadPoolDestructionEvent.Initialize(); + ThreadPoolUnregisteredEvent.Initialize(); GlobalTaskConsumerFactoryRegisteredEvent.Initialize(); GlobalTaskConsumerFactoryUnregisteredEvent.Initialize(); GlobalTaskDataFactoryRegisteredEvent.Initialize(); @@ -138,7 +140,9 @@ CTaskManager::CTaskManager( void ) ThreadPoolPtr defaultPool = ( GUCEF_NEW CThreadPool( CORE::CCoreGlobal::Instance()->GetPulseGenerator(), DefaultThreadPoolName ) )->CreateSharedPtr(); m_threadPools[ DefaultThreadPoolName ] = defaultPool; - SubscribeTo( defaultPool.GetPointerAlways() ); + RegisterEventHandlers(); + RegisterThreadPoolEventHandlers( defaultPool.GetPointerAlways() ); + ThreadPoolCreatedEventData eData( DefaultThreadPoolName ); NotifyObserversFromThread( ThreadPoolCreatedEvent, &eData ); } @@ -165,22 +169,126 @@ CTaskManager::GetClassTypeName( void ) const /*-------------------------------------------------------------------------*/ +void +CTaskManager::RegisterEventHandlers( void ) +{GUCEF_TRACE; + + CGUCEFApplication& app = CORE::CCoreGlobal::Instance()->GetApplication(); + + TEventCallback callback( this, &CTaskManager::OnAppShutdown ); + SubscribeTo( &app, CGUCEFApplication::AppShutdownEvent, callback ); +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::OnAppShutdown( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ) +{GUCEF_TRACE; + + Shutdown(); +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::Shutdown( void ) +{GUCEF_TRACE; + + RequestAllThreadsToStop( true, false ); + UnregisterAllTaskDataFactories(); + UnregisterAllTaskConsumerFactories(); + UnregisterAllThreadPools(); +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::RegisterThreadPoolEventHandlers( CThreadPool* threadPool ) +{GUCEF_TRACE; + + TEventCallback callback( this, &CTaskManager::OnThreadPoolThreadStarted ); + SubscribeTo( threadPool, CThreadPool::ThreadStartedEvent, callback ); + + TEventCallback callback2( this, &CTaskManager::OnThreadPoolThreadKilled ); + SubscribeTo( threadPool, CThreadPool::ThreadKilledEvent, callback2 ); + + TEventCallback callback3( this, &CTaskManager::OnThreadPoolThreadFinished ); + SubscribeTo( threadPool, CThreadPool::ThreadFinishedEvent, callback3 ); + + TEventCallback callback4( this, &CTaskManager::OnThreadPoolDestruction ); + SubscribeTo( threadPool, CNotifier::DestructionEvent, callback4 ); + +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::OnThreadPoolThreadStarted( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ) +{GUCEF_TRACE; + + MT::CObjectScopeLock lock( this, GUCEF_MT_LONG_LOCK_TIMEOUT ); + ++m_activeGlobalNrOfThreads; +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::OnThreadPoolThreadKilled( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ) +{GUCEF_TRACE; + + MT::CObjectScopeLock lock( this, GUCEF_MT_LONG_LOCK_TIMEOUT ); + --m_activeGlobalNrOfThreads; +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::OnThreadPoolThreadFinished( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ) +{GUCEF_TRACE; + + MT::CObjectScopeLock lock( this, GUCEF_MT_LONG_LOCK_TIMEOUT ); + --m_activeGlobalNrOfThreads; +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::OnThreadPoolDestruction( CNotifier* notifier , + const CEvent& eventId , + CICloneable* eventData ) +{GUCEF_TRACE; + + if ( CThreadPool::ClassTypeName == notifier->GetClassTypeName() ) + { + CThreadPool* threadPool = static_cast< CThreadPool* >( notifier ); + CString threadPoolName = threadPool->GetThreadPoolName(); + + // The threadpool should already be unregistered at this point but just in case lets make double sure + // no-op if it was already not registered + UnregisterThreadPool( threadPoolName ); + + // Now what we are mainly after: Notify observers of the threadpool's demise by name + ThreadPoolDestructionEventData eventData( threadPoolName ); + NotifyObserversFromThread( ThreadPoolDestructionEvent, &eventData ); + } +} + +/*-------------------------------------------------------------------------*/ + void CTaskManager::OnPumpedNotify( CNotifier* notifier , const CEvent& eventId , CICloneable* eventData ) {GUCEF_TRACE; - if ( CThreadPool::ThreadStartedEvent == eventId ) - { - ++m_activeGlobalNrOfThreads; - } - else - if ( CThreadPool::ThreadKilledEvent == eventId || - CThreadPool::ThreadFinishedEvent == eventId ) - { - --m_activeGlobalNrOfThreads; - } } /*-------------------------------------------------------------------------*/ @@ -219,10 +327,10 @@ CTaskManager::GetOrCreateThreadPool( const CString& threadPoolName , { ThreadPoolPtr newPool = ( GUCEF_NEW CThreadPool( threadPoolPulseContext, threadPoolName ) )->CreateSharedPtr(); m_threadPools[ threadPoolName ] = newPool; - lock.EarlyUnlock(); - SubscribeTo( newPool.GetPointerAlways() ); + RegisterThreadPoolEventHandlers( newPool.GetPointerAlways() ); + ThreadPoolCreatedEventData eData( threadPoolName ); NotifyObserversFromThread( ThreadPoolCreatedEvent, &eData ); return newPool; @@ -254,6 +362,11 @@ CTaskManager::UnregisterThreadPool( const CString& threadPoolName ) { m_threadPools.erase( i ); GUCEF_SYSTEM_LOG( LOGLEVEL_NORMAL, "TaskManager:UnregisterThreadPool: Thread pool with name \"" + threadPoolName + "\" has been unregistered" ); + + lock.EarlyUnlock(); + + ThreadPoolUnregisteredEventData eventData( threadPoolName ); + NotifyObserversFromThread( ThreadPoolUnregisteredEvent, &eventData ); } return true; @@ -270,6 +383,26 @@ CTaskManager::UnregisterThreadPool( ThreadPoolPtr threadPool ) return UnregisterThreadPool( threadPool->GetThreadPoolName() ); } + +/*-------------------------------------------------------------------------*/ + +void +CTaskManager::UnregisterAllThreadPools( void ) +{GUCEF_TRACE; + + // We dont bulk unregister because we want to provide proper notifications per entry + + CString::StringSet poolNames; + GetAllThreadPoolNames( poolNames ); + + CString::StringSet::iterator i = poolNames.begin(); + while ( i != poolNames.end() ) + { + UnregisterThreadPool( (*i) ); + ++i; + } +} + /*-------------------------------------------------------------------------*/ TTaskStatus @@ -559,6 +692,25 @@ CTaskManager::UnregisterTaskConsumerFactory( const CString& taskType ) /*-------------------------------------------------------------------------*/ +void +CTaskManager::UnregisterAllTaskConsumerFactories( void ) +{GUCEF_TRACE; + + // We dont bulk unregister because we want to provide proper notifications + + CString::StringSet taskTypes; + m_consumerFactory.ObtainKeySet( taskTypes ); + + CString::StringSet::iterator i = taskTypes.begin(); + while ( i != taskTypes.end() ) + { + UnregisterTaskConsumerFactory( (*i) ); + ++i; + } +} + +/*-------------------------------------------------------------------------*/ + void CTaskManager::RegisterTaskDataFactory( const CString& taskType , TTaskDataFactory* factory ) @@ -584,6 +736,25 @@ CTaskManager::UnregisterTaskDataFactory( const CString& taskType ) /*-------------------------------------------------------------------------*/ +void +CTaskManager::UnregisterAllTaskDataFactories( void ) +{GUCEF_TRACE; + + // We dont bulk unregister because we want to provide proper notifications + + CString::StringSet taskTypes; + m_taskDataFactory.ObtainKeySet( taskTypes ); + + CString::StringSet::iterator i = taskTypes.begin(); + while ( i != taskTypes.end() ) + { + UnregisterTaskDataFactory( (*i) ); + ++i; + } +} + +/*-------------------------------------------------------------------------*/ + void CTaskManager::RegisterTaskConsumerId( CTaskConsumer::TTaskId& taskId ) {GUCEF_TRACE; diff --git a/platform/gucefCORE/src/gucefCORE_CThreadPool.cpp b/platform/gucefCORE/src/gucefCORE_CThreadPool.cpp index f0d1dfb0d..5d4c270b6 100644 --- a/platform/gucefCORE/src/gucefCORE_CThreadPool.cpp +++ b/platform/gucefCORE/src/gucefCORE_CThreadPool.cpp @@ -90,6 +90,8 @@ namespace CORE { // // //-------------------------------------------------------------------------*/ +const CString CThreadPool::ClassTypeName = "GUCEF::CORE::CThreadPool"; + const CEvent CThreadPool::ThreadKilledEvent = "GUCEF::CORE::CThreadPool::ThreadKilledEvent"; const CEvent CThreadPool::ThreadStartedEvent = "GUCEF::CORE::CThreadPool::ThreadStartedEvent"; const CEvent CThreadPool::ThreadPausedEvent = "GUCEF::CORE::CThreadPool::ThreadPausedEvent"; @@ -258,8 +260,7 @@ const CString& CThreadPool::GetClassTypeName( void ) const {GUCEF_TRACE; - static const CString typeName = "GUCEF::CORE::CThreadPool"; - return typeName; + return ClassTypeName; } /*-------------------------------------------------------------------------*/ diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSub2PubSub.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSub2PubSub.cpp index f9ab386a9..630d8acb7 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSub2PubSub.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSub2PubSub.cpp @@ -1636,6 +1636,7 @@ PubSub2PubSub::OnAppShutdown( CORE::CNotifier* notifier , // Since we are shutting down the app gracefully close the C&C API now m_httpServer.Close(); + m_httpRouter.RemoveAllResourceMappings(); // Now get rid of all the channels we created based on the settings m_channels.clear(); diff --git a/platform/gucefWEB/include/gucefWEB_CTaskManagerServerResource.h b/platform/gucefWEB/include/gucefWEB_CTaskManagerServerResource.h index 64ec936a3..5a75bba8d 100644 --- a/platform/gucefWEB/include/gucefWEB_CTaskManagerServerResource.h +++ b/platform/gucefWEB/include/gucefWEB_CTaskManagerServerResource.h @@ -145,6 +145,10 @@ class GUCEF_WEB_PUBLIC_CPP CTaskManagerServerResource : public CORE::CTSGNotifie const CORE::CEvent& eventId , CORE::CICloneable* eventData = GUCEF_NULL ); + virtual void OnThreadPoolUnregistered( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData = GUCEF_NULL ); + virtual void OnGlobalTaskConsumerFactoryRegistered( CORE::CNotifier* notifier , const CORE::CEvent& eventId , CORE::CICloneable* eventData = GUCEF_NULL ); diff --git a/platform/gucefWEB/include/gucefWEB_CWebGlobal.h b/platform/gucefWEB/include/gucefWEB_CWebGlobal.h index d6c55732b..14d7ecb29 100644 --- a/platform/gucefWEB/include/gucefWEB_CWebGlobal.h +++ b/platform/gucefWEB/include/gucefWEB_CWebGlobal.h @@ -64,6 +64,7 @@ class GUCEF_WEB_PUBLIC_CPP CWebGlobal CGlobalHttpCodecLinks& GetGlobalHttpCodecLinks( void ) const; private: + friend class CModule; static void Deinstance( void ); diff --git a/platform/gucefWEB/src/gucefWEB_CModule.cpp b/platform/gucefWEB/src/gucefWEB_CModule.cpp index 360ae38ec..3a7ae251b 100644 --- a/platform/gucefWEB/src/gucefWEB_CModule.cpp +++ b/platform/gucefWEB/src/gucefWEB_CModule.cpp @@ -23,6 +23,11 @@ // // //-------------------------------------------------------------------------*/ +#ifndef GUCEF_WEB_CWEBGLOBAL_H +#include "gucefWEB_CWebGlobal.h" +#define GUCEF_WEB_CWEBGLOBAL_H +#endif /* GUCEF_WEB_CWEBGLOBAL_H ? */ + #include "gucefWEB_CModule.h" /*-------------------------------------------------------------------------// @@ -43,6 +48,7 @@ namespace WEB { bool CModule::Load( void ) { + CWebGlobal::Instance(); return true; } @@ -51,6 +57,7 @@ CModule::Load( void ) bool CModule::Unload( void ) { + CWebGlobal::Deinstance(); return true; } diff --git a/platform/gucefWEB/src/gucefWEB_CTaskManagerServerResource.cpp b/platform/gucefWEB/src/gucefWEB_CTaskManagerServerResource.cpp index afdaa8cff..4b7614b21 100644 --- a/platform/gucefWEB/src/gucefWEB_CTaskManagerServerResource.cpp +++ b/platform/gucefWEB/src/gucefWEB_CTaskManagerServerResource.cpp @@ -194,18 +194,21 @@ CTaskManagerServerResource::RegisterEventHandlers( void ) TEventCallback callback2( this, &CTaskManagerServerResource::OnThreadPoolDestruction ); SubscribeTo( &taskManager, CORE::CTaskManager::ThreadPoolDestructionEvent, callback2 ); + + TEventCallback callback3( this, &CTaskManagerServerResource::OnThreadPoolUnregistered ); + SubscribeTo( &taskManager, CORE::CTaskManager::ThreadPoolUnregisteredEvent, callback3 ); - TEventCallback callback3( this, &CTaskManagerServerResource::OnGlobalTaskConsumerFactoryRegistered ); - SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskConsumerFactoryRegisteredEvent, callback3 ); + TEventCallback callback4( this, &CTaskManagerServerResource::OnGlobalTaskConsumerFactoryRegistered ); + SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskConsumerFactoryRegisteredEvent, callback4 ); - TEventCallback callback4( this, &CTaskManagerServerResource::OnGlobalTaskConsumerFactoryUnregistered ); - SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskConsumerFactoryUnregisteredEvent, callback4 ); + TEventCallback callback5( this, &CTaskManagerServerResource::OnGlobalTaskConsumerFactoryUnregistered ); + SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskConsumerFactoryUnregisteredEvent, callback5 ); - TEventCallback callback5( this, &CTaskManagerServerResource::OnGlobalTaskDataFactoryRegistered ); - SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskDataFactoryRegisteredEvent, callback5 ); + TEventCallback callback6( this, &CTaskManagerServerResource::OnGlobalTaskDataFactoryRegistered ); + SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskDataFactoryRegisteredEvent, callback6 ); - TEventCallback callback6( this, &CTaskManagerServerResource::OnGlobalTaskDataFactoryUnregistered ); - SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskDataFactoryUnregisteredEvent, callback6 ); + TEventCallback callback7( this, &CTaskManagerServerResource::OnGlobalTaskDataFactoryUnregistered ); + SubscribeTo( &taskManager, CORE::CTaskManager::GlobalTaskDataFactoryUnregisteredEvent, callback7 ); } /*-------------------------------------------------------------------------*/ @@ -376,7 +379,30 @@ CTaskManagerServerResource::OnThreadPoolDestruction( CORE::CNotifier* notifier if ( GUCEF_NULL != m_router ) { - const CORE::CString& poolName = static_cast< CORE::CTaskManager::ThreadPoolCreatedEventData* >( eventData )->GetData(); + const CORE::CString& poolName = static_cast< CORE::CTaskManager::ThreadPoolDestructionEventData* >( eventData )->GetData(); + + m_router->RemoveResourceMapping( m_rootPath + "threadpools/" + poolName ); + m_threadPoolMetaDataMap.erase( poolName ); + m_threadPoolInfoMap.erase( poolName ); + } + } +} + +/*-------------------------------------------------------------------------*/ + +void +CTaskManagerServerResource::OnThreadPoolUnregistered( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ) +{GUCEF_TRACE; + + if ( GUCEF_NULL != m_router ) + { + MT::CScopeWriterLock writeLock( m_rwLock, GUCEF_MT_LONG_LOCK_TIMEOUT ); + + if ( GUCEF_NULL != m_router ) + { + const CORE::CString& poolName = static_cast< CORE::CTaskManager::ThreadPoolUnregisteredEventData* >( eventData )->GetData(); m_router->RemoveResourceMapping( m_rootPath + "threadpools/" + poolName ); m_threadPoolMetaDataMap.erase( poolName ); diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp index 92376fb0e..6e29aefce 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp @@ -832,16 +832,31 @@ CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds if ( notify && !msgs.empty() ) for ( size_t i=batchStartN; iGetPubSubClientFactory().RegisterConcreteFactory( CKafkaPubSubClient::TypeName, &g_kafkaClusterPubSubClientFactory ); GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "Load finished for PUBSUB plugin KAFKA" ); @@ -94,8 +107,17 @@ GUCEFPlugin_Unload( void ) GUCEF_PLUGIN_CALLSPEC_SUFFIX PUBSUB::CPubSubGlobal::Instance()->GetPubSubClientFactory().UnregisterConcreteFactory( CKafkaPubSubClient::TypeName ); - RdKafka::wait_destroyed( 5000 ); + // RdKafka does not expose the global init function + // Without knowing if the factory was used to create an instance we also dont know if we can safely call the shutdown function + // as such we create a dummy instance to force an init of the library indirectly + if ( GUCEF_NULL != g_dummyProducerToInitLibrary ) + { + delete g_dummyProducerToInitLibrary; + g_dummyProducerToInitLibrary = GUCEF_NULL; + RdKafka::wait_destroyed( 5000 ); + } + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "Unload finished for PUBSUB plugin KAFKA" ); } diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp index 08d0e013b..ccc4a5aef 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp @@ -438,7 +438,7 @@ CKafkaPubSubClient::AutoCreateMultiTopicAccess( const TTopicConfigPtrToStringSet if ( newTopicAccessCount > 0 ) { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created " + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created " + CORE::ToString( newTopicAccessCount ) + " topics based on template configs" ); TopicsAccessAutoCreatedEventData eData( topicAccess ); @@ -806,7 +806,7 @@ CKafkaPubSubClient::Disconnect( void ) if ( IsConnected() ) { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Disconnect: Beginning topic disconnect" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Disconnect: Beginning topic disconnect" ); bool totalSuccess = true; TTopicMap::iterator i = m_topicMap.begin(); @@ -816,7 +816,7 @@ CKafkaPubSubClient::Disconnect( void ) ++i; } - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Disconnect: Finished topic disconnect" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Disconnect: Finished topic disconnect" ); return totalSuccess; } @@ -844,7 +844,7 @@ CKafkaPubSubClient::Connect( bool reset ) MT::CScopeMutex lock( m_lock ); - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Connect: Beginning topic connect" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient(" + CORE::PointerToString( this ) + "):Connect: Beginning topic connect" ); bool totalSuccess = true; TTopicMap::iterator i = m_topicMap.begin();