Skip to content

Commit

Permalink
option to use hash field ttl for presence
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 31, 2024
1 parent 70fd357 commit b0e09bf
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 111 deletions.
27 changes: 16 additions & 11 deletions internal/redis_lua/broker_history_add_list.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ if result_key_expire ~= '' then
local cached_result = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = cached_result[1], cached_result[2]
if result_epoch ~= false then
return {result_offset, result_epoch, "1"}
return { result_offset, result_epoch, "1" }
end
end

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local prev_message_payload = ""
Expand All @@ -42,15 +42,20 @@ redis.call("ltrim", list_key, 0, ltrim_right_bound)
redis.call("expire", list_key, list_ttl)

if channel ~= '' then
if use_delta == "1" then
payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
end
redis.call(publish_command, channel, payload)
if use_delta == "1" then
payload = "__" ..
"d1:" ..
top_offset ..
":" ..
current_epoch ..
":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
end
redis.call(publish_command, channel, payload)
end

if result_key_expire ~= '' then
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
end

return {top_offset, current_epoch, "0"}
return { top_offset, current_epoch, "0" }
57 changes: 31 additions & 26 deletions internal/redis_lua/broker_history_add_stream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,61 @@ if result_key_expire ~= '' then
local cached_result = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = cached_result[1], cached_result[2]
if result_epoch ~= false then
return {result_offset, result_epoch, "1"}
return { result_offset, result_epoch, "1" }
end
end

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local prev_message_payload = ""
if use_delta == "1" then
local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1)
if #prev_entries > 0 then
prev_message_payload = prev_entries[1][2]["d"]
local fields_and_values = prev_entries[1][2]
-- Loop through the fields and values to find the field "d"
for i = 1, #fields_and_values, 2 do
local field = fields_and_values[i]
local value = fields_and_values[i + 1]
if field == "d" then
prev_message_payload = value
break -- Stop the loop once we find the field "d"
local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1)
if #prev_entries > 0 then
prev_message_payload = prev_entries[1][2]["d"]
local fields_and_values = prev_entries[1][2]
-- Loop through the fields and values to find the field "d"
for i = 1, #fields_and_values, 2 do
local field = fields_and_values[i]
local value = fields_and_values[i + 1]
if field == "d" then
prev_message_payload = value
break -- Stop the loop once we find the field "d"
end
end
end
end
end

redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload)
redis.call("expire", stream_key, stream_ttl)

if channel ~= '' then
local payload
if use_delta == "1" then
payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
else
payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
end
redis.call(publish_command, channel, payload)
local payload
if use_delta == "1" then
payload = "__" ..
"d1:" ..
top_offset ..
":" ..
current_epoch ..
":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
else
payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
end
redis.call(publish_command, channel, payload)
end

if result_key_expire ~= '' then
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
end

return {top_offset, current_epoch, "0"}
return { top_offset, current_epoch, "0" }
14 changes: 7 additions & 7 deletions internal/redis_lua/broker_history_list.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
end

if top_offset == false then
top_offset = 0
top_offset = 0
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil
if include_publications ~= "0" then
pubs = redis.call("lrange", list_key, 0, list_right_bound)
pubs = redis.call("lrange", list_key, 0, list_right_bound)
end

return {top_offset, current_epoch, pubs}
return { top_offset, current_epoch, pubs }
50 changes: 25 additions & 25 deletions internal/redis_lua/broker_history_stream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,43 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
end

if top_offset == false then
top_offset = 0
top_offset = 0
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil

if include_publications ~= "0" then
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
end
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
else
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
end

return {top_offset, current_epoch, pubs}
return { top_offset, current_epoch, pubs }
2 changes: 1 addition & 1 deletion internal/redis_lua/broker_publish_idempotent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if result_key_expire ~= '' then
local stream_meta = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = stream_meta[1], stream_meta[2]
if result_epoch ~= false then
return {result_offset, result_epoch}
return { result_offset, result_epoch }
end
end

