Skip to content

Commit

Permalink
Custom mod of redis-plus-plus library to add support to the xadd comm…
Browse files Browse the repository at this point in the history
…and for the minid param

Also added code to use that mod from the pubsub redis plugin
  • Loading branch information
LiberatorUSA committed Apr 29, 2024
1 parent c7efa92 commit 1cf6811
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 4 deletions.
21 changes: 21 additions & 0 deletions dependencies/redis-plus-plus/src/sw/redis++/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,27 @@ void xadd_maxlen_range(Connection &connection,
connection.send(args);
}

// DV edit: add minid variant
template <typename Input>
void dvcustom_xadd_minid_range(Connection &connection,
const StringView &key,
const StringView &id,
Input first,
Input last,
long long minid,
bool approx) {
CmdArgs args;
args << "XADD" << key << "MINID";

if (approx) {
args << "~";
}

args << minid << id << std::make_pair(first, last);

connection.send(args);
}

inline void xclaim(Connection &connection,
const StringView &key,
const StringView &group,
Expand Down
13 changes: 13 additions & 0 deletions dependencies/redis-plus-plus/src/sw/redis++/queued_redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -1610,6 +1610,19 @@ class QueuedRedis {
return command(cmd::xadd_maxlen_range<Input>, key, id, first, last, count, approx);
}

// DV edit: add minid variant
template <typename Input>
QueuedRedis& dvcustom_xadd_minid(const StringView &key,
const StringView &id,
Input first,
Input last,
long long minid,
bool approx = true) {
range_check("XADD", first, last);

return command(cmd::dvcustom_xadd_minid_range<Input>, key, id, first, last, minid, approx);
}

template <typename T>
QueuedRedis& xadd(const StringView &key,
const StringView &id,
Expand Down
9 changes: 9 additions & 0 deletions dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,15 @@ class RedisCluster {
long long count,
bool approx = true);

// DV edit: add minid variant
template <typename Input>
std::string dvcustom_xadd_minid(const StringView &key,
const StringView &id,
Input first,
Input last,
long long minid,
bool approx = true);

template <typename T>
std::string xadd(const StringView &key,
const StringView &id,
Expand Down
15 changes: 15 additions & 0 deletions dependencies/redis-plus-plus/src/sw/redis++/redis_cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,21 @@ std::string RedisCluster::xadd(const StringView &key,
return reply::parse<std::string>(*reply);
}

// DV edit: add minid variant
template <typename Input>
std::string RedisCluster::dvcustom_xadd_minid(const StringView &key,
const StringView &id,
Input first,
Input last,
long long minid,
bool approx) {
auto reply = command(cmd::dvcustom_xadd_minid_range<Input>, key, id, first, last, minid, approx);

return reply::parse<std::string>(*reply);
}



template <typename Output>
void RedisCluster::xclaim(const StringView &key,
const StringView &group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ class PUBSUBPLUGIN_REDISCLUSTER_PLUGIN_PRIVATE_CPP CRedisClusterPubSubClientTopi
{
public:

CORE::Int32 redisXAddMaxLen;
bool redisXAddMaxLenIsApproximate;
bool redisXAddIgnoreMsgId;
bool autoGenerateRedisAddMinId; /**< whether to auto generate the min id for the XADD command. This is a Unix epoch timestamp of the publish time at the node */
CORE::UInt64 maxAgeInMsForMinId; /**< the max age of a message in the stream in milliseconds based on current system time. Used when autoGenerateRedisAddMinId is true */
bool redisXAddMaxAgeIsApproximate; /**< whether the max age is approximate or exact thus resulting in a ~ or = passed to redis. exact has a performance penalty for Redis */
CORE::Int32 redisXAddMaxLen; /**< the max len of the stream. A value smaller than 0 means maxlen is not used */
bool redisXAddMaxLenIsApproximate; /**< whether the max len is approximate or exact thus resulting in a ~ or = passed to redis. exact has a performance penalty for Redis */
bool redisXAddIgnoreMsgId; /**< whether to ignore the msg id on the message object and instead have the Redis node generate the key which is a Unix epoch timestamp of the publish time at the node */
CORE::CString redisXReadDefaultOffset;
CORE::Int32 redisXReadCount;
CORE::UInt32 redisXReadBlockTimeoutInMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,13 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId

if ( m_config.preferDedicatedConnection && GUCEF_NULL != m_redisPipeline )
{
if ( m_config.autoGenerateRedisAddMinId )
{
// the cluster uses local time for its auto-generated message id as such we will use the same but are assuming the cluster is in the same timezone
CORE::UInt64 maxAgeTimestamp = CORE::CDateTime::NowLocalDateTime().ToUnixEpochBasedTicksInMillisecs() - m_config.maxAgeInMsForMinId;
m_redisPipeline->dvcustom_xadd_minid( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), maxAgeTimestamp, m_config.redisXAddMaxAgeIsApproximate );
}
else
if ( m_config.redisXAddMaxLen >= 0 )
m_redisPipeline->xadd( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), m_config.redisXAddMaxLen, m_config.redisXAddMaxLenIsApproximate );
else
Expand Down Expand Up @@ -488,6 +495,13 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId
}
else
{
if ( m_config.autoGenerateRedisAddMinId )
{
// the cluster uses local time for its auto-generated message id as such we will use the same but are assuming the cluster is in the same timezone
CORE::UInt64 maxAgeTimestamp = CORE::CDateTime::NowLocalDateTime().ToUnixEpochBasedTicksInMillisecs() - m_config.maxAgeInMsForMinId;
m_redisContext->dvcustom_xadd_minid( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), maxAgeTimestamp, m_config.redisXAddMaxAgeIsApproximate );
}
else
if ( m_config.redisXAddMaxLen >= 0 )
m_redisContext->xadd( cnSV, msgIdToUse, kvPairs.begin(), kvPairs.end(), m_config.redisXAddMaxLen, m_config.redisXAddMaxLenIsApproximate );
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ namespace REDISCLUSTER {
CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( void )
: PUBSUB::CPubSubClientTopicConfig()
, CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this )
, autoGenerateRedisAddMinId( false )
, maxAgeInMsForMinId( GUCEF_MT_UINT64MAX )
, redisXAddMaxAgeIsApproximate( true )
, redisXAddMaxLen( -1 )
, redisXAddMaxLenIsApproximate( true )
, redisXAddIgnoreMsgId( true )
Expand All @@ -66,6 +69,9 @@ CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( void
CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( const CRedisClusterPubSubClientTopicConfig& src )
: PUBSUB::CPubSubClientTopicConfig( src )
, CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this )
, autoGenerateRedisAddMinId( src.autoGenerateRedisAddMinId )
, maxAgeInMsForMinId( src.maxAgeInMsForMinId )
, redisXAddMaxAgeIsApproximate( src.redisXAddMaxAgeIsApproximate )
, redisXAddMaxLen( src.redisXAddMaxLen )
, redisXAddMaxLenIsApproximate( src.redisXAddMaxLenIsApproximate )
, redisXAddIgnoreMsgId( src.redisXAddIgnoreMsgId )
Expand All @@ -84,6 +90,9 @@ CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( cons
CRedisClusterPubSubClientTopicConfig::CRedisClusterPubSubClientTopicConfig( const PUBSUB::CPubSubClientTopicConfig& genericConfig )
: PUBSUB::CPubSubClientTopicConfig( genericConfig )
, CORE::CTSharedObjCreator< CRedisClusterPubSubClientTopicConfig, MT::CMutex >( this )
, autoGenerateRedisAddMinId( false )
, maxAgeInMsForMinId( GUCEF_MT_UINT64MAX )
, redisXAddMaxAgeIsApproximate( true )
, redisXAddMaxLen( -1 )
, redisXAddMaxLenIsApproximate( true )
, redisXAddIgnoreMsgId( true )
Expand Down Expand Up @@ -111,6 +120,9 @@ bool
CRedisClusterPubSubClientTopicConfig::LoadCustomConfig( const CORE::CDataNode& config )
{GUCEF_TRACE;

autoGenerateRedisAddMinId = config.GetAttributeValueOrChildValueByName( "autoGenerateRedisAddMinId" ).AsBool( autoGenerateRedisAddMinId, true );
maxAgeInMsForMinId = config.GetAttributeValueOrChildValueByName( "maxAgeInMsForMinId" ).AsUInt64( maxAgeInMsForMinId, true );
redisXAddMaxAgeIsApproximate = config.GetAttributeValueOrChildValueByName( "redisXAddMaxAgeIsApproximate" ).AsBool( redisXAddMaxAgeIsApproximate, true );
redisXAddMaxLen = config.GetAttributeValueOrChildValueByName( "redisXAddMaxLen" ).AsInt32( redisXAddMaxLen, true );
redisXAddMaxLenIsApproximate = config.GetAttributeValueOrChildValueByName( "redisXAddMaxLenIsApproximate" ).AsBool( redisXAddMaxLenIsApproximate, true );
redisXAddIgnoreMsgId = config.GetAttributeValueOrChildValueByName( "redisXAddIgnoreMsgId" ).AsBool( redisXAddIgnoreMsgId, true );
Expand Down Expand Up @@ -160,6 +172,9 @@ CRedisClusterPubSubClientTopicConfig::operator=( const CRedisClusterPubSubClient
if ( &src != this )
{
PUBSUB::CPubSubClientTopicConfig::operator=( src );
autoGenerateRedisAddMinId = src.autoGenerateRedisAddMinId;
maxAgeInMsForMinId = src.maxAgeInMsForMinId;
redisXAddMaxAgeIsApproximate = src.redisXAddMaxAgeIsApproximate;
redisXAddMaxLen = src.redisXAddMaxLen;
redisXAddMaxLenIsApproximate = src.redisXAddMaxLenIsApproximate;
redisXAddIgnoreMsgId = src.redisXAddIgnoreMsgId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
"journalPath": "vfs://DataPath/journals/clientType/{clientType}/instance/topics/{clientInstance}/topicName/{topicName}"
},
"CustomConfig": {
"redisXAddMaxLen": -1,
"autoGenerateRedisAddMinId": false,
"maxAgeInMsForMinId": 31560000000,
"redisXAddMaxLen": -1,
"redisXAddMaxLenIsApproximate": true,
"redisXAddIgnoreMsgId": true,
"redisXReadDefaultOffset": "0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
"useTopicLevelMaxTotalMsgsInFlight": false,
"maxTotalMsgsInFlight": -1,
"CustomConfig": {
"autoGenerateRedisAddMinId": false,
"maxAgeInMsForMinId": 31560000000,
"redisXAddMaxLen": -1,
"redisXAddMaxLenIsApproximate": true,
"redisXAddIgnoreMsgId": true,
Expand Down

0 comments on commit 1cf6811

Please sign in to comment.