Skip to content

Commit

Permalink
- pubsub kafka backend: Added end of data detection and notification …
Browse files Browse the repository at this point in the history
…support

- pubsub kafka backend: Fixed message ack management to correctly sequence before doing a commit to Kafka since acks can occur out of order
  • Loading branch information
LiberatorUSA committed May 18, 2024
1 parent 3700d5d commit 4cd0a33
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClientTopic : public PUB

virtual bool IsSubscribingSupported( void ) const GUCEF_VIRTUAL_OVERRIDE;

virtual bool IsSubscriptionAtEndOfData( void ) const GUCEF_VIRTUAL_OVERRIDE;

virtual bool Subscribe( void ) GUCEF_VIRTUAL_OVERRIDE;

virtual bool SubscribeStartingAtTopicIndex( const CORE::CVariant& msgIndexBookmark );
Expand Down Expand Up @@ -287,6 +289,10 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClientTopic : public PUB

bool ProcessRdKafkaMessage( RdKafka::Message& message, CORE::UInt32 msgIndex, bool& isFiltered );

bool ProcessMsgAcks( void );

void CleanupMsgAcks( void );

private:

typedef CORE::CTEventHandlerFunctor< CKafkaPubSubClientTopic > TEventCallback;
Expand All @@ -296,7 +302,9 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClientTopic : public PUB
typedef std::vector< RdKafka::Message* > TRdKafkaMsgPtrVector;
typedef std::pair< CORE::CDynamicBuffer, CORE::CDynamicBuffer > TBufferPair;
typedef std::vector< TBufferPair > TBufferVector;
typedef std::set< CORE::Int64 > TInt64Set;
typedef std::map< CORE::Int32, CORE::Int64 > TInt32ToInt64Map;
typedef std::map< CORE::Int32, TInt64Set > TInt32ToInt64SetMap;
typedef std::vector< RdKafka::TopicPartition* > TRdKafkaTopicPartitionPtrVector;

CKafkaPubSubClient* m_client;
Expand All @@ -321,11 +329,13 @@ class PUBSUBPLUGIN_KAFKA_PLUGIN_PRIVATE_CPP CKafkaPubSubClientTopic : public PUB
CORE::UInt32 m_kafkaMessagesFiltered;
std::string m_producerHostname;
bool m_firstPartitionAssignment;
TInt32ToInt64Map m_consumerOffsets;
TInt32ToInt64Map m_consumerReadOffsets;
TInt32ToInt64Map m_consumerAckdOffsets;
TInt32ToInt64SetMap m_receivedMsgAcks;
CORE::UInt64 m_tickCountAtLastOffsetCommit;
CORE::UInt64 m_tickCountAtConsumeDelayRequest;
CORE::UInt32 m_requestedConsumeDelayInMs;
bool m_msgsReceivedSinceLastOffsetCommit;
bool m_msgsAckdSinceLastOffsetCommit;
bool m_consumerOffsetWaitsForExplicitMsgAck;
CORE::UInt64 m_currentPublishActionId;
CORE::UInt64 m_currentReceiveActionId;
Expand Down
Loading

0 comments on commit 4cd0a33

Please sign in to comment.