diff --git a/dependencies/redis-plus-plus/src/sw/redis++/command.h b/dependencies/redis-plus-plus/src/sw/redis++/command.h index bc9338ed5..b49c5eca8 100644 --- a/dependencies/redis-plus-plus/src/sw/redis++/command.h +++ b/dependencies/redis-plus-plus/src/sw/redis++/command.h @@ -1748,6 +1748,27 @@ void xadd_maxlen_range(Connection &connection, connection.send(args); } +// DV edit: add minid variant +template +void dvcustom_xadd_minid_range(Connection &connection, + const StringView &key, + const StringView &id, + Input first, + Input last, + long long minid, + bool approx) { + CmdArgs args; + args << "XADD" << key << "MINID"; + + if (approx) { + args << "~"; + } + + args << minid << id << std::make_pair(first, last); + + connection.send(args); +} + inline void xclaim(Connection &connection, const StringView &key, const StringView &group, diff --git a/dependencies/redis-plus-plus/src/sw/redis++/queued_redis.h b/dependencies/redis-plus-plus/src/sw/redis++/queued_redis.h index 5285b7496..b95b8152d 100644 --- a/dependencies/redis-plus-plus/src/sw/redis++/queued_redis.h +++ b/dependencies/redis-plus-plus/src/sw/redis++/queued_redis.h @@ -1610,6 +1610,19 @@ class QueuedRedis { return command(cmd::xadd_maxlen_range, key, id, first, last, count, approx); } + // DV edit: add minid variant + template + QueuedRedis& dvcustom_xadd_minid(const StringView &key, + const StringView &id, + Input first, + Input last, + long long minid, + bool approx = true) { + range_check("XADD", first, last); + + return command(cmd::dvcustom_xadd_minid_range, key, id, first, last, minid, approx); + } + template QueuedRedis& xadd(const StringView &key, const StringView &id, diff --git a/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.h b/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.h index fbed369ea..b6d35b9e9 100644 --- a/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.h +++ b/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.h @@ -1084,6 +1084,15 @@ class RedisCluster { long long count, bool approx = true); + // DV edit: add minid variant + template + std::string dvcustom_xadd_minid(const StringView &key, + const StringView &id, + Input first, + Input last, + long long minid, + bool approx = true); + template std::string xadd(const StringView &key, const StringView &id, diff --git a/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.hpp b/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.hpp index 62c454b61..532c853e3 100644 --- a/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.hpp +++ b/dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.hpp @@ -1022,6 +1022,21 @@ std::string RedisCluster::xadd(const StringView &key, return reply::parse(*reply); } +// DV edit: add minid variant +template +std::string RedisCluster::dvcustom_xadd_minid(const StringView &key, + const StringView &id, + Input first, + Input last, + long long minid, + bool approx) { + auto reply = command(cmd::dvcustom_xadd_minid_range, key, id, first, last, minid, approx); + + return reply::parse(*reply); +} + + + template void RedisCluster::xclaim(const StringView &key, const StringView &group, diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.h b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.h index 2c86c0f1d..a9a86bc24 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.h +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/include/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.h @@ -62,9 +62,12 @@ class PUBSUBPLUGIN_REDISCLUSTER_PLUGIN_PRIVATE_CPP CRedisClusterPubSubClientTopi { public: - CORE::Int32 redisXAddMaxLen; - bool redisXAddMaxLenIsApproximate; - bool redisXAddIgnoreMsgId; + bool autoGenerateRedisAddMinId; /**< whether to auto generate the min id for the XADD command. This is a Unix epoch timestamp of the publish time at the node */ + CORE::UInt64 maxAgeInMsForMinId; /**< the max age of a message in the stream in milliseconds based on current system time. Used when autoGenerateRedisAddMinId is true */ + bool redisXAddMaxAgeIsApproximate; /**< whether the max age is approximate or exact thus resulting in a ~ or = passed to redis. exact has a performance penalty for Redis */ + CORE::Int32 redisXAddMaxLen; /**< the max len of the stream. A value smaller than 0 means maxlen is not used */ + bool redisXAddMaxLenIsApproximate; /**< whether the max len is approximate or exact thus resulting in a ~ or = passed to redis. exact has a performance penalty for Redis */ + bool redisXAddIgnoreMsgId; /**< whether to ignore the msg id on the message object and instead have the Redis node generate the key which is a Unix epoch timestamp of the publish time at the node */ CORE::CString redisXReadDefaultOffset; CORE::Int32 redisXReadCount; CORE::UInt32 redisXReadBlockTimeoutInMs; diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp index 332442315..16311972a 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp @@ -450,6 +450,13 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId if ( m_config.preferDedicatedConnection && GUCEF_NULL != m_redisPipeline ) { + if ( m_config.autoGenerateRedisAddMinId ) + { + // the cluster uses local time for its auto-generated message id as such we will use the same but are assuming the cluster is in the same timezone + CORE::UInt64 maxAgeTimestamp = CORE::CDateTime::NowLocalDateTime().ToUnixEpochBasedTicksInMillisecs() - m_config.maxAgeInMsForMinId; + m_redisPipeline->dvcustom_xadd_minid( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), maxAgeTimestamp, m_config.redisXAddMaxAgeIsApproximate ); + } + else if ( m_config.redisXAddMaxLen >= 0 ) m_redisPipeline->xadd( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), m_config.redisXAddMaxLen, m_config.redisXAddMaxLenIsApproximate ); else @@ -488,6 +495,13 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId } else { + if ( m_config.autoGenerateRedisAddMinId ) + { + // the cluster uses local time for its auto-generated message id as such we will use the same but are assuming the cluster is in the same timezone + CORE::UInt64 maxAgeTimestamp = CORE::CDateTime::NowLocalDateTime().ToUnixEpochBasedTicksInMillisecs() - m_config.maxAgeInMsForMinId; + m_redisContext->dvcustom_xadd_minid( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), maxAgeTimestamp, m_config.redisXAddMaxAgeIsApproximate ); + } + else if ( m_config.redisXAddMaxLen >= 0 ) m_redisContext->xadd( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), m_config.redisXAddMaxLen, m_config.redisXAddMaxLenIsApproximate ); else diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.cpp b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.cpp index 703f0ed98..a75926c58 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.cpp +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopicConfig.cpp @@ -48,6 +48,9 @@ namespace REDISCLUSTER { CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( void ) : PUBSUB::CPubSubClientTopicConfig() , CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this ) + , autoGenerateRedisAddMinId( false ) + , maxAgeInMsForMinId( GUCEF_MT_UINT64MAX ) + , redisXAddMaxAgeIsApproximate( true ) , redisXAddMaxLen( -1 ) , redisXAddMaxLenIsApproximate( true ) , redisXAddIgnoreMsgId( true ) @@ -66,6 +69,9 @@ CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( void CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( const CRedisClusterPubSubClientTopicConfig& src ) : PUBSUB::CPubSubClientTopicConfig( src ) , CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this ) + , autoGenerateRedisAddMinId( src.autoGenerateRedisAddMinId ) + , maxAgeInMsForMinId( src.maxAgeInMsForMinId ) + , redisXAddMaxAgeIsApproximate( src.redisXAddMaxAgeIsApproximate ) , redisXAddMaxLen( src.redisXAddMaxLen ) , redisXAddMaxLenIsApproximate( src.redisXAddMaxLenIsApproximate ) , redisXAddIgnoreMsgId( src.redisXAddIgnoreMsgId ) @@ -84,6 +90,9 @@ CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( cons CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( const PUBSUB::CPubSubClientTopicConfig& genericConfig ) : PUBSUB::CPubSubClientTopicConfig( genericConfig ) , CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this ) + , autoGenerateRedisAddMinId( false ) + , maxAgeInMsForMinId( GUCEF_MT_UINT64MAX ) + , redisXAddMaxAgeIsApproximate( true ) , redisXAddMaxLen( -1 ) , redisXAddMaxLenIsApproximate( true ) , redisXAddIgnoreMsgId( true ) @@ -111,6 +120,9 @@ bool CRedisClusterPubSubClientTopicConfig::LoadCustomConfig( const CORE::CDataNode& config ) {GUCEF_TRACE; + autoGenerateRedisAddMinId = config.GetAttributeValueOrChildValueByName( "autoGenerateRedisAddMinId" ).AsBool( autoGenerateRedisAddMinId, true ); + maxAgeInMsForMinId = config.GetAttributeValueOrChildValueByName( "maxAgeInMsForMinId" ).AsUInt64( maxAgeInMsForMinId, true ); + redisXAddMaxAgeIsApproximate = config.GetAttributeValueOrChildValueByName( "redisXAddMaxAgeIsApproximate" ).AsBool( redisXAddMaxAgeIsApproximate, true ); redisXAddMaxLen = config.GetAttributeValueOrChildValueByName( "redisXAddMaxLen" ).AsInt32( redisXAddMaxLen, true ); redisXAddMaxLenIsApproximate = config.GetAttributeValueOrChildValueByName( "redisXAddMaxLenIsApproximate" ).AsBool( redisXAddMaxLenIsApproximate, true ); redisXAddIgnoreMsgId = config.GetAttributeValueOrChildValueByName( "redisXAddIgnoreMsgId" ).AsBool( redisXAddIgnoreMsgId, true ); @@ -160,6 +172,9 @@ CRedisClusterPubSubClientTopicConfig::operator=( const CRedisClusterPubSubClient if ( &src != this ) { PUBSUB::CPubSubClientTopicConfig::operator=( src ); + autoGenerateRedisAddMinId = src.autoGenerateRedisAddMinId; + maxAgeInMsForMinId = src.maxAgeInMsForMinId; + redisXAddMaxAgeIsApproximate = src.redisXAddMaxAgeIsApproximate; redisXAddMaxLen = src.redisXAddMaxLen; redisXAddMaxLenIsApproximate = src.redisXAddMaxLenIsApproximate; redisXAddIgnoreMsgId = src.redisXAddIgnoreMsgId; diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/redis2storage.json b/tools/pubsub2pubsub/config/examples/channel_templates/redis2storage.json index ce4e3bec6..cf304a5c6 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/redis2storage.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/redis2storage.json @@ -49,7 +49,9 @@ "journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/topics/{clientInstance}/topicName/{topicName}" }, "CustomConfig": { - "redisXAddMaxLen": -1, + "autoGenerateRedisAddMinId": false, + "maxAgeInMsForMinId": 31560000000, + "redisXAddMaxLen": -1, "redisXAddMaxLenIsApproximate": true, "redisXAddIgnoreMsgId": true, "redisXReadDefaultOffset": "0", diff --git a/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json b/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json index bf416756c..0907b8775 100644 --- a/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json +++ b/tools/pubsub2pubsub/config/examples/channel_templates/udp2redis.json @@ -117,6 +117,8 @@ "useTopicLevelMaxTotalMsgsInFlight": false, "maxTotalMsgsInFlight": -1, "CustomConfig": { + "autoGenerateRedisAddMinId": false, + "maxAgeInMsForMinId": 31560000000, "redisXAddMaxLen": -1, "redisXAddMaxLenIsApproximate": true, "redisXAddIgnoreMsgId": true,