From b0e09bf277f647b7e5668e43bff19aaeb04843f8 Mon Sep 17 00:00:00 2001 From: FZambia Date: Wed, 31 Jul 2024 18:13:29 +0300 Subject: [PATCH] option to use hash field ttl for presence --- .../redis_lua/broker_history_add_list.lua | 27 +++-- .../redis_lua/broker_history_add_stream.lua | 57 +++++----- internal/redis_lua/broker_history_list.lua | 14 +-- internal/redis_lua/broker_history_stream.lua | 50 ++++----- .../redis_lua/broker_publish_idempotent.lua | 2 +- internal/redis_lua/presence_add.lua | 19 +++- internal/redis_lua/presence_get.lua | 15 +-- internal/redis_lua/presence_rem.lua | 9 +- internal/redis_lua/presence_stats_get.lua | 31 +++--- presence_redis.go | 25 ++++- presence_redis_test.go | 100 ++++++++++++++++-- 11 files changed, 238 insertions(+), 111 deletions(-) diff --git a/internal/redis_lua/broker_history_add_list.lua b/internal/redis_lua/broker_history_add_list.lua index 99bff5a1..daf034f4 100644 --- a/internal/redis_lua/broker_history_add_list.lua +++ b/internal/redis_lua/broker_history_add_list.lua @@ -15,20 +15,20 @@ if result_key_expire ~= '' then local cached_result = redis.call("hmget", result_key, "e", "s") local result_epoch, result_offset = cached_result[1], cached_result[2] if result_epoch ~= false then - return {result_offset, result_epoch, "1"} + return { result_offset, result_epoch, "1" } end end local current_epoch = redis.call("hget", meta_key, "e") if current_epoch == false then - current_epoch = new_epoch_if_empty - redis.call("hset", meta_key, "e", current_epoch) + current_epoch = new_epoch_if_empty + redis.call("hset", meta_key, "e", current_epoch) end local top_offset = redis.call("hincrby", meta_key, "s", 1) if meta_expire ~= '0' then - redis.call("expire", meta_key, meta_expire) + redis.call("expire", meta_key, meta_expire) end local prev_message_payload = "" @@ -42,15 +42,20 @@ redis.call("ltrim", list_key, 0, ltrim_right_bound) redis.call("expire", list_key, list_ttl) if channel ~= '' then - if use_delta == "1" then - payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload - end - redis.call(publish_command, channel, payload) + if use_delta == "1" then + payload = "__" .. + "d1:" .. + top_offset .. + ":" .. + current_epoch .. + ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload + end + redis.call(publish_command, channel, payload) end if result_key_expire ~= '' then - redis.call("hset", result_key, "e", current_epoch, "s", top_offset) - redis.call("expire", result_key, result_key_expire) + redis.call("hset", result_key, "e", current_epoch, "s", top_offset) + redis.call("expire", result_key, result_key_expire) end -return {top_offset, current_epoch, "0"} +return { top_offset, current_epoch, "0" } diff --git a/internal/redis_lua/broker_history_add_stream.lua b/internal/redis_lua/broker_history_add_stream.lua index 22065f33..1521ac72 100644 --- a/internal/redis_lua/broker_history_add_stream.lua +++ b/internal/redis_lua/broker_history_add_stream.lua @@ -15,56 +15,61 @@ if result_key_expire ~= '' then local cached_result = redis.call("hmget", result_key, "e", "s") local result_epoch, result_offset = cached_result[1], cached_result[2] if result_epoch ~= false then - return {result_offset, result_epoch, "1"} + return { result_offset, result_epoch, "1" } end end local current_epoch = redis.call("hget", meta_key, "e") if current_epoch == false then - current_epoch = new_epoch_if_empty - redis.call("hset", meta_key, "e", current_epoch) + current_epoch = new_epoch_if_empty + redis.call("hset", meta_key, "e", current_epoch) end local top_offset = redis.call("hincrby", meta_key, "s", 1) if meta_expire ~= '0' then - redis.call("expire", meta_key, meta_expire) + redis.call("expire", meta_key, meta_expire) end local prev_message_payload = "" if use_delta == "1" then - local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1) - if #prev_entries > 0 then - prev_message_payload = prev_entries[1][2]["d"] - local fields_and_values = prev_entries[1][2] - -- Loop through the fields and values to find the field "d" - for i = 1, #fields_and_values, 2 do - local field = fields_and_values[i] - local value = fields_and_values[i + 1] - if field == "d" then - prev_message_payload = value - break -- Stop the loop once we find the field "d" + local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1) + if #prev_entries > 0 then + prev_message_payload = prev_entries[1][2]["d"] + local fields_and_values = prev_entries[1][2] + -- Loop through the fields and values to find the field "d" + for i = 1, #fields_and_values, 2 do + local field = fields_and_values[i] + local value = fields_and_values[i + 1] + if field == "d" then + prev_message_payload = value + break -- Stop the loop once we find the field "d" + end end end - end end redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload) redis.call("expire", stream_key, stream_ttl) if channel ~= '' then - local payload - if use_delta == "1" then - payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload - else - payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload - end - redis.call(publish_command, channel, payload) + local payload + if use_delta == "1" then + payload = "__" .. + "d1:" .. + top_offset .. + ":" .. + current_epoch .. + ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload + else + payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload + end + redis.call(publish_command, channel, payload) end if result_key_expire ~= '' then - redis.call("hset", result_key, "e", current_epoch, "s", top_offset) - redis.call("expire", result_key, result_key_expire) + redis.call("hset", result_key, "e", current_epoch, "s", top_offset) + redis.call("expire", result_key, result_key_expire) end -return {top_offset, current_epoch, "0"} +return { top_offset, current_epoch, "0" } diff --git a/internal/redis_lua/broker_history_list.lua b/internal/redis_lua/broker_history_list.lua index 86abc96a..44c19907 100644 --- a/internal/redis_lua/broker_history_list.lua +++ b/internal/redis_lua/broker_history_list.lua @@ -9,22 +9,22 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s") local current_epoch, top_offset = stream_meta[1], stream_meta[2] if current_epoch == false then - current_epoch = new_epoch_if_empty - top_offset = 0 - redis.call("hset", meta_key, "e", current_epoch) + current_epoch = new_epoch_if_empty + top_offset = 0 + redis.call("hset", meta_key, "e", current_epoch) end if top_offset == false then - top_offset = 0 + top_offset = 0 end if meta_expire ~= '0' then - redis.call("expire", meta_key, meta_expire) + redis.call("expire", meta_key, meta_expire) end local pubs = nil if include_publications ~= "0" then - pubs = redis.call("lrange", list_key, 0, list_right_bound) + pubs = redis.call("lrange", list_key, 0, list_right_bound) end -return {top_offset, current_epoch, pubs} +return { top_offset, current_epoch, pubs } diff --git a/internal/redis_lua/broker_history_stream.lua b/internal/redis_lua/broker_history_stream.lua index f7398c06..fe329bad 100644 --- a/internal/redis_lua/broker_history_stream.lua +++ b/internal/redis_lua/broker_history_stream.lua @@ -11,43 +11,43 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s") local current_epoch, top_offset = stream_meta[1], stream_meta[2] if current_epoch == false then - current_epoch = new_epoch_if_empty - top_offset = 0 - redis.call("hset", meta_key, "e", current_epoch) + current_epoch = new_epoch_if_empty + top_offset = 0 + redis.call("hset", meta_key, "e", current_epoch) end if top_offset == false then - top_offset = 0 + top_offset = 0 end if meta_expire ~= '0' then - redis.call("expire", meta_key, meta_expire) + redis.call("expire", meta_key, meta_expire) end local pubs = nil if include_publications ~= "0" then - if limit ~= "0" then - if reverse == "0" then - pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit) + if limit ~= "0" then + if reverse == "0" then + pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit) + else + local get_offset = top_offset + if since_offset ~= "0" then + get_offset = since_offset + end + pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit) + end else - local get_offset = top_offset - if since_offset ~= "0" then - get_offset = since_offset - end - pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit) + if reverse == "0" then + pubs = redis.call("xrange", stream_key, since_offset, "+") + else + local get_offset = top_offset + if since_offset ~= "0" then + get_offset = since_offset + end + pubs = redis.call("xrevrange", stream_key, get_offset, "-") + end end - else - if reverse == "0" then - pubs = redis.call("xrange", stream_key, since_offset, "+") - else - local get_offset = top_offset - if since_offset ~= "0" then - get_offset = since_offset - end - pubs = redis.call("xrevrange", stream_key, get_offset, "-") - end - end end -return {top_offset, current_epoch, pubs} +return { top_offset, current_epoch, pubs } diff --git a/internal/redis_lua/broker_publish_idempotent.lua b/internal/redis_lua/broker_publish_idempotent.lua index 0bc32f19..7a683ab2 100644 --- a/internal/redis_lua/broker_publish_idempotent.lua +++ b/internal/redis_lua/broker_publish_idempotent.lua @@ -8,7 +8,7 @@ if result_key_expire ~= '' then local stream_meta = redis.call("hmget", result_key, "e", "s") local result_epoch, result_offset = stream_meta[1], stream_meta[2] if result_epoch ~= false then - return {result_offset, result_epoch} + return { result_offset, result_epoch } end end diff --git a/internal/redis_lua/presence_add.lua b/internal/redis_lua/presence_add.lua index 14fcd118..33d52f75 100644 --- a/internal/redis_lua/presence_add.lua +++ b/internal/redis_lua/presence_add.lua @@ -9,25 +9,34 @@ -- ARGV[4] - info payload -- ARGV[5] - user ID -- ARGV[6] - enable user mapping "0" or "1" +-- ARGV[7] - use hash field TTL "0" or "1" -- Check if client ID is new. local isNewClient = false if ARGV[6] ~= '0' then - isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0 + isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0 end -- Add per-client presence. -redis.call("zadd", KEYS[1], ARGV[2], ARGV[3]) redis.call("hset", KEYS[2], ARGV[3], ARGV[4]) -redis.call("expire", KEYS[1], ARGV[1]) redis.call("expire", KEYS[2], ARGV[1]) +if ARGV[7] == '0' then + redis.call("zadd", KEYS[1], ARGV[2], ARGV[3]) + redis.call("expire", KEYS[1], ARGV[1]) +else + redis.call("hexpire", KEYS[2], ARGV[1], "FIELDS", "1", ARGV[3]) +end -- Add per-user information. if ARGV[6] ~= '0' then - redis.call("zadd", KEYS[3], ARGV[2], ARGV[5]) - redis.call("expire", KEYS[3], ARGV[1]) if isNewClient then redis.call("hincrby", KEYS[4], ARGV[5], 1) end redis.call("expire", KEYS[4], ARGV[1]) + if ARGV[7] == '0' then + redis.call("zadd", KEYS[3], ARGV[2], ARGV[5]) + redis.call("expire", KEYS[3], ARGV[1]) + else + redis.call("hexpire", KEYS[4], ARGV[1], "FIELDS", "1", ARGV[5]) + end end diff --git a/internal/redis_lua/presence_get.lua b/internal/redis_lua/presence_get.lua index af40fb0e..0c124fc2 100644 --- a/internal/redis_lua/presence_get.lua +++ b/internal/redis_lua/presence_get.lua @@ -2,11 +2,14 @@ -- KEYS[1] - presence set key -- KEYS[2] - presence hash key -- ARGV[1] - current timestamp in seconds -local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1]) -if #expired > 0 then - for num = 1, #expired do - redis.call("hdel", KEYS[2], expired[num]) - end - redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1]) +-- ARGV[2] - use hash field TTL "0" or "1" +if ARGV[2] == '0' then + local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1]) + if #expired > 0 then + for num = 1, #expired do + redis.call("hdel", KEYS[2], expired[num]) + end + redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1]) + end end return redis.call("hgetall", KEYS[2]) diff --git a/internal/redis_lua/presence_rem.lua b/internal/redis_lua/presence_rem.lua index e24ba39c..c722f441 100644 --- a/internal/redis_lua/presence_rem.lua +++ b/internal/redis_lua/presence_rem.lua @@ -6,6 +6,7 @@ -- ARGV[1] - client ID -- ARGV[2] - user ID -- ARGV[3] - enable user mapping "0" or "1" +-- ARGV[4] - use hash field TTL "0" or "1" local clientExists = false if ARGV[3] ~= '0' then @@ -14,14 +15,18 @@ if ARGV[3] ~= '0' then end redis.call("hdel", KEYS[2], ARGV[1]) -redis.call("zrem", KEYS[1], ARGV[1]) +if ARGV[4] == '0' then + redis.call("zrem", KEYS[1], ARGV[1]) +end if ARGV[3] ~= '0' and clientExists then local connectionsCount = redis.call("hincrby", KEYS[4], ARGV[2], -1) -- If the number of connections for this user is zero, remove the user -- from the sorted set and clean hash. if connectionsCount <= 0 then - redis.call("zrem", KEYS[3], ARGV[2]) + if ARGV[4] == '0' then + redis.call("zrem", KEYS[3], ARGV[2]) + end redis.call("hdel", KEYS[4], ARGV[2]) end end diff --git a/internal/redis_lua/presence_stats_get.lua b/internal/redis_lua/presence_stats_get.lua index fc76f0d6..5cc3e7f9 100644 --- a/internal/redis_lua/presence_stats_get.lua +++ b/internal/redis_lua/presence_stats_get.lua @@ -4,23 +4,28 @@ -- KEYS[3] - per-user zset key -- KEYS[4] - per-user hash key -- ARGV[1] - current timestamp in seconds -local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1]) -if #expired > 0 then - for num = 1, #expired do - redis.call("hdel", KEYS[2], expired[num]) - end - redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1]) +-- ARGV[2] - use hash field TTL "0" or "1" +if ARGV[2] == '0' then + local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1]) + if #expired > 0 then + for num = 1, #expired do + redis.call("hdel", KEYS[2], expired[num]) + end + redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1]) + end end -local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1]) -if #userExpired > 0 then - for num = 1, #userExpired do - redis.call("hdel", KEYS[4], userExpired[num]) - end - redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1]) +if ARGV[2] == '0' then + local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1]) + if #userExpired > 0 then + for num = 1, #userExpired do + redis.call("hdel", KEYS[4], userExpired[num]) + end + redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1]) + end end local clientCount = redis.call("hlen", KEYS[2]) local userCount = redis.call("hlen", KEYS[4]) -return {clientCount, userCount} +return { clientCount, userCount } diff --git a/presence_redis.go b/presence_redis.go index b3d7fcc1..dd1c37be 100644 --- a/presence_redis.go +++ b/presence_redis.go @@ -52,6 +52,12 @@ type RedisPresenceManagerConfig struct { // Redis side instead of loading the entire presence information. By default, user mapping // is not maintained. EnableUserMapping func(channel string) bool + + // UseHashFieldTTL allows using HEXPIRE command to set TTL for hash field. It's only available + // since Redis 7.4.0 thus disabled by default. Using hash field TTL can be useful to avoid + // maintaining expiration index in ZSET – so both useful from the throughput and memory usage + // perspective. + UseHashFieldTTL bool } var ( @@ -131,7 +137,10 @@ func (m *RedisPresenceManager) addPresenceScriptKeysArgs(s *RedisShard, ch strin expireAt := time.Now().Unix() + int64(expire) useUserMapping := m.useUserMappingArg(ch) - args := []string{strconv.Itoa(expire), strconv.FormatInt(expireAt, 10), uid, convert.BytesToString(infoBytes), info.UserID, useUserMapping} + args := []string{ + strconv.Itoa(expire), strconv.FormatInt(expireAt, 10), uid, + convert.BytesToString(infoBytes), info.UserID, useUserMapping, m.useHashFieldTTLArg(), + } return keys, args, nil } @@ -144,6 +153,14 @@ func (m *RedisPresenceManager) useUserMappingArg(ch string) string { return useUserMapping } +func (m *RedisPresenceManager) useHashFieldTTLArg() string { + useHashFieldTTL := "0" + if m.config.UseHashFieldTTL { + useHashFieldTTL = "1" + } + return useHashFieldTTL +} + func (m *RedisPresenceManager) addPresence(s *RedisShard, ch string, uid string, info *ClientInfo) error { keys, args, err := m.addPresenceScriptKeysArgs(s, ch, uid, info) if err != nil { @@ -170,7 +187,7 @@ func (m *RedisPresenceManager) removePresenceScriptKeysArgs(s *RedisShard, ch st keys := []string{string(setKey), string(hashKey), string(userSetKey), string(userHashKey)} useUserMapping := m.useUserMappingArg(ch) - args := []string{uid, userID, useUserMapping} + args := []string{uid, userID, useUserMapping, m.useHashFieldTTLArg()} return keys, args, nil } @@ -197,7 +214,7 @@ func (m *RedisPresenceManager) presenceScriptKeysArgs(s *RedisShard, ch string) keys := []string{string(setKey), string(hashKey)} now := int(time.Now().Unix()) - args := []string{strconv.Itoa(now)} + args := []string{strconv.Itoa(now), m.useHashFieldTTLArg()} return keys, args, nil } @@ -210,7 +227,7 @@ func (m *RedisPresenceManager) presenceStatsScriptKeysArgs(s *RedisShard, ch str keys := []string{string(setKey), string(hashKey), string(userSetKey), string(userHashKey)} now := int(time.Now().Unix()) - args := []string{strconv.Itoa(now)} + args := []string{strconv.Itoa(now), m.useHashFieldTTLArg()} return keys, args, nil } diff --git a/presence_redis_test.go b/presence_redis_test.go index afcacbb0..be7b331d 100644 --- a/presence_redis_test.go +++ b/presence_redis_test.go @@ -13,11 +13,11 @@ import ( "github.com/stretchr/testify/require" ) -func newTestRedisPresenceManager(tb testing.TB, n *Node, useCluster bool, userMapping bool, port int) *RedisPresenceManager { +func newTestRedisPresenceManager(tb testing.TB, n *Node, useCluster bool, userMapping, hashFieldTTL bool, port int) *RedisPresenceManager { if useCluster { return NewTestRedisPresenceManagerClusterWithPrefix(tb, n, getUniquePrefix(), userMapping) } - return NewTestRedisPresenceManagerWithPrefix(tb, n, getUniquePrefix(), userMapping, port) + return NewTestRedisPresenceManagerWithPrefix(tb, n, getUniquePrefix(), userMapping, hashFieldTTL, port) } func stopRedisPresenceManager(pm *RedisPresenceManager) { @@ -26,7 +26,7 @@ func stopRedisPresenceManager(pm *RedisPresenceManager) { } } -func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string, userMapping bool, port int) *RedisPresenceManager { +func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string, userMapping, hashFieldTTL bool, port int) *RedisPresenceManager { redisConf := testSingleRedisConf(port) s, err := NewRedisShard(n, redisConf) require.NoError(tb, err) @@ -36,6 +36,7 @@ func NewTestRedisPresenceManagerWithPrefix(tb testing.TB, n *Node, prefix string EnableUserMapping: func(_ string) bool { return userMapping }, + UseHashFieldTTL: hashFieldTTL, }) if err != nil { tb.Fatal(err) @@ -100,7 +101,7 @@ func TestRedisPresenceManager(t *testing.T) { for _, tt := range redisPresenceTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, false, tt.Port) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, false, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -130,7 +131,7 @@ func TestRedisPresenceManagerWithUserMapping(t *testing.T) { for _, tt := range redisPresenceTests { t.Run(tt.Name, func(t *testing.T) { node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, tt.Port) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -200,13 +201,90 @@ func TestRedisPresenceManagerWithUserMapping(t *testing.T) { } } +func TestRedisPresenceManagerWithHashFieldTTL(t *testing.T) { + t.Skip() // Will work on Redis 7.4 for now, so skipping for now since CI also runs on Redis 6. + for _, tt := range redisPresenceTests { + for _, userMapping := range []bool{true, false} { + t.Run(tt.Name+"_user_mapping_"+strconv.FormatBool(userMapping), func(t *testing.T) { + node := testNode(t) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, userMapping, true, tt.Port) + defer func() { _ = node.Shutdown(context.Background()) }() + defer stopRedisPresenceManager(pm) + + // adding presence for the first time. + require.NoError(t, pm.AddPresence("channel", "uid", &ClientInfo{ + ClientID: "uid", + UserID: "1", + })) + + // same conn, same user. + require.NoError(t, pm.AddPresence("channel", "uid", &ClientInfo{ + ClientID: "uid", + UserID: "1", + })) + + stats, err := pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 1, stats.NumClients) + require.Equal(t, 1, stats.NumUsers) + + // same user, different conn + require.NoError(t, pm.AddPresence("channel", "uid-2", &ClientInfo{ + ClientID: "uid-2", + UserID: "1", + })) + + stats, err = pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 2, stats.NumClients) + require.Equal(t, 1, stats.NumUsers) + + // different user, different conn + require.NoError(t, pm.AddPresence("channel", "uid-3", &ClientInfo{ + ClientID: "uid-3", + UserID: "2", + })) + + stats, err = pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 3, stats.NumClients) + require.Equal(t, 2, stats.NumUsers) + + err = pm.RemovePresence("channel", "uid", "1") + require.NoError(t, err) + + stats, err = pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 2, stats.NumClients) + require.Equal(t, 2, stats.NumUsers) + + err = pm.RemovePresence("channel", "uid-2", "1") + require.NoError(t, err) + + stats, err = pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 1, stats.NumClients) + require.Equal(t, 1, stats.NumUsers) + + err = pm.RemovePresence("channel", "uid-3", "2") + require.NoError(t, err) + + stats, err = pm.PresenceStats("channel") + require.NoError(t, err) + require.Equal(t, 0, stats.NumClients) + require.Equal(t, 0, stats.NumUsers) + }) + } + } +} + func TestRedisPresenceManagerWithUserMappingExpire(t *testing.T) { t.Parallel() for _, tt := range redisPresenceTests { t.Run(tt.Name, func(t *testing.T) { t.Parallel() node := testNode(t) - pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, tt.Port) + pm := newTestRedisPresenceManager(t, node, tt.UseCluster, true, false, tt.Port) pm.config.PresenceTTL = 2 * time.Second defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) @@ -276,7 +354,7 @@ func BenchmarkRedisAddPresence_1Ch(b *testing.B) { for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -297,7 +375,7 @@ func BenchmarkRedisAddPresence_ManyCh(b *testing.B) { for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -321,7 +399,7 @@ func BenchmarkRedisPresence_1Ch(b *testing.B) { for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -343,7 +421,7 @@ func BenchmarkRedisPresence_ManyCh(b *testing.B) { for _, tt := range redisPresenceTests { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, tt.Port) + pm := newTestRedisPresenceManager(b, node, tt.UseCluster, false, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism()) @@ -368,7 +446,7 @@ func BenchmarkRedisPresenceStatsWithMapping(b *testing.B) { for _, tt := range excludeClusterPresenceTests(redisPresenceTests) { b.Run(tt.Name, func(b *testing.B) { node := benchNode(b) - pm := newTestRedisPresenceManager(b, node, false, true, tt.Port) + pm := newTestRedisPresenceManager(b, node, false, true, false, tt.Port) defer func() { _ = node.Shutdown(context.Background()) }() defer stopRedisPresenceManager(pm) b.SetParallelism(getBenchParallelism())