From 5935f39fc33ddf02a9c443314ae54c46ff70fd1d Mon Sep 17 00:00:00 2001 From: Dinand Vanvelzen Date: Sun, 9 Jun 2024 13:11:53 -0500 Subject: [PATCH] pubsub sqs backend: some tweaks after initial test, still a wip --- ...subpluginAWSSQS_CAwsSqsPubSubClientTopic.h | 2 + ...pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp | 24 ++++-- ...bpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp | 74 ++++++++++++++++++- .../channel_templates/storage2sqs.json | 58 +++++++++++++-- 4 files changed, 138 insertions(+), 20 deletions(-) diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h index dce557e55..30c005e5f 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/include/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.h @@ -105,6 +105,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P virtual bool AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg ) GUCEF_VIRTUAL_OVERRIDE; virtual bool AcknowledgeReceipt( const PUBSUB::CPubSubBookmark& bookmark ) GUCEF_VIRTUAL_OVERRIDE; + virtual bool AcknowledgeReceipt( const CORE::CVariant& receipt ); virtual bool SaveConfig( PUBSUB::CPubSubClientTopicConfig& config ) const GUCEF_VIRTUAL_OVERRIDE; @@ -188,6 +189,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P CAwsSqsPubSubClientTopicConfig m_config; MT::CMutex m_lock; Aws::String m_queueUrl; + CORE::UInt32 m_sqsMaximumMessageSize; PUBSUB::CIPubSubMsg::TIPubSubMsgConstRawPtrVector m_publishBulkMsgRemapStorage; CORE::UInt64 m_currentPublishActionId; CORE::UInt64 m_currentReceiveActionId; diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp index cef2e5c40..c07f9b245 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClient.cpp @@ -205,14 +205,14 @@ CAwsSqsPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& featur features.supportsKeyValueSetPerMsg = true; // Supported in SQS using message attributes. "Each message can have up to 10 attributes. Message attributes are optional and separate from the message body (however, they are sent alongside it)" features.supportsMetaDataKeyValueSetPerMsg = true; // "Whereas you can use message attributes to attach custom metadata to Amazon SQS messages for your applications, you can use message system attributes to store metadata for other AWS services, such as AWS X-Ray" features.supportsDuplicateKeysPerMsg = false; // Since attributes are a map of keys to a value it mandates that every key is unique - features.supportsMultiHostSharding = true; // SQS is a managed service which under the coveres is shareded across hardware/nodes + features.supportsMultiHostSharding = true; // SQS is a managed service which under the coveres is shared across hardware/nodes features.supportsPublishing = true; // SQS supports sending messages to the queue features.supportsSubscribing = true; // SQS supports reading messages from the queue features.supportsMetrics = true; // We add our own metrics support in this plugin for SQS specific stats - features.supportsAutoReconnect = true; // Our plugin adds auto reconnect out of the box - features.supportsSubscriberMsgReceivedAck = false; // Since SQS is a queue where you consume the messages: this does not apply + features.supportsAutoReconnect = true; // Our plugin adds auto 'reconnect' out of the box by virtue of the AWS SDK + features.supportsSubscriberMsgReceivedAck = true; // SQS requires explit messages deletion to signal you are done with the message, this is the ack features.supportsAutoMsgReceivedAck = false; // Since SQS is a queue where you consume the messages: grabbing the message is in a way the ack but this does not really apply - features.supportsAbsentMsgReceivedAck = true; // Since SQS is a queue where you consume the messages: this does not apply and hence can be absent + features.supportsAbsentMsgReceivedAck = false; // SQS requires explit messages deletion to signal you are done with the message, without that the message will become 'visible' again and retransmitted features.supportsAckUsingLastMsgInBatch = false; // Since SQS is a queue where you consume the messages: this does not apply features.supportsAckUsingBookmark = false; // Since SQS is a queue where you consume the messages: this does not apply features.supportsBookmarkingConcept = true; // Since SQS is a queue where you consume the messages: Your offset is remembered simply due to the nature of a queue @@ -881,9 +881,9 @@ CAwsSqsPubSubClient::Connect( bool reset ) if ( !Disconnect() ) return false; - - // @TODO - return false; + // The SQS client works on a per action basis as request-response + // there is currently no persistent connect aside from a long poll for subscription use cases + return true; } /*-------------------------------------------------------------------------*/ @@ -892,7 +892,7 @@ bool CAwsSqsPubSubClient::IsConnected( void ) const {GUCEF_TRACE; - return false; + return IsInitialized(); } /*-------------------------------------------------------------------------*/ @@ -910,6 +910,14 @@ bool CAwsSqsPubSubClient::IsInitialized( void ) const {GUCEF_TRACE; + if ( PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsIncludedInGlobalBootstrapConfigLoad() ) + { + if ( PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsGlobalBootstrapConfigLoadInProgress() || + PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsGlobalConfigLoadInProgress() ) + { + return false; + } + } return true; } diff --git a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp index 1795d5aa8..62c326b06 100644 --- a/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginAWSSQS/src/pubsubpluginAWSSQS_CAwsSqsPubSubClientTopic.cpp @@ -102,6 +102,7 @@ CAwsSqsPubSubClientTopic::CAwsSqsPubSubClientTopic( CAwsSqsPubSubClient* client , m_config() , m_lock() , m_queueUrl() + , m_sqsMaximumMessageSize( SQSCLIENT_MAX_PAYLOAD_SIZE ) , m_publishBulkMsgRemapStorage() , m_currentPublishActionId( 1 ) , m_currentReceiveActionId( 1 ) @@ -308,20 +309,79 @@ CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds, con /*-------------------------------------------------------------------------*/ bool -CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg ) +CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const CORE::CVariant& receipt ) {GUCEF_TRACE; - // Does not apply to SQS + MT::CScopeMutex lock( m_lock ); + + if ( GUCEF_NULL == m_client ) + return false; + + bool totalSuccess = true; + try + { + if ( receipt.IsString() && !receipt.IsNULLOrEmpty() ) + { + const char* receiptHandle = receipt.AsCharPtr(); + if ( GUCEF_NULL != receiptHandle ) + { + Aws::SQS::Model::DeleteMessageRequest delete_req; + delete_req.SetQueueUrl( m_queueUrl ); + delete_req.SetReceiptHandle( receiptHandle ); + + Aws::SQS::Model::DeleteMessageOutcome delete_out = m_client->GetSqsClient().DeleteMessage( delete_req ); + if ( delete_out.IsSuccess() ) + { + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: Successfully deleted message with receipt handle " + CORE::ToString( receiptHandle ) ); + return true; + } + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided variant does not have valid content" ); + } + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided variant is not a string" ); + } + } + catch ( const std::exception& e ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, CORE::CString( "AwsSqsPubSubClientTopic:AcknowledgeReceipt: experienced an exception: " ) + e.what() ); + } + catch ( ... ) + { + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: experienced an unknown exception, your application may be unstable" ); + } return false; } /*-------------------------------------------------------------------------*/ +bool +CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg ) +{GUCEF_TRACE; + + const CORE::CVariant& msgIndex = msg.GetMsgIndex(); + return AcknowledgeReceipt( msgIndex ); +} + +/*-------------------------------------------------------------------------*/ + bool CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CPubSubBookmark& bookmark ) {GUCEF_TRACE; - // Does not apply to SQS + if ( PUBSUB::CPubSubBookmark::BOOKMARK_TYPE_MSG_INDEX == bookmark.GetBookmarkType() ) + { + const CORE::CVariant& msgIndex = bookmark.GetBookmarkData(); + return AcknowledgeReceipt( msgIndex ); + } + else + { + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided bookmark type is invalid" ); + } return false; } @@ -477,7 +537,7 @@ CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMs // Binary is Base64 encoded. For SQS strings are Unicode with UTF-8 binary encoding CORE::CUtf8String bodyPayloadStr = bodyPayload.AsUtf8String(); - if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= SQSCLIENT_MAX_PAYLOAD_SIZE ) + if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= m_sqsMaximumMessageSize ) { sqsMsg.SetMessageBody( bodyPayloadStr ); } @@ -812,6 +872,12 @@ CAwsSqsPubSubClientTopic::InitializeConnectivity( bool reset ) { GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:InitializeConnectivity: Retrieved attributes for queue \"" + CORE::ToString( m_queueUrl ) + "\" which are as follows: " + CORE::ToString( queueAttributes ) ); + + CORE::CString::StringMap::iterator i = queueAttributes.find( "MaximumMessageSize" ); + if ( i != queueAttributes.end() ) + { + m_sqsMaximumMessageSize = CORE::StringToUInt32( (*i).second, m_sqsMaximumMessageSize ); + } return true; } } diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/storage2sqs.json b/tools/pubsub2pubsub/config/examples/channel_templates/storage2sqs.json index 724a366b3..43b957f29 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/storage2sqs.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/storage2sqs.json @@ -21,6 +21,11 @@ "pubsubClientType": "STORAGE", "reconnectDelayInMs": 0, "CustomConfig": {}, + "journalConfig": { + "useJournal": false, + "journalType": "binary", + "journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/{clientInstance}" + }, "Topics": [], "defaultTopicConfig": { "isOptional": false, @@ -34,10 +39,10 @@ "maxTotalMsgsInFlight": -1, "CustomConfig": { "desiredNrOfBuffers": 3, - "desiredMinimalSerializedBlockSize": 2560000, - "desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 1000, - "vfsStorageRootPath": "InstallPath/storage-source", - "vfsFileExtention": "pubsubmsgs", + "desiredMinimalSerializedBlockSize": 52428800, + "desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 30000, + "vfsStorageRootPath": "DataPath/storage/{topicName}", + "vfsFileExtention": "pubsubcapture", "encodeCodecFamily": "", "encodeCodecName": "", "decodeCodecFamily": "", @@ -60,8 +65,34 @@ "treatEveryFullfilledRequestAsEODEvent": false, "deleteContainersWithFullyAckdContent": false, "moveContainersWithFullyAckdContent": true, - "vfsStorageRootPathForFullyAckdContainers": "InstallPath/storage-source/processed", - "maxCompletedContainerRefsToRetain": 50 + "vfsStorageRootPathForFullyAckdContainers": "DataPath/storage-source/ackd", + "maxCompletedContainerRefsToRetain": 50, + "binarySerializerOptions": { + "msgDateTimeIncluded": true, + "msgDateTimeAsMsSinceUnixEpochInUtc": true, + "msgIdIncluded": true, + "msgIndexIncluded": true, + "msgPrimaryPayloadIncluded": true, + "msgKeyValuePairsIncluded": true, + "msgMetaDataKeyValuePairsIncluded": true, + "topicNameIncluded": false + }, + "serializerOptions": { + "msgDateTimeIncluded": true, + "msgDateTimeAsMsSinceUnixEpochInUtc": true, + "msgIdIncluded": true, + "msgIndexIncluded": true, + "msgPrimaryPayloadIncluded": true, + "msgPrimaryPayloadTypeIncluded": false, + "msgKeyValuePairsIncluded": true, + "msgMetaDataKeyValuePairsIncluded": false, + "includeUndefinedValues": false, + "receiveActionIdIncluded": true, + "originTopicObjPointerIncluded": false, + "originTopicNameIncluded": false, + "originClientTypeNameIncluded": false, + "originClientObjPointerIncluded": false + } } }, "DesiredFeatures": { @@ -96,6 +127,7 @@ "supportsDerivingBookmarkFromMsg": true, "supportsDiscoveryOfAvailableTopics": true, "supportsGlobPatternTopicNames": true, + "supportsPatternBasedAggregateTopic": false, "supportsMetrics": true } }, @@ -108,7 +140,7 @@ "autoPersistMsgInterval": 1000, "maxNrOfBookmarksToKeep": 10, "CustomConfig": { - "vfsRootPath": "InstallPath/bookmarks", + "vfsRootPath": "DataPath/bookmarks", "persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}" } } @@ -130,6 +162,11 @@ "reconnectDelayInMs": 500, "RemoteAddresses": [], "CustomConfig": {}, + "journalConfig": { + "useJournal": false, + "journalType": "binary", + "journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/{clientInstance}" + }, "Topics": [], "defaultTopicConfig": { "isOptional": false, @@ -137,6 +174,11 @@ "needSubscribeSupport": false, "preferDedicatedConnection": false, "topicName": "pubsub2pubsub-storage-load-test-queue", + "journalConfig": { + "useJournal": true, + "journalType": "binary", + "journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/topics/{clientInstance}/topicName/{topicName}" + }, "CustomConfig": { "topicNameIsQueueName": true, "tryToUseSendMessageBatch": true, @@ -178,7 +220,7 @@ "autoPersistMsgInterval": 1000, "maxNrOfBookmarksToKeep": 10, "CustomConfig": { - "vfsRootPath": "InstallPath/bookmarks", + "vfsRootPath": "DataPath/bookmarks", "persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}" } }