diff --git a/dependencies/librdkafka/src/rdkafka.c b/dependencies/librdkafka/src/rdkafka.c index 99d9c1744..eaebd56f8 100644 --- a/dependencies/librdkafka/src/rdkafka.c +++ b/dependencies/librdkafka/src/rdkafka.c @@ -185,7 +185,7 @@ static void rd_kafka_global_srand(void) { * @returns the current number of active librdkafka instances */ static int rd_kafka_global_cnt_get(void) { - int r; + int r = 0; mtx_lock(&rd_kafka_global_lock); r = rd_kafka_global_cnt; mtx_unlock(&rd_kafka_global_lock); diff --git a/dependencies/librdkafka/src/tinycthread.c b/dependencies/librdkafka/src/tinycthread.c index b0ec8e956..db27a269b 100644 --- a/dependencies/librdkafka/src/tinycthread.c +++ b/dependencies/librdkafka/src/tinycthread.c @@ -97,10 +97,12 @@ void mtx_destroy(mtx_t *mtx) if (!mtx->mTimed) { DeleteCriticalSection(&(mtx->mHandle.cs)); + memset( &(mtx->mHandle.cs), 0, sizeof mtx->mHandle.cs ); } else { CloseHandle(mtx->mHandle.mut); + mtx->mHandle.mut = NULL; } #else pthread_mutex_destroy(mtx); diff --git a/platform/gucefCORE/include/dvcppstringutils.h b/platform/gucefCORE/include/dvcppstringutils.h index e396a19b2..4ba65765b 100644 --- a/platform/gucefCORE/include/dvcppstringutils.h +++ b/platform/gucefCORE/include/dvcppstringutils.h @@ -490,6 +490,7 @@ inline CString ToString( const CUtf8String::StringVector& el ) { CUtf8String out inline CString ToString( const CAsciiString::StringVector& el ) { CAsciiString out; return CUtf8String( out.Combine( el, ',' ) ); } inline CString ToString( const CUtf8String::StringSet& el ) { CUtf8String out; return out.Combine( el, ',' ); } inline CString ToString( const CAsciiString::StringSet& el ) { CAsciiString out; return CUtf8String( out.Combine( el, ',' ) ); } +inline CString ToString( const CUtf8String::StringMap& el ) { CUtf8String out; return out.Combine( el, '=', ',' ); } #endif inline CString ToString( UInt8 value ) { return UInt8ToString( value ); } diff --git a/platform/gucefCORE/include/gucefCORE_CAsciiString.h b/platform/gucefCORE/include/gucefCORE_CAsciiString.h index 58e92259c..dec1421f7 100644 --- a/platform/gucefCORE/include/gucefCORE_CAsciiString.h +++ b/platform/gucefCORE/include/gucefCORE_CAsciiString.h @@ -143,6 +143,8 @@ class GUCEF_CORE_PUBLIC_CPP CAsciiString char operator[]( const UInt32 index ) const; + char& operator[]( const UInt32 index ); + bool IsNULLOrEmpty( void ) const; operator std::string() const; @@ -298,6 +300,8 @@ class GUCEF_CORE_PUBLIC_CPP CAsciiString CAsciiString RemoveChar( const char charToRemove ) const; + bool HasRepeatingChar( const char charToCheck ) const; + CAsciiString CompactRepeatingChar( const char charToCompact ) const; StringVector ParseElements( char seperator , diff --git a/platform/gucefCORE/include/gucefCORE_CTaskManager.h b/platform/gucefCORE/include/gucefCORE_CTaskManager.h index 5244c7756..a9fe3a5f8 100644 --- a/platform/gucefCORE/include/gucefCORE_CTaskManager.h +++ b/platform/gucefCORE/include/gucefCORE_CTaskManager.h @@ -117,6 +117,10 @@ class GUCEF_CORE_PUBLIC_CPP CTaskManager : public CTSGNotifier ThreadPoolPtr GetOrCreateThreadPool( const CString& threadPoolName , bool createIfNotExists = true ); + bool UnregisterThreadPool( const CString& threadPoolName ); + + bool UnregisterThreadPool( ThreadPoolPtr threadPool ); + /** * Queues a task for execution as soon as a thread is available * to execute it. diff --git a/platform/gucefCORE/include/gucefCORE_CUtf8String.h b/platform/gucefCORE/include/gucefCORE_CUtf8String.h index d4067147e..c810c65a9 100644 --- a/platform/gucefCORE/include/gucefCORE_CUtf8String.h +++ b/platform/gucefCORE/include/gucefCORE_CUtf8String.h @@ -499,6 +499,7 @@ class GUCEF_CORE_PUBLIC_CPP CUtf8String CUtf8String Combine( const StringVector& elements, Int32 seperator ) const; CUtf8String Combine( const StringSet& elements, Int32 seperator ) const; + CUtf8String Combine( const StringMap& elements, Int32 valueSeperator, Int32 kvSeperator ) const; void Clear( void ); diff --git a/platform/gucefCORE/include/gucefCORE_CVariant.h b/platform/gucefCORE/include/gucefCORE_CVariant.h index a175823ba..00cff7194 100644 --- a/platform/gucefCORE/include/gucefCORE_CVariant.h +++ b/platform/gucefCORE/include/gucefCORE_CVariant.h @@ -163,6 +163,7 @@ class GUCEF_CORE_PUBLIC_CPP CVariant bool IsInteger( void ) const; bool IsFloat( void ) const; + bool IsNumber( void ) const; bool IsString( void ) const; bool IsBoolean( void ) const; bool IsBinary( void ) const; diff --git a/platform/gucefCORE/src/CConfigStore.cpp b/platform/gucefCORE/src/CConfigStore.cpp index 5f1c6694e..19f35f94c 100644 --- a/platform/gucefCORE/src/CConfigStore.cpp +++ b/platform/gucefCORE/src/CConfigStore.cpp @@ -597,9 +597,10 @@ CConfigStore::ApplyConfigData( const CDataNode& loadedConfig , errorOccured = true; GUCEF_ERROR_LOG( LOGLEVEL_IMPORTANT, "ConfigStore:ApplyConfigData: Loading of config failed for a late-addition configureable with type name \"" + (*i)->GetClassTypeName() + "\"" ); } - m_configureables.insert( (*i) ); - m_newConfigureables.erase( (*i) ); } + + m_configureables.insert( (*i) ); + m_newConfigureables.erase( (*i) ); ++i; } } diff --git a/platform/gucefCORE/src/gucefCORE_CAsciiString.cpp b/platform/gucefCORE/src/gucefCORE_CAsciiString.cpp index 46e7c8332..b1b4a9bee 100644 --- a/platform/gucefCORE/src/gucefCORE_CAsciiString.cpp +++ b/platform/gucefCORE/src/gucefCORE_CAsciiString.cpp @@ -424,6 +424,20 @@ CAsciiString::operator[]( const UInt32 index ) const /*-------------------------------------------------------------------------*/ +char& +CAsciiString::operator[]( const UInt32 index ) +{GUCEF_TRACE; + + static char outOfBoundsChar = '\0'; + if ( index <= m_length ) + return m_string[ index ]; + + GUCEF_ASSERT_ALWAYS; + return outOfBoundsChar; +} + +/*-------------------------------------------------------------------------*/ + bool CAsciiString::operator<( const CAsciiString& other ) const {GUCEF_TRACE; @@ -1140,6 +1154,27 @@ CAsciiString::RemoveChar( const char charToRemove ) const /*-------------------------------------------------------------------------*/ +bool +CAsciiString::HasRepeatingChar( const char charToCheck ) const +{GUCEF_TRACE; + + for ( UInt32 i=0; iGetThreadPoolName() ); +} +/*-------------------------------------------------------------------------*/ + TTaskStatus CTaskManager::QueueTask( const CString& threadPoolName , const CString& taskType , diff --git a/platform/gucefCORE/src/gucefCORE_CUtf8String.cpp b/platform/gucefCORE/src/gucefCORE_CUtf8String.cpp index 4f40ab161..63feafe67 100644 --- a/platform/gucefCORE/src/gucefCORE_CUtf8String.cpp +++ b/platform/gucefCORE/src/gucefCORE_CUtf8String.cpp @@ -2582,6 +2582,95 @@ CUtf8String::Combine( const StringSet& elements, Int32 seperator ) const /*-------------------------------------------------------------------------*/ +CUtf8String +CUtf8String::Combine( const StringMap& elements, Int32 valueSeperator, Int32 kvSeperator ) const +{GUCEF_TRACE; + + if ( elements.empty() && 0 == m_length ) + return CUtf8String(); + + // Determine storage size needed + + size_t valueSepCpSize = utf8codepointsize( valueSeperator ); + size_t kvSepCpSize = utf8codepointsize( kvSeperator ); + size_t totalExtraCpSize = ( valueSepCpSize * elements.size() ) + ( kvSepCpSize * elements.size() ); + + UInt32 bufferSize = (UInt32) totalExtraCpSize; + if ( m_byteSize > 0 ) + bufferSize += m_byteSize + (UInt32) kvSepCpSize; + + StringMap::const_iterator i = elements.begin(); + while ( i != elements.end() ) + { + const CUtf8String& key = (*i).first; + const CUtf8String& value = (*i).second; + + UInt32 keySize = key.IsNULLOrEmpty() ? 0 : key.m_byteSize-1; + UInt32 valueSize = value.IsNULLOrEmpty() ? 0 : value.m_byteSize-1; + bufferSize += keySize + valueSize; + + ++i; + } + + // Allocate total storage + + CUtf8String comboStr; + if ( GUCEF_NULL == comboStr.Reserve( bufferSize ) ) + return CUtf8String::Empty; + + // Write contents + char* cpPos = comboStr.m_string; + size_t bufferSpaceLeft = bufferSize; + if ( GUCEF_NULL != m_string && 0 < m_length ) + { + memcpy( comboStr.m_string, m_string, m_byteSize ); + bufferSpaceLeft -= m_byteSize; + + cpPos = (char*) utf8catcodepoint( cpPos, kvSeperator, bufferSpaceLeft ); + bufferSpaceLeft -= kvSepCpSize; + } + else + { + comboStr.m_string[ 0 ] = '\0'; + } + i = elements.begin(); + while ( i != elements.end() ) + { + const CUtf8String& key = (*i).first; + const CUtf8String& value = (*i).second; + + UInt32 keySize = key.IsNULLOrEmpty() ? 0 : key.m_byteSize-1; + UInt32 valueSize = value.IsNULLOrEmpty() ? 0 : value.m_byteSize-1; + + memcpy( cpPos, key.m_string, keySize ); + cpPos += keySize; + bufferSpaceLeft -= keySize; + + cpPos = (char*) utf8catcodepoint( cpPos, valueSeperator, bufferSpaceLeft ); + bufferSpaceLeft -= valueSepCpSize; + + memcpy( cpPos, value.m_string, valueSize ); + cpPos += valueSize; + bufferSpaceLeft -= valueSize; + + ++i; + if ( i != elements.end() ) + { + cpPos = (char*) utf8catcodepoint( cpPos, kvSeperator, bufferSpaceLeft ); + bufferSpaceLeft -= kvSepCpSize; + } + + *cpPos = '\0'; + } + + // Determine new string length codepoint wise + comboStr.m_length = (UInt32) utf8len_s( comboStr.m_string, comboStr.m_byteSize ); + comboStr.m_string[ comboStr.m_byteSize-1 ] = '\0'; + return comboStr; +} + +/*-------------------------------------------------------------------------*/ + bool CUtf8String::IsFormattingValid( void ) const {GUCEF_TRACE; diff --git a/platform/gucefCORE/src/gucefCORE_CVariant.cpp b/platform/gucefCORE/src/gucefCORE_CVariant.cpp index 97885e233..22cc37358 100644 --- a/platform/gucefCORE/src/gucefCORE_CVariant.cpp +++ b/platform/gucefCORE/src/gucefCORE_CVariant.cpp @@ -432,6 +432,16 @@ CVariant::IsFloat( void ) const /*-------------------------------------------------------------------------*/ +bool +CVariant::IsNumber( void ) const +{GUCEF_TRACE; + + return IsInteger() || IsFloat() || + m_variantData.containedType == GUCEF_DATATYPE_NUMERIC; +} + +/*-------------------------------------------------------------------------*/ + bool CVariant::IsString( void ) const {GUCEF_TRACE; diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouterConfig.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouterConfig.cpp index 7e46d7133..4e31ffa02 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouterConfig.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubFlowRouterConfig.cpp @@ -223,15 +223,27 @@ CPubSubFlowRouterConfig::LoadConfig( const CORE::CDataNode& cfg ) CPubSubFlowRouteConfigPtr route = CPubSubFlowRouteConfig::CreateSharedObj(); if ( route->LoadConfig( *routeNode ) ) { - routes.push_back( route ); + if ( route->fromSideId.IsNULLOrEmpty() && + route->toSideId.IsNULLOrEmpty() && + route->spilloverBufferSideId.IsNULLOrEmpty() && + route->deadLetterSideId.IsNULLOrEmpty() && + route->failoverSideId.IsNULLOrEmpty() ) + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::ToString( this ) + + "):LoadConfig: route node has no side ids specified and is thus invalid. It will be ignored" ); + } + else + { + routes.push_back( route ); + } } ++i; } } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::PointerToString( this ) + - "):LoadConfig: Missing 'routes' config is malformed" ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubFlowRouterConfig(" + CORE::ToString( this ) + + "):LoadConfig: Missing 'routes' thus the config is malformed" ); return false; } diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h index 6959e8536..c13c3dc6b 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClient.h @@ -75,6 +75,8 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB static const CORE::CString TypeName; + typedef std::map< CAwsSqsPubSubClientTopicConfigPtr , CORE::CString::StringSet > TTopicConfigPtrToStringSetMap; + CAwsSqsPubSubClient( const PUBSUB::CPubSubClientConfig& config ); virtual ~CAwsSqsPubSubClient() GUCEF_VIRTUAL_OVERRIDE; @@ -92,6 +94,27 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB virtual PUBSUB::CPubSubClientTopicBasicPtr GetTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; + virtual bool GetMultiTopicAccess( const CORE::CString& topicName , + PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; + + virtual bool GetMultiTopicAccess( const CORE::CString::StringSet& topicNames , + PubSubClientTopicSet& topicAccess ) GUCEF_VIRTUAL_OVERRIDE; + + virtual bool CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator = CORE::PulseGeneratorPtr() ) GUCEF_VIRTUAL_OVERRIDE; + + bool AutoCreateMultiTopicAccess( CAwsSqsPubSubClientTopicConfigPtr templateTopicConfig , + const CORE::CString::StringSet& topicNameList , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ); + + bool AutoCreateMultiTopicAccess( const TTopicConfigPtrToStringSetMap& topicsToCreate , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ); + + void AutoDestroyTopicAccess( const CORE::CString::StringSet& topicNames ); + virtual void DestroyTopicAccess( const CORE::CString& topicName ) GUCEF_VIRTUAL_OVERRIDE; virtual bool BeginTopicDiscovery( const CORE::CString::StringSet& globPatternFilters = CORE::CString::EmptyStringSet ) GUCEF_VIRTUAL_OVERRIDE; @@ -148,6 +171,32 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB PUBSUB::CPubSubClientConfig& GetConfig( void ); Aws::SQS::SQSClient& GetSqsClient( void ); + + /** + * Helper function to extract the queue name from a queue url + */ + static std::string GetQueueNameFromUrl( const std::string& queueUrl ); + + Aws::String GetSqsQueueUrlForQueueName( const CORE::CString& queueName , + Aws::SQS::SQSError& errorCode ); + + bool IsQueueEmpty( const std::string& queueUrl ); + + bool TryGetNrOfMessagesInQueue( const std::string& queueUrl , + CORE::Int64& messageCount ); + + bool TryGetQueueAttributes( const std::string& queueUrl , + CORE::CString::StringMap& attributes ); + + CAwsSqsPubSubClientTopicConfigPtr FindTemplateConfigForTopicName( const CORE::CString& topicName ) const; + + bool GetAllGlobPatternTopicNames( CORE::CString::StringSet& allGlobPatternTopicNames ); + + bool ListAllQueues( const CORE::CString::StringSet& globPatternFilters , + CORE::CString::StringSet& queueNames ); + + bool ListAllQueues( const CORE::CString& globPatternFilter , + CORE::CString::StringSet& queueNames ); private: @@ -156,6 +205,11 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB const CORE::CEvent& eventId , CORE::CICloneable* eventData ); + void + OnQueueIndexingTimerCycle( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ); + void OnTopicHealthStatusChange( CORE::CNotifier* notifier , const CORE::CEvent& eventId , @@ -174,6 +228,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClient : public PUBSUB PUBSUB::CPubSubClientConfig m_config; CORE::CTimer* m_metricsTimer; + CORE::CTimer* m_queueIndexingTimer; TTopicMap m_topicMap; Aws::SQS::SQSClient m_sqsClient; }; diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h index 842cb5a8c..dce557e55 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h @@ -112,6 +112,19 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P virtual void Shutdown( void ); + class TopicMetrics + { + public: + + TopicMetrics( void ); + + CORE::UInt32 sqsMessagesTransmitted; + CORE::UInt32 sqsMessagesReceived; + CORE::UInt32 sqsMessagesInQueue; + CORE::UInt32 sqsMessagesFiltered; + CORE::UInt32 sqsErrorReplies; + }; + void OnMetricsTimerCycle( CORE::CNotifier* notifier , const CORE::CEvent& eventId , @@ -126,6 +139,8 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P virtual const CORE::CString& GetClassTypeName( void ) const GUCEF_VIRTUAL_OVERRIDE; + bool IsQueueEmpty( void ); + protected: virtual MT::TLockStatus Lock( CORE::UInt32 lockWaitTimeoutInMs = GUCEF_MT_DEFAULT_LOCK_TIMEOUT_IN_MS ) const GUCEF_VIRTUAL_OVERRIDE; @@ -142,7 +157,18 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P template< class T > bool - TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMsg* msg, CORE::UInt32& msgByteSize ); + AddAttributesToSqsMsg( T& sqsMsg , + const PUBSUB::CIPubSubMsg::TKeyValuePairs& kvPairs , + bool addPrefix , + const CORE::CAsciiString& prefixToAdd ); + + template< class T > + bool + TranslateToSqsMsg( T& sqsMsg , + const PUBSUB::CIPubSubMsg* msg , + CORE::UInt32& msgByteSize ); + + bool ApplySqsMessageAttributeNameContraints( CORE::CAsciiString& candidateName ); private: @@ -173,7 +199,8 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P /*-------------------------------------------------------------------------*/ -typedef CAwsSqsPubSubClientTopic::TSharedPtrType CAwsSqsPubSubClientTopicPtr; +typedef CAwsSqsPubSubClientTopic::TSharedPtrType CAwsSqsPubSubClientTopicPtr; +typedef CAwsSqsPubSubClientTopic::TBasicSharedPtrType CAwsSqsPubSubClientTopicBasicPtr; /*-------------------------------------------------------------------------// // // diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.h b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.h index 8b06c8a73..84bc6d329 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.h +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.h @@ -83,6 +83,10 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopicConfig : pu bool topicNameIsQueueName; bool tryToUseSendMessageBatch; + bool addPrefixWhenSendingKvPairs; + CORE::CAsciiString kvPairPrefixToAddOnSend; + bool addPrefixWhenSendingMetaDataKvPairs; + CORE::CAsciiString metaDatakvPairPrefixToAddOnSend; }; /*-------------------------------------------------------------------------*/ diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp index df9c57edc..cef2e5c40 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp @@ -51,6 +51,13 @@ #include "pubsubpluginAWSSQS_CAwsSqsPubSubClient.h" +#include +#include +#include +#include +#include +#include + /*-------------------------------------------------------------------------// // // // NAMESPACE // @@ -79,6 +86,7 @@ CAwsSqsPubSubClient::CAwsSqsPubSubClient( const PUBSUB::CPubSubClientConfig& con : PUBSUB::CPubSubClient( config.pulseGenerator ) , m_config( config ) , m_metricsTimer( GUCEF_NULL ) + , m_queueIndexingTimer( GUCEF_NULL ) , m_topicMap() , m_sqsClient( PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->GetDefaultAwsClientConfig() ) {GUCEF_TRACE; @@ -88,11 +96,26 @@ CAwsSqsPubSubClient::CAwsSqsPubSubClient( const PUBSUB::CPubSubClientConfig& con GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "AwsSqsPubSubClient: Failed to load config at construction" ); } + ConfigureJournal( m_config ); + + if ( m_config.pulseGenerator.IsNULL() ) + { + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient: No pulseGenerator provided, will fall back to global one" ); + m_config.pulseGenerator = CORE::CCoreGlobal::Instance()->GetPulseGenerator(); + } + + if ( m_config.topicPulseGenerator.IsNULL() ) + m_config.topicPulseGenerator = m_config.pulseGenerator; + if ( config.desiredFeatures.supportsMetrics ) { - m_metricsTimer = GUCEF_NEW CORE::CTimer( config.pulseGenerator, 1000 ); + m_metricsTimer = GUCEF_NEW CORE::CTimer( m_config.pulseGenerator, 1000 ); m_metricsTimer->SetEnabled( config.desiredFeatures.supportsMetrics ); } + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames ) + { // @TODO: interval + m_queueIndexingTimer = GUCEF_NEW CORE::CTimer( m_config.pulseGenerator, 100000 ); + } m_config.metricsPrefix += "sqs."; @@ -107,6 +130,9 @@ CAwsSqsPubSubClient::~CAwsSqsPubSubClient() GUCEF_DELETE m_metricsTimer; m_metricsTimer = GUCEF_NULL; + GUCEF_DELETE m_queueIndexingTimer; + m_queueIndexingTimer = GUCEF_NULL; + TTopicMap::iterator i = m_topicMap.begin(); while ( i != m_topicMap.end() ) { @@ -197,8 +223,8 @@ CAwsSqsPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& featur features.supportsMsgIndexBasedBookmark = false; // Since SQS is a queue where you consume the messages: You cannot provide a msg index to resume from a given point features.supportsTopicIndexBasedBookmark = false; // Since SQS is a queue where you consume the messages: You cannot provide a msg index to resume from a given point features.supportsMsgDateTimeBasedBookmark = false; // Since SQS is a queue where you consume the messages: You cannot provide a datetime to resume from a given point in time - features.supportsDiscoveryOfAvailableTopics = false; // @TODO: need to look into this - features.supportsGlobPatternTopicNames = false; + features.supportsDiscoveryOfAvailableTopics = true; // All accessable SQS queues can be discovered + features.supportsGlobPatternTopicNames = true; // We support pattern matching based topic creation features.supportsSubscriptionMsgArrivalDelayRequests = false; features.supportsSubscriptionEndOfDataEvent = false; return true; @@ -211,30 +237,49 @@ CAwsSqsPubSubClient::CreateTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topi CORE::PulseGeneratorPtr pulseGenerator ) {GUCEF_TRACE; - CAwsSqsPubSubClientTopicPtr topicAccess; + // Check to see if this logical/conceptual 'topic' represents multiple pattern matched Redis Streams + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicConfig->topicName.HasChar( '*' ) > -1 ) { - MT::CObjectScopeLock lock( this ); - - topicAccess = ( GUCEF_NEW CAwsSqsPubSubClientTopic( this ) )->CreateSharedPtr(); - if ( topicAccess->LoadConfig( *topicConfig ) ) + PubSubClientTopicSet allTopicAccess; + if ( CreateMultiTopicAccess( topicConfig, allTopicAccess, pulseGenerator ) && !allTopicAccess.empty() ) { - m_topicMap[ topicConfig->topicName ] = topicAccess; - RegisterTopicEventHandlers( topicAccess ); - } - else - { - topicAccess->Shutdown(); - topicAccess.Unlink(); + // Caller should really use the CreateMultiTopicAccess() variant + PUBSUB::CPubSubClientTopicBasicPtr tAccess = *allTopicAccess.begin(); + return tAccess; } + return PUBSUB::CPubSubClientTopicPtr(); } - - if ( !topicAccess.IsNULL() ) + else { - TopicAccessCreatedEventData eData( topicConfig->topicName ); - NotifyObservers( TopicAccessCreatedEvent, &eData ); - } + CAwsSqsPubSubClientTopicPtr topicAccess; + { + MT::CObjectScopeLock lock( this ); - return topicAccess; + topicAccess = ( GUCEF_NEW CAwsSqsPubSubClientTopic( this ) )->CreateSharedPtr(); + if ( topicAccess->LoadConfig( *topicConfig ) ) + { + m_topicMap[ topicConfig->topicName ] = topicAccess; + + ConfigureJournal( topicAccess, topicConfig ); + PUBSUB::CIPubSubJournalBasicPtr journal = topicAccess->GetJournal(); + if ( !journal.IsNULL() && topicConfig->journalConfig.useJournal ) + journal->AddTopicCreatedJournalEntry(); + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::PointerToString( this ) + "):CreateTopicAccess: created topic access for topic \"" + topicConfig->topicName + "\"" ); + + lock.EarlyUnlock(); + + TopicAccessCreatedEventData eData( topicConfig->topicName ); + NotifyObservers( TopicAccessCreatedEvent, &eData ); + } + else + { + topicAccess.Unlink(); + } + } + return topicAccess; + } } /*-------------------------------------------------------------------------*/ @@ -271,23 +316,268 @@ CAwsSqsPubSubClient::GetAllCreatedTopicAccess( PubSubClientTopicSet& topicAccess /*-------------------------------------------------------------------------*/ +bool +CAwsSqsPubSubClient::GetMultiTopicAccess( const CORE::CString& topicName , + PubSubClientTopicSet& topicAccess ) +{GUCEF_TRACE; + + // Check to see if this logical/conceptual 'topic' name represents multiple pattern matched Redis Streams + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicName.HasChar( '*' ) > -1 ) + { + // We create the actual topic objects from the wildcard glob pattern topic which is used + // as a template. As such we need to match the pattern again to find the various topics that could have been spawned + bool matchesFound = false; + TTopicMap::iterator i = m_topicMap.begin(); + while ( i != m_topicMap.end() ) + { + if ( (*i).first.WildcardEquals( topicName, '*', true ) ) + { + topicAccess.insert( (*i).second ); + matchesFound = true; + } + ++i; + } + return matchesFound; + } + else + { + TTopicMap::iterator i = m_topicMap.find( topicName ); + if ( i != m_topicMap.end() ) + { + topicAccess.insert( (*i).second ); + return true; + } + return false; + } +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::GetMultiTopicAccess( const CORE::CString::StringSet& topicNames , + PubSubClientTopicSet& topicAccess ) +{GUCEF_TRACE; + + bool totalSuccess = true; + CORE::CString::StringSet::const_iterator i = topicNames.begin(); + while ( i != topicNames.end() ) + { + totalSuccess = GetMultiTopicAccess( (*i), topicAccess ) && totalSuccess; + ++i; + } + return totalSuccess; +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::AutoCreateMultiTopicAccess( const TTopicConfigPtrToStringSetMap& topicsToCreate , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + CORE::UInt32 newTopicAccessCount = 0; + bool totalSuccess = true; + { + MT::CObjectScopeLock lock( this ); + + TTopicConfigPtrToStringSetMap::const_iterator m = topicsToCreate.begin(); + while ( m != topicsToCreate.end() ) + { + PUBSUB::CPubSubClientTopicConfigPtr templateTopicConfig( ((*m).first) ); + if ( !templateTopicConfig.IsNULL() ) + { + const CORE::CString::StringSet& topicNameList = (*m).second; + + CORE::CString::StringSet::const_iterator i = topicNameList.begin(); + while ( i != topicNameList.end() ) + { + CAwsSqsPubSubClientTopicConfigPtr topicConfig = CAwsSqsPubSubClientTopicConfig::CreateSharedObj(); + topicConfig->LoadConfig( *templateTopicConfig.GetPointerAlways() ); + topicConfig->topicName = (*i); + + CAwsSqsPubSubClientTopicPtr tAccess; + { + MT::CObjectScopeLock lock( this ); + + tAccess = ( GUCEF_NEW CAwsSqsPubSubClientTopic( this ) )->CreateSharedPtr(); + if ( tAccess->LoadConfig( *topicConfig ) ) + { + m_topicMap[ topicConfig->topicName ] = tAccess; + topicAccess.insert( tAccess ); + m_config.topics.push_back( topicConfig ); + ++newTopicAccessCount; + + ConfigureJournal( tAccess, topicConfig ); + PUBSUB::CIPubSubJournalBasicPtr journal = tAccess->GetJournal(); + if ( !journal.IsNULL() && topicConfig->journalConfig.useJournal ) + journal->AddTopicCreatedJournalEntry(); + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created topic \"" + + topicConfig->topicName + "\" based on template config \"" + templateTopicConfig->topicName + "\"" ); + } + else + { + tAccess.Unlink(); + totalSuccess = false; + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Failed to load config for topic \"" + + topicConfig->topicName + "\" based on template config \"" + templateTopicConfig->topicName + "\"" ); + } + } + ++i; + } + } + ++m; + } + } + + if ( newTopicAccessCount > 0 ) + { + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::PointerToString( this ) + "):AutoCreateMultiTopicAccess: Auto created " + + CORE::ToString( newTopicAccessCount ) + " topics based on template configs" ); + + TopicsAccessAutoCreatedEventData eData( topicAccess ); + NotifyObservers( TopicsAccessAutoCreatedEvent, &eData ); + } + + return totalSuccess; +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::AutoCreateMultiTopicAccess( CAwsSqsPubSubClientTopicConfigPtr templateTopicConfig , + const CORE::CString::StringSet& topicNameList , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + TTopicConfigPtrToStringSetMap topicToCreate; + topicToCreate[ templateTopicConfig ] = topicNameList; + return AutoCreateMultiTopicAccess( topicToCreate, topicAccess, pulseGenerator ); +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::CreateMultiTopicAccess( PUBSUB::CPubSubClientTopicConfigPtr topicConfig , + PubSubClientTopicSet& topicAccess , + CORE::PulseGeneratorPtr pulseGenerator ) +{GUCEF_TRACE; + + if ( m_config.desiredFeatures.supportsGlobPatternTopicNames && + topicConfig->topicName.HasChar( '*' ) > -1 ) + { + CORE::CString::StringSet topicNameList; + if ( ListAllQueues( topicConfig->topicName , + topicNameList ) ) + { + if ( GUCEF_NULL != m_queueIndexingTimer ) + m_queueIndexingTimer->SetEnabled( true ); + + if ( !topicNameList.empty() ) + return AutoCreateMultiTopicAccess( topicConfig, topicNameList, topicAccess, pulseGenerator ); + return true; // Since its pattern based potential creation at a later time also counts as success + } + return false; + } + else + { + PUBSUB::CPubSubClientTopicBasicPtr tAccess = CreateTopicAccess( topicConfig, pulseGenerator ); + if ( !tAccess.IsNULL() ) + { + topicAccess.insert( tAccess ); + return true; + } + } + return false; +} + +/*-------------------------------------------------------------------------*/ + void CAwsSqsPubSubClient::DestroyTopicAccess( const CORE::CString& topicName ) {GUCEF_TRACE; - MT::CObjectScopeLock lock( this ); - - TTopicMap::iterator i = m_topicMap.find( topicName ); - if ( i != m_topicMap.end() ) { - CAwsSqsPubSubClientTopicPtr topicAccess = (*i).second; - m_topicMap.erase( i ); + MT::CObjectScopeLock lock( this ); - TopicAccessDestroyedEventData eData( topicName ); - NotifyObservers( TopicAccessDestroyedEvent, &eData ); - - topicAccess->Shutdown(); - topicAccess.Unlink(); + TTopicMap::iterator i = m_topicMap.find( topicName ); + if ( i != m_topicMap.end() ) + { + CAwsSqsPubSubClientTopicPtr topicAccess = (*i).second; + m_topicMap.erase( i ); + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::ToString( this ) + "):DestroyTopicAccess: destroyed topic access for topic \"" + topicName + "\"" ); + + topicAccess->Shutdown(); + topicAccess.Unlink(); + } + } + + TopicAccessDestroyedEventData eData( topicName ); + NotifyObservers( TopicAccessDestroyedEvent, &eData ); +} + +/*-------------------------------------------------------------------------*/ + +void +CAwsSqsPubSubClient::AutoDestroyTopicAccess( const CORE::CString::StringSet& topicNames ) +{GUCEF_TRACE; + + PubSubClientTopicSet topicAccess; + { + MT::CObjectScopeLock lock( this ); + + CORE::CString::StringSet::const_iterator t = topicNames.begin(); + while ( t != topicNames.end() ) + { + const CORE::CString& topicName = (*t); + TTopicMap::iterator i = m_topicMap.find( topicName ); + if ( i != m_topicMap.end() ) + { + CAwsSqsPubSubClientTopicPtr tAccess = (*i).second; + topicAccess.insert( tAccess ); + } + ++t; + } + } + + if ( !topicAccess.empty() ) + { + TopicsAccessAutoDestroyedEventData eData( topicAccess ); + NotifyObservers( TopicsAccessAutoDestroyedEvent, &eData ); + + { + // Now that everyone has been duely warned we can proceed with the actual destruction + MT::CObjectScopeLock lock( this ); + + CORE::UInt32 destroyedTopicAccessCount = 0; + PubSubClientTopicSet::iterator t = topicAccess.begin(); + while ( t != topicAccess.end() ) + { + PUBSUB::CPubSubClientTopicBasicPtr tAccess = (*t); + + CORE::CString topicName = tAccess->GetTopicName(); + m_topicMap.erase( topicName ); + { + CAwsSqsPubSubClientTopicBasicPtr topicAccess = tAccess.StaticCast< CAwsSqsPubSubClientTopic >(); + topicAccess->Shutdown(); + topicAccess.Unlink(); + } + tAccess.Unlink(); + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::ToString( this ) + "):AutoDestroyTopicAccess: destroyed topic access for topic \"" + topicName + "\"" ); + + ++destroyedTopicAccessCount; + ++t; + } + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::ToString( this ) + "):AutoDestroyTopicAccess: destroyed topic access for " + + CORE::ToString( destroyedTopicAccessCount ) + "topics" ); + } } } @@ -343,10 +633,134 @@ CAwsSqsPubSubClient::GetOrCreateTopicConfig( const CORE::CString& topicName ) /*-------------------------------------------------------------------------*/ +Aws::String +CAwsSqsPubSubClient::GetSqsQueueUrlForQueueName( const CORE::CString& queueName , + Aws::SQS::SQSError& errorCode ) +{GUCEF_TRACE; + + try + { + Aws::SQS::Model::GetQueueUrlRequest gqu_req; + gqu_req.SetQueueName( queueName ); + + Aws::SQS::Model::GetQueueUrlOutcome gqu_out = m_sqsClient.GetQueueUrl( gqu_req ); + if ( gqu_out.IsSuccess() ) + { + const Aws::String& queueUrl = gqu_out.GetResult().GetQueueUrl(); + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient:GetSqsQueueUrlForQueueName: Resolved queue name \"" + queueName = "\" to URL: " + queueUrl ); + return queueUrl; + } + else + { + errorCode = gqu_out.GetError(); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient:GetSqsQueueUrlForQueueName: Error getting URL for queue name \"" + queueName = "\":" + gqu_out.GetError().GetMessage() ); + } + } + catch ( const std::exception& e ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, CORE::CString( "AwsSqsPubSubClient:GetSqsQueueUrlForQueueName: experienced an exception: " ) + e.what() ); + } + catch ( ... ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, "AwsSqsPubSubClient:GetSqsQueueUrlForQueueName: experienced an unknown exception, your application may be unstable" ); + } + + return Aws::String(); +} + +/*-------------------------------------------------------------------------*/ + +std::string +CAwsSqsPubSubClient::GetQueueNameFromUrl( const std::string& queueUrl ) +{GUCEF_TRACE; + + // Find the last slash in the URL + size_t pos = queueUrl.find_last_of('/'); + if (pos != std::string::npos) { + // Extract and return the queue name + return queueUrl.substr(pos + 1); + } + return ""; +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::ListAllQueues( const CORE::CString::StringSet& globPatternFilters , + CORE::CString::StringSet& queueNames ) +{GUCEF_TRACE; + + try + { + Aws::SQS::Model::ListQueuesRequest listQueuesRequest; + Aws::SQS::Model::ListQueuesOutcome listQueuesOutcome = m_sqsClient.ListQueues( listQueuesRequest ); + + if ( listQueuesOutcome.IsSuccess() ) + { + const Aws::Vector< Aws::String >& queueUrls = listQueuesOutcome.GetResult().GetQueueUrls(); + Aws::Vector< Aws::String >::const_iterator i = queueUrls.begin(); + while ( i != queueUrls.end() ) + { + const Aws::String& url = (*i); + CORE::CString queueName = GetQueueNameFromUrl( url ); + + if ( !globPatternFilters.empty() ) + { + if ( queueName.WildcardEquals( globPatternFilters ) ) + queueNames.insert( queueName ); + } + else + { + queueNames.insert( queueName ); + } + ++i; + } + return true; + } + else + { + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "AwsSqsPubSubClient:ListAllQueues: Error listing queues: " + + listQueuesOutcome.GetError().GetMessage() ); + return false; + } + } + catch ( const std::exception& e ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "AwsSqsPubSubClient:ListAllQueues: Exception occured: " + CORE::CString( e.what() ) ); + return false; + } +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::ListAllQueues( const CORE::CString& globPatternFilter , + CORE::CString::StringSet& queueNames ) +{GUCEF_TRACE; + + CORE::CString::StringSet filters; + filters.insert( globPatternFilter ); + + return ListAllQueues( filters, queueNames ); +} + +/*-------------------------------------------------------------------------*/ + bool CAwsSqsPubSubClient::BeginTopicDiscovery( const CORE::CString::StringSet& globPatternFilters ) {GUCEF_TRACE; + TopicDiscoveryEventData topicNames; + if ( ListAllQueues( globPatternFilters, topicNames ) && !topicNames.empty() ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "AwsSqsPubSubClient:BeginTopicDiscovery: obtained list of " + + CORE::ToString( topicNames.size() ) + " queue names to be used as topic names" ); + + NotifyObservers( TopicDiscoveryEvent, &topicNames ); + return true; + } return false; } @@ -512,6 +926,13 @@ CAwsSqsPubSubClient::RegisterEventHandlers( void ) CORE::CTimer::TimerUpdateEvent , callback ); } + if ( GUCEF_NULL != m_queueIndexingTimer ) + { + TEventCallback callback( this, &CAwsSqsPubSubClient::OnQueueIndexingTimerCycle ); + SubscribeTo( m_queueIndexingTimer , + CORE::CTimer::TimerUpdateEvent , + callback ); + } } /*-------------------------------------------------------------------------*/ @@ -531,6 +952,240 @@ CAwsSqsPubSubClient::RegisterTopicEventHandlers( CAwsSqsPubSubClientTopicPtr& to /*-------------------------------------------------------------------------*/ +CAwsSqsPubSubClientTopicConfigPtr +CAwsSqsPubSubClient::FindTemplateConfigForTopicName( const CORE::CString& topicName ) const +{GUCEF_TRACE; + + MT::CObjectScopeLock lock( this ); + + PUBSUB::CPubSubClientConfig::TPubSubClientTopicConfigPtrVector::const_iterator i = m_config.topics.begin(); + while ( i != m_config.topics.end() ) + { + if ( (*i)->topicName.HasChar( '*' ) > -1 ) + { + if ( topicName.WildcardEquals( (*i)->topicName, '*', true ) ) + { + return (*i); + } + } + ++i; + } + return CAwsSqsPubSubClientTopicConfigPtr(); +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::GetAllGlobPatternTopicNames( CORE::CString::StringSet& allGlobPatternTopicNames ) +{GUCEF_TRACE; + + // Check config'd topic + PUBSUB::CPubSubClientConfig::TPubSubClientTopicConfigPtrVector::iterator i = m_config.topics.begin(); + while ( i != m_config.topics.end() ) + { + if ( (*i)->topicName.HasChar( '*' ) > -1 ) + { + allGlobPatternTopicNames.insert( (*i)->topicName ); + } + ++i; + } + + return true; +} + +/*-------------------------------------------------------------------------*/ + +void +CAwsSqsPubSubClient::OnQueueIndexingTimerCycle( CORE::CNotifier* notifier , + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ) +{GUCEF_TRACE; + + GUCEF_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "AwsSqsPubSubClient(" + CORE::ToString( this ) + "):OnQueueIndexingTimerCycle: Checking for new queues matching glob pattterns" ); + + // First get a new list of all glob patterns we need to match + CORE::CString::StringSet allGlobPatternTopicNames; + if( !GetAllGlobPatternTopicNames( allGlobPatternTopicNames ) ) + return; + + // Next we will need to fetch all stream names in the cluster + CORE::CString::StringSet allMatchingQueueNames; + if ( ListAllQueues( allGlobPatternTopicNames , + allMatchingQueueNames ) ) + { + // Check created topics, filtering the ones we already created + TTopicMap::iterator t = m_topicMap.begin(); + while ( t != m_topicMap.end() ) + { + CORE::CString::StringSet::iterator i = allMatchingQueueNames.find( (*t).first ); + if ( i != allMatchingQueueNames.end() ) + { + allMatchingQueueNames.erase( i ); + } + ++t; + } + + // Now we automatically create topic access for all the remaining topics + if ( !allMatchingQueueNames.empty() ) + { + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient(" + CORE::ToString( this ) + "):OnQueueIndexingTimerCycle: Found " + + CORE::ToString( allMatchingQueueNames.size() ) + " new queues that match glob patterns. Will attempt to auto create topics for these" ); + } + + CORE::CString::StringSet::iterator n = allMatchingQueueNames.begin(); + while ( n != allMatchingQueueNames.end() ) + { + CAwsSqsPubSubClientTopicConfigPtr templateConfig = FindTemplateConfigForTopicName( (*n) ); + if ( !templateConfig.IsNULL() ) + { + CORE::CString::StringSet topicNameList; + topicNameList.insert( (*n) ); + + PubSubClientTopicSet topicAccess; + + AutoCreateMultiTopicAccess( templateConfig, topicNameList, topicAccess, m_config.topicPulseGenerator ); + } + ++n; + } + } +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::IsQueueEmpty( const std::string& queueUrl ) +{GUCEF_TRACE; + + CORE::Int64 messageCount = -1; + TryGetNrOfMessagesInQueue( queueUrl, messageCount ); + return 0 == messageCount; +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::TryGetNrOfMessagesInQueue( const std::string& queueUrl , + CORE::Int64& messageCount ) +{GUCEF_TRACE; + + try + { + // Create a request to get queue attributes + Aws::SQS::Model::GetQueueAttributesRequest getQueueAttributesRequest; + getQueueAttributesRequest.SetQueueUrl( queueUrl ); + getQueueAttributesRequest.AddAttributeNames( Aws::SQS::Model::QueueAttributeName::ApproximateNumberOfMessages ); + + // Call GetQueueAttributes + Aws::SQS::Model::GetQueueAttributesOutcome getQueueAttributesOutcome = m_sqsClient.GetQueueAttributes( getQueueAttributesRequest ); + + if ( getQueueAttributesOutcome.IsSuccess() ) + { + const Aws::Map& attributes = getQueueAttributesOutcome.GetResult().GetAttributes(); + auto it = attributes.find( Aws::SQS::Model::QueueAttributeName::ApproximateNumberOfMessages ); + if ( it != attributes.end() ) + { + messageCount = CORE::StringToInt64( it->second, messageCount ); + return true; + } + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient:TryGetNrOfMessagesInQueue: Error getting queue attributes: " + + getQueueAttributesOutcome.GetError().GetMessage() ); + } + return false; + } + catch ( const std::exception& e ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TryGetNrOfMessagesInQueue: exception occured: " + CORE::CString( e.what() ) ); + return false; + } +} + +/*-------------------------------------------------------------------------*/ + +bool +CAwsSqsPubSubClient::TryGetQueueAttributes( const std::string& queueUrl , + CORE::CString::StringMap& attributes ) +{GUCEF_TRACE; + + try + { + // Create a request to get queue attributes + Aws::SQS::Model::GetQueueAttributesRequest getQueueAttributesRequest; + getQueueAttributesRequest.SetQueueUrl( queueUrl ); + getQueueAttributesRequest.AddAttributeNames( Aws::SQS::Model::QueueAttributeName::All ); + + // Call GetQueueAttributes + Aws::SQS::Model::GetQueueAttributesOutcome getQueueAttributesOutcome = m_sqsClient.GetQueueAttributes( getQueueAttributesRequest ); + + if ( getQueueAttributesOutcome.IsSuccess() ) + { + const Aws::Map& awsAttributes = getQueueAttributesOutcome.GetResult().GetAttributes(); + + Aws::Map::const_iterator i = awsAttributes.begin(); + while ( i != awsAttributes.end() ) + { + Aws::SQS::Model::QueueAttributeName enumName = (*i).first; + const Aws::String& value = (*i).second; + + switch ( enumName ) + { + case Aws::SQS::Model::QueueAttributeName::NOT_SET : attributes[ "NOT_SET" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::All : attributes[ "All" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::Policy : attributes[ "Policy" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::VisibilityTimeout : attributes[ "VisibilityTimeout" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::MaximumMessageSize : attributes[ "MaximumMessageSize" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::MessageRetentionPeriod : attributes[ "MessageRetentionPeriod" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ApproximateNumberOfMessages : attributes[ "ApproximateNumberOfMessages" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ApproximateNumberOfMessagesNotVisible : attributes[ "ApproximateNumberOfMessagesNotVisible" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::CreatedTimestamp : attributes[ "CreatedTimestamp" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::LastModifiedTimestamp : attributes[ "LastModifiedTimestamp" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::QueueArn : attributes[ "QueueArn" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ApproximateNumberOfMessagesDelayed : attributes[ "ApproximateNumberOfMessagesDelayed" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::DelaySeconds : attributes[ "DelaySeconds" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ReceiveMessageWaitTimeSeconds : attributes[ "ReceiveMessageWaitTimeSeconds" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::RedrivePolicy : attributes[ "RedrivePolicy" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::FifoQueue : attributes[ "FifoQueue" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ContentBasedDeduplication : attributes[ "ContentBasedDeduplication" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::KmsMasterKeyId : attributes[ "KmsMasterKeyId" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::KmsDataKeyReusePeriodSeconds : attributes[ "KmsDataKeyReusePeriodSeconds" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::DeduplicationScope : attributes[ "DeduplicationScope" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::FifoThroughputLimit : attributes[ "FifoThroughputLimit" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::RedriveAllowPolicy : attributes[ "RedriveAllowPolicy" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::SqsManagedSseEnabled : attributes[ "SqsManagedSseEnabled" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::SentTimestamp : attributes[ "SentTimestamp" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ApproximateFirstReceiveTimestamp : attributes[ "ApproximateFirstReceiveTimestamp" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::ApproximateReceiveCount : attributes[ "ApproximateReceiveCount" ] = value; break; + case Aws::SQS::Model::QueueAttributeName::SenderId : attributes[ "SenderId" ] = value; break; + + default: + { + attributes[ CORE::ToString( static_cast< CORE::Int64 >( enumName ) ) ] = value; + break; + } + } + + ++i; + } + return true; + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClient:TryGetQueueAttributes: Error getting queue attributes: " + + getQueueAttributesOutcome.GetError().GetMessage() ); + } + return false; + } + catch ( const std::exception& e ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TryGetQueueAttributes: exception occured: " + CORE::CString( e.what() ) ); + return false; + } +} + +/*-------------------------------------------------------------------------*/ + void CAwsSqsPubSubClient::OnTopicHealthStatusChange( CORE::CNotifier* notifier , const CORE::CEvent& eventId , diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp index 05e488d13..1795d5aa8 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp @@ -153,10 +153,13 @@ void CAwsSqsPubSubClientTopic::RegisterEventHandlers( void ) {GUCEF_TRACE; - TEventCallback callback( this, &CAwsSqsPubSubClientTopic::OnPulseCycle ); - SubscribeTo( m_client->GetConfig().pulseGenerator.GetPointerAlways() , - CORE::CPulseGenerator::PulseEvent , - callback ); + if ( GUCEF_NULL != m_client ) + { + TEventCallback callback( this, &CAwsSqsPubSubClientTopic::OnPulseCycle ); + SubscribeTo( m_client->GetConfig().pulseGenerator.GetPointerAlways() , + CORE::CPulseGenerator::PulseEvent , + callback ); + } } /*-------------------------------------------------------------------------*/ @@ -232,6 +235,9 @@ CAwsSqsPubSubClientTopic::Publish( CORE::UInt64& publishActionId, const PUBSUB:: bool success = false; MT::CScopeMutex lock( m_lock ); + if ( GUCEF_NULL == m_client ) + return false; + if ( 0 == publishActionId ) { publishActionId = m_currentPublishActionId; @@ -321,47 +327,82 @@ CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CPubSubBookmark& boo /*-------------------------------------------------------------------------*/ -template< class T > bool -CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMsg* msg, CORE::UInt32& msgByteSize ) +CAwsSqsPubSubClientTopic::ApplySqsMessageAttributeNameContraints( CORE::CAsciiString& candidateName ) {GUCEF_TRACE; - if ( GUCEF_NULL == msg ) - { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: NULL Message passed" ); - return false; - } + // From the AWS documentation: + // Name – The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore (_), hyphen (-), and period (.). The following restrictions apply: + // - Can be up to 256 characters long + // - Can't start with AWS. or Amazon. (or any casing variations) + // - Is case-sensitive + // - Must be unique among all attribute names for the message + // - Must not start or end with a period + // - Must not have periods in a sequence + + bool wasAdjusted = false; - const CORE::CVariant& bodyPayload = msg->GetPrimaryPayload(); - if ( bodyPayload.IsInitialized() ) + if ( !candidateName.IsNULLOrEmpty() ) { - // A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: - // #x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF - // The minimum size is one character. The maximum size is 256 KB. - - // We request the payload as a string. Note that this auto converts - // Binary is Base64 encoded. For SQS strings are Unicode with UTF-8 binary encoding - CORE::CUtf8String bodyPayloadStr = bodyPayload.AsUtf8String(); - - if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= SQSCLIENT_MAX_PAYLOAD_SIZE ) + // Take care of this requirement: 'Can't start with AWS. or Amazon.' + CORE::CAsciiString lcTestString = candidateName.Lowercase(); + if ( 0 == lcTestString.HasSubstr( "aws" ) ) { - sqsMsg.SetMessageBody( bodyPayloadStr ); + candidateName = candidateName.CutChars( 3, true ); + wasAdjusted = true; } else + if ( 0 == lcTestString.HasSubstr( "amazon" ) ) { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: Message body size as string has an invalid size. Must be between 1-256KB. Cannot translate" ); - return false; + candidateName = candidateName.CutChars( 6, true ); + wasAdjusted = true; + } + + if ( !candidateName.IsNULLOrEmpty() ) + { + // Take care of this requirement: 'Must not start or end with a period' + if ( '.' == candidateName[ 0 ] ) + { + candidateName[ 0 ] = '_'; + wasAdjusted = true; + } + if ( '.' == candidateName[ candidateName.Length()-1 ] ) + { + candidateName[ candidateName.Length()-1 ] = '_'; + wasAdjusted = true; + } + + // Take care of this requirement: 'Must not have periods in a sequence' + if ( candidateName.HasRepeatingChar( '.' ) ) + { + candidateName = candidateName.CompactRepeatingChar( '.' ); + wasAdjusted = true; + } + + // Take care of this requirement: 'Can be up to 256 characters long' + if ( candidateName.Length() > 256 ) + { + candidateName = candidateName.CutChars( candidateName.Length() - 256, false ); + wasAdjusted = true; + } } - } - else - { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: Message does not have a body which SQS does not allow. Cannot publish" ); - return false; } - const PUBSUB::CIPubSubMsg::TKeyValuePairs& kvPairs = msg->GetKeyValuePairs(); + return wasAdjusted; +} + +/*-------------------------------------------------------------------------*/ + +template< class T > +bool +CAwsSqsPubSubClientTopic::AddAttributesToSqsMsg( T& sqsMsg , + const PUBSUB::CIPubSubMsg::TKeyValuePairs& kvPairs , + bool addPrefix , + const CORE::CAsciiString& prefixToAdd ) +{GUCEF_TRACE; + if ( !kvPairs.empty() ) - { + { typedef Aws::Map< Aws::String, Aws::SQS::Model::MessageAttributeValue > TSqsMsgAttributeMap; TSqsMsgAttributeMap attribs; @@ -376,24 +417,30 @@ CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMs { Aws::SQS::Model::MessageAttributeValue sqsAttribValue; - if ( kvValue.IsBinary() ) - { - sqsAttribValue.SetDataType( "Binary" ); - sqsAttribValue.SetBinaryValue( Aws::Utils::ByteBuffer( static_cast< const unsigned char* >( kvValue.AsVoidPtr() ), kvValue.ByteSize() ) ); - } - else if ( kvValue.IsString() ) { sqsAttribValue.SetDataType( "String" ); sqsAttribValue.SetStringValue( kvValue.AsUtf8String() ); } else + if ( kvValue.IsNumber() ) { sqsAttribValue.SetDataType( "StringValue" ); sqsAttribValue.SetStringValue( kvValue.AsUtf8String() ); } - - sqsMsg.AddMessageAttributes( kvKey.AsUtf8String(), sqsAttribValue ); // if there are dupicate keys last one wins + else // kvValue.IsBinary() ? catch-all + { + sqsAttribValue.SetDataType( "Binary" ); + sqsAttribValue.SetBinaryValue( Aws::Utils::ByteBuffer( static_cast< const unsigned char* >( kvValue.AsVoidPtr() ), kvValue.ByteSize() ) ); + } + + CORE::CAsciiString kvKeyName = kvKey.AsAsciiString(); + if ( addPrefix ) + { + kvKeyName = prefixToAdd + kvKeyName; + } + ApplySqsMessageAttributeNameContraints( kvKeyName ); + sqsMsg.AddMessageAttributes( kvKeyName, sqsAttribValue ); // if there are dupicate keys last one wins } else { @@ -408,13 +455,65 @@ CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMs /*-------------------------------------------------------------------------*/ +template< class T > +bool +CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMsg* msg, CORE::UInt32& msgByteSize ) +{GUCEF_TRACE; + + if ( GUCEF_NULL == msg ) + { + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: NULL Message passed" ); + return false; + } + + const CORE::CVariant& bodyPayload = msg->GetPrimaryPayload(); + if ( bodyPayload.IsInitialized() ) + { + // A message can include only XML, JSON, and unformatted text. The following Unicode characters are allowed: + // #x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF + // The minimum size is one character. The maximum size is 256 KB. + + // We request the payload as a string. Note that this auto converts + // Binary is Base64 encoded. For SQS strings are Unicode with UTF-8 binary encoding + CORE::CUtf8String bodyPayloadStr = bodyPayload.AsUtf8String(); + + if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= SQSCLIENT_MAX_PAYLOAD_SIZE ) + { + sqsMsg.SetMessageBody( bodyPayloadStr ); + } + else + { + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: Message body size as string has an invalid size. Must be between 1-256KB. Cannot translate" ); + return false; + } + } + else + { + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:TranslateToSqsMsg: Message does not have a body which SQS does not allow. Cannot publish" ); + return false; + } + + bool totalSuccess = true; + const PUBSUB::CIPubSubMsg::TKeyValuePairs& kvPairs = msg->GetKeyValuePairs(); + totalSuccess = AddAttributesToSqsMsg( sqsMsg, kvPairs, m_config.addPrefixWhenSendingKvPairs, m_config.kvPairPrefixToAddOnSend ) && totalSuccess; + const PUBSUB::CIPubSubMsg::TKeyValuePairs& metaKvPairs = msg->GetMetaDataKeyValuePairs(); + totalSuccess = AddAttributesToSqsMsg( sqsMsg, metaKvPairs, m_config.addPrefixWhenSendingMetaDataKvPairs, m_config.metaDatakvPairPrefixToAddOnSend ) && totalSuccess; + + return totalSuccess; +} + +/*-------------------------------------------------------------------------*/ + bool -CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds , +CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds , const PUBSUB::CIPubSubMsg::TIPubSubMsgConstRawPtrVector& msgs , - bool notify ) + bool notify ) {GUCEF_TRACE; MT::CScopeMutex lock( m_lock ); + + if ( GUCEF_NULL == m_client ) + return false; bool totalSuccess = true; try @@ -539,6 +638,17 @@ CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds /*-------------------------------------------------------------------------*/ +bool +CAwsSqsPubSubClientTopic::IsQueueEmpty( void ) +{GUCEF_TRACE; + + if ( GUCEF_NULL != m_client ) + m_client->IsQueueEmpty( m_queueUrl ); + return false; +} + +/*-------------------------------------------------------------------------*/ + void CAwsSqsPubSubClientTopic::OnPulseCycle( CORE::CNotifier* notifier , const CORE::CEvent& eventId , @@ -579,11 +689,6 @@ CAwsSqsPubSubClientTopic::LoadConfig( const PUBSUB::CPubSubClientTopicConfig& co m_config = config; - if ( m_config.topicNameIsQueueName ) - m_queueUrl = GetSqsQueueUrlForQueueName( m_config.topicName ); - else - m_queueUrl = m_config.topicName; - return true; } @@ -693,15 +798,50 @@ CAwsSqsPubSubClientTopic::InitializeConnectivity( bool reset ) MT::CScopeMutex lock( m_lock ); + if ( GUCEF_NULL != m_client ) + { + if ( m_config.topicNameIsQueueName ) + m_queueUrl = GetSqsQueueUrlForQueueName( m_config.topicName ); + else + m_queueUrl = m_config.topicName; + + if ( !m_queueUrl.empty() ) + { + CORE::CString::StringMap queueAttributes; + if ( m_client->TryGetQueueAttributes( m_queueUrl, queueAttributes ) ) + { + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:InitializeConnectivity: Retrieved attributes for queue \"" + CORE::ToString( m_queueUrl ) + + "\" which are as follows: " + CORE::ToString( queueAttributes ) ); + return true; + } + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:InitializeConnectivity: No queue URL could be determined thus cannot use SQS queue" ); + } + } + return false; } /*-------------------------------------------------------------------------*/ +CAwsSqsPubSubClientTopic::TopicMetrics::TopicMetrics( void ) + : sqsMessagesTransmitted( 0 ) + , sqsMessagesReceived( 0 ) + , sqsMessagesInQueue( 0 ) + , sqsMessagesFiltered( 0 ) + , sqsErrorReplies( 0 ) +{GUCEF_TRACE; + +} + +/*-------------------------------------------------------------------------*/ + void CAwsSqsPubSubClientTopic::OnMetricsTimerCycle( CORE::CNotifier* notifier , - const CORE::CEvent& eventId , - CORE::CICloneable* eventData ) + const CORE::CEvent& eventId , + CORE::CICloneable* eventData ) {GUCEF_TRACE; diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.cpp index 807db655a..46bbc6d18 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopicConfig.cpp @@ -50,6 +50,10 @@ CAwsSqsPubSubClientTopicConfig::CAwsSqsPubSubClientTopicConfig( void ) , CORE::CTSharedObjCreator< CAwsSqsPubSubClientTopicConfig, MT::CMutex >( this ) , topicNameIsQueueName( false ) , tryToUseSendMessageBatch( false ) + , addPrefixWhenSendingKvPairs( false ) + , kvPairPrefixToAddOnSend() + , addPrefixWhenSendingMetaDataKvPairs( false ) + , metaDatakvPairPrefixToAddOnSend() {GUCEF_TRACE; } @@ -61,6 +65,10 @@ CAwsSqsPubSubClientTopicConfig::CAwsSqsPubSubClientTopicConfig( const CAwsSqsPub , CORE::CTSharedObjCreator< CAwsSqsPubSubClientTopicConfig, MT::CMutex >( this ) , topicNameIsQueueName( src.topicNameIsQueueName ) , tryToUseSendMessageBatch( src.tryToUseSendMessageBatch ) + , addPrefixWhenSendingKvPairs( src.addPrefixWhenSendingKvPairs ) + , kvPairPrefixToAddOnSend( src.kvPairPrefixToAddOnSend ) + , addPrefixWhenSendingMetaDataKvPairs( src.addPrefixWhenSendingMetaDataKvPairs ) + , metaDatakvPairPrefixToAddOnSend( src.metaDatakvPairPrefixToAddOnSend ) {GUCEF_TRACE; } @@ -72,6 +80,10 @@ CAwsSqsPubSubClientTopicConfig::CAwsSqsPubSubClientTopicConfig( const PUBSUB::CP , CORE::CTSharedObjCreator< CAwsSqsPubSubClientTopicConfig, MT::CMutex >( this ) , topicNameIsQueueName( false ) , tryToUseSendMessageBatch( false ) + , addPrefixWhenSendingKvPairs( false ) + , kvPairPrefixToAddOnSend() + , addPrefixWhenSendingMetaDataKvPairs( false ) + , metaDatakvPairPrefixToAddOnSend() {GUCEF_TRACE; LoadCustomConfig( genericConfig.customConfig ); @@ -92,7 +104,10 @@ CAwsSqsPubSubClientTopicConfig::LoadCustomConfig( const CORE::CDataNode& config topicNameIsQueueName = config.GetAttributeValueOrChildValueByName( "topicNameIsQueueName" ).AsBool( topicNameIsQueueName, true ); tryToUseSendMessageBatch = config.GetAttributeValueOrChildValueByName( "tryToUseSendMessageBatch" ).AsBool( tryToUseSendMessageBatch, true ); - + addPrefixWhenSendingKvPairs = config.GetAttributeValueOrChildValueByName( "addPrefixWhenSendingKvPairs" ).AsBool( addPrefixWhenSendingKvPairs, true ); + kvPairPrefixToAddOnSend = config.GetAttributeValueOrChildValueByName( "kvPairPrefixToAddOnSend" ).AsAsciiString( kvPairPrefixToAddOnSend, true ); + addPrefixWhenSendingMetaDataKvPairs = config.GetAttributeValueOrChildValueByName( "addPrefixWhenSendingMetaDataKvPairs" ).AsBool( addPrefixWhenSendingMetaDataKvPairs, true ); + metaDatakvPairPrefixToAddOnSend = config.GetAttributeValueOrChildValueByName( "metaDatakvPairPrefixToAddOnSend" ).AsAsciiString( metaDatakvPairPrefixToAddOnSend, true ); return true; } @@ -135,6 +150,10 @@ CAwsSqsPubSubClientTopicConfig::operator=( const CAwsSqsPubSubClientTopicConfig& PUBSUB::CPubSubClientTopicConfig::operator=( src ); topicNameIsQueueName = src.topicNameIsQueueName; tryToUseSendMessageBatch = src.tryToUseSendMessageBatch; + addPrefixWhenSendingKvPairs = addPrefixWhenSendingKvPairs; + kvPairPrefixToAddOnSend = src.kvPairPrefixToAddOnSend; + addPrefixWhenSendingMetaDataKvPairs = src.addPrefixWhenSendingMetaDataKvPairs; + metaDatakvPairPrefixToAddOnSend = src.metaDatakvPairPrefixToAddOnSend; } return *this; } diff --git a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp index 41b348492..08d0e013b 100644 --- a/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginKAFKA/src/pubsubpluginKAFKA_CKafkaPubSubClient.cpp @@ -711,6 +711,7 @@ CKafkaPubSubClient::SetupBasedOnConfig( void ) // Use the global pulse generator. // NOT what you want if you want thread isolation or basically any time you are not // writing a single threaded app + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClient:SetupBasedOnConfig: No pulseGenerator provided, will fall back to global one" ); m_config.pulseGenerator = CORE::CCoreGlobal::Instance()->GetPulseGenerator(); } diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp index a33ff772c..6bd6e6e6f 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClient.cpp @@ -100,7 +100,10 @@ CRedisClusterPubSubClient::CRedisClusterPubSubClient( const PUBSUB::CPubSubClien ConfigureJournal( m_config ); if ( m_config.pulseGenerator.IsNULL() ) + { + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClient: No pulseGenerator provided, will fall back to global one" ); m_config.pulseGenerator = CORE::CCoreGlobal::Instance()->GetPulseGenerator(); + } if ( m_config.topicPulseGenerator.IsNULL() ) m_config.topicPulseGenerator = m_config.pulseGenerator; @@ -112,11 +115,11 @@ CRedisClusterPubSubClient::CRedisClusterPubSubClient( const PUBSUB::CPubSubClien } if ( m_config.desiredFeatures.supportsAutoReconnect ) { - m_redisReconnectTimer = GUCEF_NEW CORE::CTimer( config.pulseGenerator, config.reconnectDelayInMs ); + m_redisReconnectTimer = GUCEF_NEW CORE::CTimer( m_config.pulseGenerator, config.reconnectDelayInMs ); } if ( m_config.desiredFeatures.supportsGlobPatternTopicNames ) { // @TODO: interval - m_streamIndexingTimer = GUCEF_NEW CORE::CTimer( config.pulseGenerator, 100000 ); + m_streamIndexingTimer = GUCEF_NEW CORE::CTimer( m_config.pulseGenerator, 100000 ); } m_config.metricsPrefix += "redis."; @@ -141,7 +144,10 @@ CRedisClusterPubSubClient::~CRedisClusterPubSubClient() MT::CScopeMutex lock( m_lock ); if ( !m_threadPool.IsNULL() ) + { m_threadPool->RequestAllThreadsToStop( true, false ); + CORE::CCoreGlobal::Instance()->GetTaskManager().UnregisterThreadPool( m_threadPool ); + } GUCEF_DELETE m_metricsTimer; m_metricsTimer = GUCEF_NULL; @@ -1047,6 +1053,9 @@ CRedisClusterPubSubClient::Connect( bool reset ) sw::redis::ConnectionOptions rppConnectionOptions; rppConnectionOptions.host = m_config.remoteAddresses.front().GetHostname(); // Required. rppConnectionOptions.port = m_config.remoteAddresses.front().GetPortInHostByteOrder(); // Optional. The default port is 6379. + if ( 0 == rppConnectionOptions.port ) + rppConnectionOptions.port = 6379; + //rppConnectionOptions.password = "auth"; // Optional. No password by default. //rppConnectionOptions.db = 1; // Optional. Use the 0th database by default. diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2kafka.json b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2kafka.json index 4ae97bc35..247acd81a 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2kafka.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2kafka.json @@ -202,11 +202,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "msmq-source", - "toSide": "kafka-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "", + "fromSideId": "msmq-source", + "toSideId": "kafka-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "", "toSideTopicsAutoMatchFromSide": true, "failoverSideTopicsAutoMatchFromSide": true, "spilloverSideTopicsAutoMatchFromSide": true, diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2storage.json b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2storage.json index b7da5ee9b..43e782aab 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2storage.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2storage.json @@ -224,11 +224,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "msmq-source", - "toSide": "storage-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "msmq-source", + "toSideId": "storage-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2web.json b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2web.json index ebb006fdf..e5c20b828 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/msmq2web.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/msmq2web.json @@ -166,11 +166,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "msmq-source", - "toSide": "web-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "msmq-source", + "toSideId": "web-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/storage2kafka.json b/tools/pubsub2pubsub/config/examples/channel_templates/storage2kafka.json index e7152d746..b3dc370c8 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/storage2kafka.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/storage2kafka.json @@ -225,11 +225,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "storage-source", - "toSide": "kafka-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "storage-source", + "toSideId": "kafka-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/udp2kafka.json b/tools/pubsub2pubsub/config/examples/channel_templates/udp2kafka.json index b1c5cc504..0622533c3 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/udp2kafka.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/udp2kafka.json @@ -201,11 +201,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "udp-source", - "toSide": "kafka-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "udp-source", + "toSideId": "kafka-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json b/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json index 0907b8775..45f122902 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json @@ -189,11 +189,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "redis-source", - "toSide": "storage-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "redis-source", + "toSideId": "storage-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/udp2storage.json b/tools/pubsub2pubsub/config/examples/channel_templates/udp2storage.json index 521272473..b73847333 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/udp2storage.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/udp2storage.json @@ -232,11 +232,11 @@ "minSpilloverSideGoodHealthDurationBeforeActivationInMs": 5000, "routeSwitchingTimerIntervalInMs": 1000, "routes": [{ - "fromSide": "udp-source", - "toSide": "storage-sink", - "failoverSide": "", - "spilloverBufferSide": "", - "deadLetterSide": "" + "fromSideId": "udp-source", + "toSideId": "storage-sink", + "failoverSideId": "", + "spilloverBufferSideId": "", + "deadLetterSideId": "" }] } } \ No newline at end of file