Expand Down
19 changes: 14 additions & 5 deletions internal/redis_lua/presence_add.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,34 @@
-- ARGV[4] - info payload
-- ARGV[5] - user ID
-- ARGV[6] - enable user mapping "0" or "1"
-- ARGV[7] - use hash field TTL "0" or "1"

-- Check if client ID is new.
local isNewClient = false
if ARGV[6] ~= '0' then
isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0
isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0
end

-- Add per-client presence.
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("hset", KEYS[2], ARGV[3], ARGV[4])
redis.call("expire", KEYS[1], ARGV[1])
redis.call("expire", KEYS[2], ARGV[1])
if ARGV[7] == '0' then
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("expire", KEYS[1], ARGV[1])
else
redis.call("hexpire", KEYS[2], ARGV[1], "FIELDS", "1", ARGV[3])
end

-- Add per-user information.
if ARGV[6] ~= '0' then
redis.call("zadd", KEYS[3], ARGV[2], ARGV[5])
redis.call("expire", KEYS[3], ARGV[1])
if isNewClient then
redis.call("hincrby", KEYS[4], ARGV[5], 1)
end
redis.call("expire", KEYS[4], ARGV[1])
if ARGV[7] == '0' then
redis.call("zadd", KEYS[3], ARGV[2], ARGV[5])
redis.call("expire", KEYS[3], ARGV[1])
else
redis.call("hexpire", KEYS[4], ARGV[1], "FIELDS", "1", ARGV[5])
end
end
15 changes: 9 additions & 6 deletions internal/redis_lua/presence_get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
-- KEYS[1] - presence set key
-- KEYS[2] - presence hash key
-- ARGV[1] - current timestamp in seconds
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
-- ARGV[2] - use hash field TTL "0" or "1"
if ARGV[2] == '0' then
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
end
return redis.call("hgetall", KEYS[2])
9 changes: 7 additions & 2 deletions internal/redis_lua/presence_rem.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
-- ARGV[1] - client ID
-- ARGV[2] - user ID
-- ARGV[3] - enable user mapping "0" or "1"
-- ARGV[4] - use hash field TTL "0" or "1"

local clientExists = false
if ARGV[3] ~= '0' then
Expand All @@ -14,14 +15,18 @@ if ARGV[3] ~= '0' then
end

redis.call("hdel", KEYS[2], ARGV[1])
redis.call("zrem", KEYS[1], ARGV[1])
if ARGV[4] == '0' then
redis.call("zrem", KEYS[1], ARGV[1])
end

if ARGV[3] ~= '0' and clientExists then
local connectionsCount = redis.call("hincrby", KEYS[4], ARGV[2], -1)
-- If the number of connections for this user is zero, remove the user
-- from the sorted set and clean hash.
if connectionsCount <= 0 then
redis.call("zrem", KEYS[3], ARGV[2])
if ARGV[4] == '0' then
redis.call("zrem", KEYS[3], ARGV[2])
end
redis.call("hdel", KEYS[4], ARGV[2])
end
end
31 changes: 18 additions & 13 deletions internal/redis_lua/presence_stats_get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
-- KEYS[3] - per-user zset key
-- KEYS[4] - per-user hash key
-- ARGV[1] - current timestamp in seconds
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
-- ARGV[2] - use hash field TTL "0" or "1"
if ARGV[2] == '0' then
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
end

local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1])
if #userExpired > 0 then
for num = 1, #userExpired do
redis.call("hdel", KEYS[4], userExpired[num])
end
redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1])
if ARGV[2] == '0' then
local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1])
if #userExpired > 0 then
for num = 1, #userExpired do
redis.call("hdel", KEYS[4], userExpired[num])
end
redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1])
end
end

local clientCount = redis.call("hlen", KEYS[2])
local userCount = redis.call("hlen", KEYS[4])

return {clientCount, userCount}
return { clientCount, userCount }
Loading

0 comments on commit b0e09bf

Please sign in to comment.