Skip to content

Commit

Permalink
pubsub sqs backend: some tweaks after initial test, still a wip
Browse files Browse the repository at this point in the history
  • Loading branch information
LiberatorUSA committed Jun 9, 2024
1 parent 83d8c28 commit 5935f39
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P

virtual bool AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg ) GUCEF_VIRTUAL_OVERRIDE;
virtual bool AcknowledgeReceipt( const PUBSUB::CPubSubBookmark& bookmark ) GUCEF_VIRTUAL_OVERRIDE;
virtual bool AcknowledgeReceipt( const CORE::CVariant& receipt );

virtual bool SaveConfig( PUBSUB::CPubSubClientTopicConfig& config ) const GUCEF_VIRTUAL_OVERRIDE;

Expand Down Expand Up @@ -188,6 +189,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P
CAwsSqsPubSubClientTopicConfig m_config;
MT::CMutex m_lock;
Aws::String m_queueUrl;
CORE::UInt32 m_sqsMaximumMessageSize;
PUBSUB::CIPubSubMsg::TIPubSubMsgConstRawPtrVector m_publishBulkMsgRemapStorage;
CORE::UInt64 m_currentPublishActionId;
CORE::UInt64 m_currentReceiveActionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,14 @@ CAwsSqsPubSubClient::GetSupportedFeatures( PUBSUB::CPubSubClientFeatures& featur
features.supportsKeyValueSetPerMsg = true; // Supported in SQS using message attributes. "Each message can have up to 10 attributes. Message attributes are optional and separate from the message body (however, they are sent alongside it)"
features.supportsMetaDataKeyValueSetPerMsg = true; // "Whereas you can use message attributes to attach custom metadata to Amazon SQS messages for your applications, you can use message system attributes to store metadata for other AWS services, such as AWS X-Ray"
features.supportsDuplicateKeysPerMsg = false; // Since attributes are a map of keys to a value it mandates that every key is unique
features.supportsMultiHostSharding = true; // SQS is a managed service which under the coveres is shareded across hardware/nodes
features.supportsMultiHostSharding = true; // SQS is a managed service which under the coveres is shared across hardware/nodes
features.supportsPublishing = true; // SQS supports sending messages to the queue
features.supportsSubscribing = true; // SQS supports reading messages from the queue
features.supportsMetrics = true; // We add our own metrics support in this plugin for SQS specific stats
features.supportsAutoReconnect = true; // Our plugin adds auto reconnect out of the box
features.supportsSubscriberMsgReceivedAck = false; // Since SQS is a queue where you consume the messages: this does not apply
features.supportsAutoReconnect = true; // Our plugin adds auto 'reconnect' out of the box by virtue of the AWS SDK
features.supportsSubscriberMsgReceivedAck = true; // SQS requires explit messages deletion to signal you are done with the message, this is the ack
features.supportsAutoMsgReceivedAck = false; // Since SQS is a queue where you consume the messages: grabbing the message is in a way the ack but this does not really apply
features.supportsAbsentMsgReceivedAck = true; // Since SQS is a queue where you consume the messages: this does not apply and hence can be absent
features.supportsAbsentMsgReceivedAck = false; // SQS requires explit messages deletion to signal you are done with the message, without that the message will become 'visible' again and retransmitted
features.supportsAckUsingLastMsgInBatch = false; // Since SQS is a queue where you consume the messages: this does not apply
features.supportsAckUsingBookmark = false; // Since SQS is a queue where you consume the messages: this does not apply
features.supportsBookmarkingConcept = true; // Since SQS is a queue where you consume the messages: Your offset is remembered simply due to the nature of a queue
Expand Down Expand Up @@ -881,9 +881,9 @@ CAwsSqsPubSubClient::Connect( bool reset )
if ( !Disconnect() )
return false;


// @TODO
return false;
// The SQS client works on a per action basis as request-response
// there is currently no persistent connect aside from a long poll for subscription use cases
return true;
}

/*-------------------------------------------------------------------------*/
Expand All @@ -892,7 +892,7 @@ bool
CAwsSqsPubSubClient::IsConnected( void ) const
{GUCEF_TRACE;

return false;
return IsInitialized();
}

