Skip to content

Commit

Permalink
pubsub kafka: topic & config tweaks based on testing
Browse files Browse the repository at this point in the history
  • Loading branch information
LiberatorUSA committed Mar 7, 2024
1 parent 8c32e99 commit 92f63d8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,12 @@ CKafkaPubSubClientTopic::offset_commit_cb( RdKafka::ErrorCode err

// check our config to see how we want to handle this scenario
Int64 requestTimeout = GetConsumerConfigSettingAsInt64( "request.timeout.ms", 3000 );
const CORE::CString& offsetResetSetting = GetConsumerConfigSetting( "auto.offset.reset", DefaultOffsetResetValue );
CORE::CString offsetResetSetting = GetConsumerConfigSetting( "auto.offset.reset", DefaultOffsetResetValue );
if ( offsetResetSetting.Lowercase() == "stored" )
{
GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic: The \"auto.offset.reset\" setting is set to \"" + offsetResetSetting + " which is not correct for an invalid/no offset scenario. Overruling to " + DefaultOffsetResetValue );
offsetResetSetting = DefaultOffsetResetValue;
}
GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "KafkaPubSubClientTopic: Using offset reset setting of \"" + offsetResetSetting + "\" with request timeout of " + CORE::ToString( requestTimeout ) );

// Set the desired new offsets
Expand Down Expand Up @@ -1830,10 +1835,20 @@ CKafkaPubSubClientTopic::rebalance_cb( RdKafka::KafkaConsumer* consumer
{
if ( m_firstPartitionAssignment || RdKafka::Topic::OFFSET_INVALID == partitions[ i ]->offset() )
{
CORE::Int64 startOffset = (CORE::Int64) RdKafka::Topic::OFFSET_INVALID;
Int64 requestTimeout = GetConsumerConfigSettingAsInt64( "request.timeout.ms", 3000 );
const CORE::CString& offsetResetSetting = GetConsumerConfigSetting( "auto.offset.reset", m_config.consumerModeStartOffset.IsNULLOrEmpty() ? DefaultOffsetResetValue : m_config.consumerModeStartOffset );
if ( RdKafka::Topic::OFFSET_INVALID == partitions[i]->offset() )
{
startOffset = ConvertKafkaConsumerStartOffset( DefaultOffsetResetValue, partitions[ i ]->partition(), (Int32) requestTimeout );
}
else
{
CORE::CString offsetResetValue = m_config.consumerModeStartOffset.IsNULLOrEmpty() ? DefaultOffsetResetValue : m_config.consumerModeStartOffset;
offsetResetValue = GetConsumerConfigSetting( "auto.offset.reset", offsetResetValue );

startOffset = ConvertKafkaConsumerStartOffset( offsetResetValue, partitions[ i ]->partition(), (Int32) requestTimeout );
}

CORE::Int64 startOffset = ConvertKafkaConsumerStartOffset( offsetResetSetting, partitions[ i ]->partition(), (Int32) requestTimeout );
partitions[ i ]->set_offset( startOffset );
m_consumerOffsets[ partitions[ i ]->partition() ] = startOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
"autoPersistMsgInterval": 1000,
"maxNrOfBookmarksToKeep": 10,
"CustomConfig": {
"vfsRootPath": "InstallPath/bookmarks",
"vfsRootPath": "DataPath/bookmarks",
"persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}"
}
}
Expand Down Expand Up @@ -155,8 +155,8 @@
"CustomConfig": {
"desiredNrOfBuffers": 3,
"desiredMinimalSerializedBlockSize": 52428800,
"desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 10000,
"vfsStorageRootPath": "InstallPath/pubsub/storage/{pubsubIdPrefix}/{metricsFriendlyTopicName}",
"desiredMaxTimeToWaitToGrowSerializedBlockSizeInMs": 240000,
"vfsStorageRootPath": "DataPath/pubsub/storage/{pubsubIdPrefix}/{topicName}",
"vfsFileExtention": "pubsubcapture",
"encodeCodecFamily": "",
"encodeCodecName": "",
Expand All @@ -180,14 +180,14 @@
"treatEveryFullfilledRequestAsEODEvent": false,
"deleteContainersWithFullyAckdContent": false,
"moveContainersWithFullyAckdContent": true,
"vfsStorageRootPathForFullyAckdContainers": "InstallPath/storage-sink/ackd",
"vfsStorageRootPathForFullyAckdContainers": "DataPath/storage-sink/ackd",
"maxCompletedContainerRefsToRetain": 50,
"binarySerializerOptions": {
"msgDateTimeIncluded": true,
"msgDateTimeAsMsSinceUnixEpochInUtc": true,
"msgIdIncluded": true,
"msgIndexIncluded": false,
"msgPrimaryPayloadIncluded": false,
"msgIndexIncluded": true,
"msgPrimaryPayloadIncluded": true,
"msgKeyValuePairsIncluded": true,
"msgMetaDataKeyValuePairsIncluded": true,
"topicNameIncluded": false
Expand Down Expand Up @@ -254,7 +254,7 @@
"autoPersistMsgInterval": 1000,
"maxNrOfBookmarksToKeep": 10,
"CustomConfig": {
"vfsRootPath": "InstallPath/pubsub/bookmarks",
"vfsRootPath": "DataPath/pubsub/bookmarks",
"persistenceStructure": "{bookmarkNamespace}/clientType/{clientType}/topicName/{topicName}"
}
}
Expand Down

0 comments on commit 92f63d8

Please sign in to comment.