Skip to content

Commit

Permalink
- core: swapped base64 encoding code due to issues with certain payloads
Browse files Browse the repository at this point in the history
- pubsub: changed msg ref into pointer versus the linked clonable since its already a Cloneable
- pubsub SQS backend: Publish is now working with single message and batch publish implemented
  • Loading branch information
LiberatorUSA committed Jun 30, 2024
1 parent 5935f39 commit aff58b1
Show file tree
Hide file tree
Showing 14 changed files with 434 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class CTMailboxForSharedCloneables : public MT::CMailboxForCloneables
{
public:

typedef CloneableType TCloneableType;
typedef PtrLockType TSharedPtrLockType;
typedef CTSharedPtr< CloneableType, PtrLockType > TMailSPtr;
typedef std::vector< TMailSPtr, gucef_allocator< TMailSPtr > > TMailSPtrList;

Expand Down Expand Up @@ -161,10 +163,13 @@ CTMailboxForSharedCloneables< CloneableType, PtrLockType >::GetSPtrBulkMail( TMa

MT::CObjectScopeLock lock( this );

if ( m_mailQueue.empty() )
return false; // nothing to read, early out

// We know how many mail items we will read so lets not perform unnessesary reallocs of the vector's underlying memory
if ( maxMailItems > 0 )
{
size_t itemsToRead = SMALLEST( m_mailQueue.size(), (size_t) maxMailItems );
size_t itemsToRead = GUCEF_SMALLEST( m_mailQueue.size(), (size_t) maxMailItems );
mailList.reserve( itemsToRead );
}
else
Expand All @@ -179,7 +184,9 @@ CTMailboxForSharedCloneables< CloneableType, PtrLockType >::GetSPtrBulkMail( TMa
{
#ifdef GUCEF_DEBUG_MODE

TMailSPtr objPtr( static_cast< CloneableType* >( m_mailQueue.front() ) );
CICloneable* clonable = m_mailQueue.front();
CloneableType* derivedClonable = static_cast< CloneableType* >( clonable );
TMailSPtr objPtr( derivedClonable );
mailList.push_back( objPtr );
GUCEF_ASSERT( objPtr.GetPointerAlways() == static_cast< CloneableType* >( m_mailQueue.front() ) );
m_mailQueue.pop_front();
Expand Down
59 changes: 45 additions & 14 deletions platform/gucefCORE/src/dvcppstringutils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,28 +383,59 @@ Base64Encode( const void* byteBuffer, UInt32 bufferSize )
return CString::Empty;

memset( str, '=', base64StrLength );
char* p = (char*) byteBuffer;
UInt32 j = 0, pad = bufferSize % 3;
const UInt32 last = bufferSize - pad;
const char* charByteBuffer = (const char*) byteBuffer;

int i = 0;
int j = 0;
unsigned char char_array_3[3];
unsigned char char_array_4[4];

for ( size_t i = 0; i < last; i+=3 )
for ( size_t pos = 0; pos < bufferSize; ++pos )
{
int n = int( p[ i ] ) << 16 | int( p[ i + 1] ) << 8 | p[ i + 2 ];
str[ j++ ] = base64_chars[ n >> 18 ];
str[ j++ ] = base64_chars[ n >> 12 & 0x3F ];
str[ j++ ] = base64_chars[ n >> 6 & 0x3F ];
str[ j++ ] = base64_chars[ n & 0x3F ];
char_array_3[ i++ ] = charByteBuffer[ pos ];
if ( i == 3 )
{
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;

for ( i=0; i<4; ++i )
{
*str = base64_chars[ char_array_4[ i ] ];
++str;
}
i=0;
}
}

if ( pad > 0 )
if ( i > 0 )
{
int n = --pad ? int( p[ last ] ) << 8 | p[ last + 1 ] : p[ last ];
str[ j++ ] = base64_chars[ pad ? n >> 10 & 0x3F : n >> 2 ];
str[ j++ ] = base64_chars[ pad ? n >> 4 & 0x03F : n << 4 & 0x3F ];
str[ j++ ] = pad ? base64_chars[ n << 2 & 0x3F ] : '=';
for(j = i; j < 3; j++)
char_array_3[j] = '\0';

char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;

for (j = 0; (j < i + 1); j++)
{
*str = base64_chars[ char_array_4[ j ] ];
++str;
}

while( (i++ < 3) )
{
*str = '=';
++str;
}

}

str[ base64StrLength ] = '\0';
result.DetermineLength();

return result;
}

Expand Down
16 changes: 14 additions & 2 deletions platform/gucefMT/include/gucefMT_CMailBoxForCloneables.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ bool
CMailboxForCloneables::AddBulkMail( const TContainer& mailList, UInt32 itemsPerLockCycle )
{GUCEF_TRACE;

bool totalSuccess = true;

typename TContainer::const_iterator i = mailList.begin();
while ( i != mailList.end() )
{
Expand All @@ -343,12 +345,22 @@ CMailboxForCloneables::AddBulkMail( const TContainer& mailList, UInt32 itemsPerL
const CICloneable* origMail = AsCloneablePtr< const typename TContainer::const_iterator::value_type >( (*i) );
if ( GUCEF_NULL != origMail )
{
m_mailQueue.push_back( origMail->Clone() );
CICloneable* clonedMail = origMail->Clone();
if ( GUCEF_NULL != clonedMail )
{
m_mailQueue.push_back( clonedMail );
}
else
{
// Cloning failed
totalSuccess = false;
}
}
++i; ++n;
}
}
return true;

return totalSuccess;
}

/*-------------------------------------------------------------------------//
Expand Down
33 changes: 5 additions & 28 deletions platform/gucefMT/include/gucefMT_basicMacros.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@
/*
* Macros to get the smallest or largest of 2 values.
*/
#define SMALLEST(a,b) (((a) < (b)) ? (a) : (b))
#define LARGEST(a,b) (((a) > (b)) ? (a) : (b))
#define GUCEF_SMALLEST(a,b) (((a) < (b)) ? (a) : (b))
#define GUCEF_LARGEST(a,b) (((a) > (b)) ? (a) : (b))

#define SMALLEST GUCEF_SMALLEST
#define LARGEST GUCEF_LARGEST

/*-------------------------------------------------------------------------*/

Expand Down Expand Up @@ -118,29 +121,3 @@
//#define INT16INTOCHAR

#endif /* GUCEF_MT_BASICMACROS_H */

/*-------------------------------------------------------------------------//
// //
// Info & Changes //
// //
//-------------------------------------------------------------------------//
- 24-09-2006 :
- Commented out SDL ... we dont use this anymore? - Logan
- 24-03-2004 :
- Removed all build specific macros. These are macros that relyed on
specific build switches to define there meaning. This required the
include of a config include here which causes problems when exporting
from one module and importing from another since they would both use
the same switch. Thus only generic macros are allowed here from now
on.
- 05-09-2003 :
- Added the LARGEST and SMALLEST macros
- 13-08-2003 :
- Added this section.
- Added MAX_DIR_LENGTH and MAX_FILENAME_LENGTH
---------------------------------------------------------------------------*/



Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class GUCEF_PUBSUB_EXPORT_CPP CPubSubClientTopic : public CORE::CTSGNotifier
typedef CIPubSubMsg::TIPubSubMsgConstRawPtrVector TIPubSubMsgConstRawPtrVector;
typedef CIPubSubMsg::TIPubSubMsgRawPtrVector TIPubSubMsgRawPtrVector;
typedef CIPubSubMsg::TIPubSubMsgSPtrVector TIPubSubMsgSPtrVector;
typedef CORE::CTLinkedCloneable< CIPubSubMsg > TPubSubMsgRef;
typedef CIPubSubMsg* TPubSubMsgRef;
typedef CORE::CTCloneableExpansion< std::vector< TPubSubMsgRef, gucef_allocator< TPubSubMsgRef > > > TPubSubMsgsRefVector;
typedef UInt64 TPublishActionId;
typedef CORE::CTCloneableExpansion< std::vector< TPublishActionId, gucef_allocator< TPublishActionId > > > TPublishActionIdVector;
Expand Down
5 changes: 3 additions & 2 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1543,8 +1543,7 @@ CPubSubClientSide::TopicLink::OnCheckForTimedOutInFlightMessagesTimerCycle( CORE
while ( n != inFlightDiscardList.end() )
{
TopicLink::MsgTrackingEntry& trackingEntry = inFlightMsgs[ (*n) ];
CPubSubClientTopic::TPubSubMsgRef msgRef;
msgRef.LinkTo( trackingEntry.msg.GetPointerAlways() );
CPubSubClientTopic::TPubSubMsgRef msgRef = trackingEntry.msg.GetPointerAlways();
discardedMsgs.push_back( msgRef );
++n;
}
Expand Down Expand Up @@ -1908,6 +1907,8 @@ CPubSubClientSide::TopicLink::PublishMailboxMsgs( void )
bool timeoutOccured = false;
try
{

CPubSubClientTopic::TIPubSubMsgSPtrVector::iterator i = msgs.begin();
publishSuccess = PublishMsgsSync< CPubSubClientTopic::TIPubSubMsgSPtrVector >( msgs );
totalSuccess = publishSuccess && totalSuccess;
}
Expand Down
9 changes: 7 additions & 2 deletions platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientTopic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,13 @@ CPubSubClientTopic::Publish( TPublishActionIdVector& publishActionIds, const TIP
publishActionId = publishActionIds[ n ];
}

if ( !(*i).IsNULL() )
totalSuccess = totalSuccess && Publish( publishActionId, *(*i).GetPointerAlways(), notify );
const CIPubSubMsg::TNoLockSharedPtr& msg = (*i);

if ( !msg.IsNULL() )
{
const CIPubSubMsg& msgRef = (*msg.GetPointerAlways());
totalSuccess = totalSuccess && Publish( publishActionId, msgRef, notify );
}
else
totalSuccess = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,15 @@ CPubSubMsgContainerBinarySerializer::Deserialize( CPubSubClientTopic::TPubSubMsg
while ( i != index.end() )
{
CORE::UInt32 offsetOfMsg = (*i);
CIPubSubMsg& msg = (*m).GetData();

if ( !CPubSubMsgBinarySerializer::Deserialize( options, linkWherePossible, msg, offsetOfMsg, source, bytesRead ) )
CIPubSubMsg* msg = (*m);
if ( GUCEF_NULL != msg )
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubMsgContainerBinarySerializer:DeserializeMsgAtIndex: Failed to deserialize msg at index " + CORE::ToString( msgIndex ) + " and offset " + CORE::ToString( offsetOfMsg ) );
isCorrupted = true;
return false;
if ( !CPubSubMsgBinarySerializer::Deserialize( options, linkWherePossible, *msg, offsetOfMsg, source, bytesRead ) )
{
GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubMsgContainerBinarySerializer:DeserializeMsgAtIndex: Failed to deserialize msg at index " + CORE::ToString( msgIndex ) + " and offset " + CORE::ToString( offsetOfMsg ) );
isCorrupted = true;
return false;
}
}
++i; ++m; ++msgIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
#define PUBSUBPLUGIN_AWSSQS_CAWSSQSPUBSUBCLIENTTOPICCONFIG_H
#endif /* PUBSUBPLUGIN_AWSSQS_CAWSSQSPUBSUBCLIENTTOPICCONFIG_H ? */

#include <aws/sqs/model/GetQueueUrlRequest.h>
#include <aws/sqs/model/GetQueueUrlResult.h>
#include <aws/sqs/model/SendMessageRequest.h>
#include <aws/sqs/model/SendMessageResult.h>
#include <aws/sqs/model/SendMessageBatchRequest.h>
#include <aws/sqs/model/SendMessageBatchResult.h>
#include <aws/sqs/model/ReceiveMessageRequest.h>
#include <aws/sqs/model/ReceiveMessageResult.h>
#include <aws/sqs/model/DeleteMessageRequest.h>

/*-------------------------------------------------------------------------//
// //
// NAMESPACE //
Expand Down Expand Up @@ -101,6 +111,7 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P

virtual bool Publish( TPublishActionIdVector& publishActionIds, const PUBSUB::CBasicPubSubMsg::TBasicPubSubMsgVector& msgs, bool notify ) GUCEF_VIRTUAL_OVERRIDE;
virtual bool Publish( TPublishActionIdVector& publishActionIds, const PUBSUB::CIPubSubMsg::TIPubSubMsgConstRawPtrVector& msgs, bool notify ) GUCEF_VIRTUAL_OVERRIDE;
virtual bool Publish( TPublishActionIdVector& publishActionIds, const PUBSUB::CIPubSubMsg::TIPubSubMsgSPtrVector& msgs, bool notify ) GUCEF_VIRTUAL_OVERRIDE;
virtual bool Publish( CORE::UInt64& publishActionId, const PUBSUB::CIPubSubMsg& msg, bool notify ) GUCEF_VIRTUAL_OVERRIDE;

virtual bool AcknowledgeReceipt( const PUBSUB::CIPubSubMsg& msg ) GUCEF_VIRTUAL_OVERRIDE;
Expand Down Expand Up @@ -161,13 +172,27 @@ class PUBSUBPLUGIN_AWSSQS_PLUGIN_PRIVATE_CPP CAwsSqsPubSubClientTopic : public P
AddAttributesToSqsMsg( T& sqsMsg ,
const PUBSUB::CIPubSubMsg::TKeyValuePairs& kvPairs ,
bool addPrefix ,
const CORE::CAsciiString& prefixToAdd );
const CORE::CAsciiString& prefixToAdd ,
CORE::UInt32& msgByteSize );

template< class T >
bool
TranslateToSqsMsg( T& sqsMsg ,
const PUBSUB::CIPubSubMsg* msg ,
CORE::UInt32& msgByteSize );
TranslateToSqsMsgOfType( T& sqsMsg ,
const PUBSUB::CIPubSubMsg* msg ,
CORE::UInt32& msgByteSize );


bool
TranslateToSqsMsg( Aws::SQS::Model::SendMessageRequest& sqsMsg ,
const PUBSUB::CIPubSubMsg* msg ,
CORE::UInt32& approxMsgByteSize );

bool
TranslateToSqsBatchMsg( Aws::SQS::Model::SendMessageBatchRequestEntry& sqsMsg ,
const PUBSUB::CIPubSubMsg* msg ,
CORE::UInt64 publishActionId ,
CORE::UInt32& approxMsgByteSize );


bool ApplySqsMessageAttributeNameContraints( CORE::CAsciiString& candidateName );

Expand Down
Loading

0 comments on commit aff58b1

Please sign in to comment.