/*-------------------------------------------------------------------------*/
Expand All @@ -910,6 +910,14 @@ bool
CAwsSqsPubSubClient::IsInitialized( void ) const
{GUCEF_TRACE;

if ( PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsIncludedInGlobalBootstrapConfigLoad() )
{
if ( PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsGlobalBootstrapConfigLoadInProgress() ||
PLUGINGLUE::AWSSDK::CAwsSdkGlobal::Instance()->IsGlobalConfigLoadInProgress() )
{
return false;
}
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ CAwsSqsPubSubClientTopic::CAwsSqsPubSubClientTopic( CAwsSqsPubSubClient* client
, m_config()
, m_lock()
, m_queueUrl()
, m_sqsMaximumMessageSize( SQSCLIENT_MAX_PAYLOAD_SIZE )
, m_publishBulkMsgRemapStorage()
, m_currentPublishActionId( 1 )
, m_currentReceiveActionId( 1 )
Expand Down Expand Up @@ -308,20 +309,79 @@ CAwsSqsPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds, con
/*-------------------------------------------------------------------------*/

bool
CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg )
CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const CORE::CVariant& receipt )
{GUCEF_TRACE;

// Does not apply to SQS
MT::CScopeMutex lock( m_lock );

if ( GUCEF_NULL == m_client )
return false;

bool totalSuccess = true;
try
{
if ( receipt.IsString() && !receipt.IsNULLOrEmpty() )
{
const char* receiptHandle = receipt.AsCharPtr();
if ( GUCEF_NULL != receiptHandle )
{
Aws::SQS::Model::DeleteMessageRequest delete_req;
delete_req.SetQueueUrl( m_queueUrl );
delete_req.SetReceiptHandle( receiptHandle );

Aws::SQS::Model::DeleteMessageOutcome delete_out = m_client->GetSqsClient().DeleteMessage( delete_req );
if ( delete_out.IsSuccess() )
{
GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: Successfully deleted message with receipt handle " + CORE::ToString( receiptHandle ) );
return true;
}
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided variant does not have valid content" );
}
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided variant is not a string" );
}
}
catch ( const std::exception& e )
{
GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, CORE::CString( "AwsSqsPubSubClientTopic:AcknowledgeReceipt: experienced an exception: " ) + e.what() );
}
catch ( ... )
{
GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_CRITICAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: experienced an unknown exception, your application may be unstable" );
}
return false;
}

/*-------------------------------------------------------------------------*/

bool
CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg )
{GUCEF_TRACE;

const CORE::CVariant& msgIndex = msg.GetMsgIndex();
return AcknowledgeReceipt( msgIndex );
}

/*-------------------------------------------------------------------------*/

bool
CAwsSqsPubSubClientTopic::AcknowledgeReceipt( const PUBSUB::CPubSubBookmark& bookmark )
{GUCEF_TRACE;

// Does not apply to SQS
if ( PUBSUB::CPubSubBookmark::BOOKMARK_TYPE_MSG_INDEX == bookmark.GetBookmarkType() )
{
const CORE::CVariant& msgIndex = bookmark.GetBookmarkData();
return AcknowledgeReceipt( msgIndex );
}
else
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:AcknowledgeReceipt: provided bookmark type is invalid" );
}
return false;
}

Expand Down Expand Up @@ -477,7 +537,7 @@ CAwsSqsPubSubClientTopic::TranslateToSqsMsg( T& sqsMsg, const PUBSUB::CIPubSubMs
// Binary is Base64 encoded. For SQS strings are Unicode with UTF-8 binary encoding
CORE::CUtf8String bodyPayloadStr = bodyPayload.AsUtf8String();

if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= SQSCLIENT_MAX_PAYLOAD_SIZE )
if ( bodyPayloadStr.ByteSize() >= 1 && bodyPayloadStr.ByteSize() <= m_sqsMaximumMessageSize )
{
sqsMsg.SetMessageBody( bodyPayloadStr );
}
Expand Down Expand Up @@ -812,6 +872,12 @@ CAwsSqsPubSubClientTopic::InitializeConnectivity( bool reset )
{
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "AwsSqsPubSubClientTopic:InitializeConnectivity: Retrieved attributes for queue \"" + CORE::ToString( m_queueUrl ) +
"\" which are as follows: " + CORE::ToString( queueAttributes ) );

CORE::CString::StringMap::iterator i = queueAttributes.find( "MaximumMessageSize" );
if ( i != queueAttributes.end() )
{
m_sqsMaximumMessageSize = CORE::StringToUInt32( (*i).second, m_sqsMaximumMessageSize );
}
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
"pubsubClientType": "STORAGE",
"reconnectDelayInMs": 0,
"CustomConfig": {},
"journalConfig": {
"useJournal": false,
"journalType": "binary",
"journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/{clientInstance}"
},
"Topics": [],
"defaultTopicConfig": {
"isOptional": false,
Expand All @@ -34,10 +39,10 @@
"maxTotalMsgsInFlight": -1,
"CustomConfig": {
"desiredNrOfBuffers": 3,
"desiredMinimalSerializedBlockSize": 2560000,
"desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 1000,
"vfsStorageRootPath": "InstallPath/storage-source",
"vfsFileExtention": "pubsubmsgs",
"desiredMinimalSerializedBlockSize": 52428800,
"desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 30000,
"vfsStorageRootPath": "DataPath/storage/{topicName}",
"vfsFileExtention": "pubsubcapture",
"encodeCodecFamily": "",
"encodeCodecName": "",
"decodeCodecFamily": "",
Expand All @@ -60,8 +65,34 @@
"treatEveryFullfilledRequestAsEODEvent": false,
"deleteContainersWithFullyAckdContent": false,
"moveContainersWithFullyAckdContent": true,
"vfsStorageRootPathForFullyAckdContainers": "InstallPath/storage-source/processed",
"maxCompletedContainerRefsToRetain": 50
"vfsStorageRootPathForFullyAckdContainers": "DataPath/storage-source/ackd",
"maxCompletedContainerRefsToRetain": 50,
"binarySerializerOptions": {
"msgDateTimeIncluded": true,
"msgDateTimeAsMsSinceUnixEpochInUtc": true,
"msgIdIncluded": true,
"msgIndexIncluded": true,
"msgPrimaryPayloadIncluded": true,
"msgKeyValuePairsIncluded": true,
"msgMetaDataKeyValuePairsIncluded": true,
"topicNameIncluded": false
},
"serializerOptions": {
"msgDateTimeIncluded": true,
"msgDateTimeAsMsSinceUnixEpochInUtc": true,
"msgIdIncluded": true,
"msgIndexIncluded": true,
"msgPrimaryPayloadIncluded": true,
"msgPrimaryPayloadTypeIncluded": false,
"msgKeyValuePairsIncluded": true,
"msgMetaDataKeyValuePairsIncluded": false,
"includeUndefinedValues": false,
"receiveActionIdIncluded": true,
"originTopicObjPointerIncluded": false,
"originTopicNameIncluded": false,
"originClientTypeNameIncluded": false,
"originClientObjPointerIncluded": false
}
}
},
"DesiredFeatures": {
Expand Down Expand Up @@ -96,6 +127,7 @@
"supportsDerivingBookmarkFromMsg": true,
"supportsDiscoveryOfAvailableTopics": true,
"supportsGlobPatternTopicNames": true,
"supportsPatternBasedAggregateTopic": false,
"supportsMetrics": true
}
},
Expand All @@ -108,7 +140,7 @@
"autoPersistMsgInterval": 1000,
"maxNrOfBookmarksToKeep": 10,
"CustomConfig": {
"vfsRootPath": "InstallPath/bookmarks",
"vfsRootPath": "DataPath/bookmarks",
"persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}"
}
}
Expand All @@ -130,13 +162,23 @@
"reconnectDelayInMs": 500,
"RemoteAddresses": [],
"CustomConfig": {},
"journalConfig": {
"useJournal": false,
"journalType": "binary",
"journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/{clientInstance}"
},
"Topics": [],
"defaultTopicConfig": {
"isOptional": false,
"needPublishSupport": true,
"needSubscribeSupport": false,
"preferDedicatedConnection": false,
"topicName": "pubsub2pubsub-storage-load-test-queue",
"journalConfig": {
"useJournal": true,
"journalType": "binary",
"journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/topics/{clientInstance}/topicName/{topicName}"
},
"CustomConfig": {
"topicNameIsQueueName": true,
"tryToUseSendMessageBatch": true,
Expand Down Expand Up @@ -178,7 +220,7 @@
"autoPersistMsgInterval": 1000,
"maxNrOfBookmarksToKeep": 10,
"CustomConfig": {
"vfsRootPath": "InstallPath/bookmarks",
"vfsRootPath": "DataPath/bookmarks",
"persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}"
}
}
Expand Down

0 comments on commit 5935f39

Please sign in to comment.