diff --git a/broker_redis.go b/broker_redis.go index 754ade4d..3e939eb2 100644 --- a/broker_redis.go +++ b/broker_redis.go @@ -12,6 +12,8 @@ import ( "sync/atomic" "time" + _ "embed" + "github.com/centrifugal/centrifuge/internal/convert" "github.com/centrifugal/protocol" @@ -219,150 +221,18 @@ func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) { return b, nil } -const ( - // Add to history and optionally publish. - // KEYS[1] - history LIST key - // KEYS[2] - history meta HASH key - // ARGV[1] - message payload - // ARGV[2] - history size ltrim right bound - // ARGV[3] - history lifetime - // ARGV[4] - channel to publish message to if needed - // ARGV[5] - history meta key expiration time - // ARGV[6] - new epoch value if no epoch set yet - // ARGV[7] - command to publish (publish or spublish) - addHistoryListSource = ` -local epoch -if redis.call('exists', KEYS[2]) ~= 0 then - epoch = redis.call("hget", KEYS[2], "e") -end -if epoch == false or epoch == nil then - epoch = ARGV[6] - redis.call("hset", KEYS[2], "e", epoch) -end -local offset = redis.call("hincrby", KEYS[2], "s", 1) -if ARGV[5] ~= '0' then - redis.call("expire", KEYS[2], ARGV[5]) -end -local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1] -redis.call("lpush", KEYS[1], payload) -redis.call("ltrim", KEYS[1], 0, ARGV[2]) -redis.call("expire", KEYS[1], ARGV[3]) -if ARGV[4] ~= '' then - redis.call(ARGV[7], ARGV[4], payload) -end -return {offset, epoch} - ` - - // addHistoryStreamSource contains a Lua script to save data to Redis stream and - // publish it into channel. - // KEYS[1] - history STREAM key - // KEYS[2] - stream meta HASH key - // ARGV[1] - message payload - // ARGV[2] - stream size - // ARGV[3] - stream lifetime - // ARGV[4] - channel to publish message to if needed - // ARGV[5] - history meta key expiration time - // ARGV[6] - new epoch value if no epoch set yet - // ARGV[7] - command to publish (publish or spublish) - addHistoryStreamSource = ` -local epoch -if redis.call('exists', KEYS[2]) ~= 0 then - epoch = redis.call("hget", KEYS[2], "e") -end -if epoch == false or epoch == nil then - epoch = ARGV[6] - redis.call("hset", KEYS[2], "e", epoch) -end -local offset = redis.call("hincrby", KEYS[2], "s", 1) -if ARGV[5] ~= '0' then - redis.call("expire", KEYS[2], ARGV[5]) -end -redis.call("xadd", KEYS[1], "MAXLEN", ARGV[2], offset, "d", ARGV[1]) -redis.call("expire", KEYS[1], ARGV[3]) -if ARGV[4] ~= '' then - local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1] - redis.call(ARGV[7], ARGV[4], payload) -end -return {offset, epoch} - ` - - // Retrieve channel history information from LIST. - // KEYS[1] - history LIST key - // KEYS[2] - list meta HASH key - // ARGV[1] - include publications into response - // ARGV[2] - publications list right bound - // ARGV[3] - list meta hash key expiration time - // ARGV[4] - new epoch value if no epoch set yet - historyListSource = ` -local offset = redis.call("hget", KEYS[2], "s") -local epoch -if redis.call('exists', KEYS[2]) ~= 0 then - epoch = redis.call("hget", KEYS[2], "e") -end -if epoch == false or epoch == nil then - epoch = ARGV[4] - redis.call("hset", KEYS[2], "e", epoch) -end -if ARGV[3] ~= '0' then - redis.call("expire", KEYS[2], ARGV[3]) -end -local pubs = nil -if ARGV[1] ~= "0" then - pubs = redis.call("lrange", KEYS[1], 0, ARGV[2]) -end -return {offset, epoch, pubs} - ` - - // Retrieve channel history information from STREAM. - // KEYS[1] - history STREAM key - // KEYS[2] - stream meta HASH key - // ARGV[1] - include publications into response - // ARGV[2] - offset - // ARGV[3] - limit - // ARGV[4] - reverse - // ARGV[5] - stream meta hash key expiration time - // ARGV[6] - new epoch value if no epoch set yet - historyStreamSource = ` -local offset = redis.call("hget", KEYS[2], "s") -local epoch -if redis.call('exists', KEYS[2]) ~= 0 then - epoch = redis.call("hget", KEYS[2], "e") -end -if epoch == false or epoch == nil then - epoch = ARGV[6] - redis.call("hset", KEYS[2], "e", epoch) -end -if ARGV[5] ~= '0' then - redis.call("expire", KEYS[2], ARGV[5]) -end -local pubs = nil -if ARGV[1] ~= "0" then - if ARGV[3] ~= "0" then - if ARGV[4] == '0' then - pubs = redis.call("xrange", KEYS[1], ARGV[2], "+", "COUNT", ARGV[3]) - else - local getOffset = offset - local incomingOffset = tonumber(ARGV[2]) - if incomingOffset ~= 0 then - getOffset = incomingOffset - end - pubs = redis.call("xrevrange", KEYS[1], getOffset, "-", "COUNT", ARGV[3]) - end - else - if ARGV[4] == '0' then - pubs = redis.call("xrange", KEYS[1], ARGV[2], "+") - else - local getOffset = offset - local incomingOffset = tonumber(ARGV[2]) - if incomingOffset ~= 0 then - getOffset = incomingOffset - end - pubs = redis.call("xrevrange", KEYS[1], getOffset, "-") - end - end -end -return {offset, epoch, pubs} - ` +var ( + //go:embed internal/redis_lua/broker_history_add_list.lua + addHistoryListSource string + + //go:embed internal/redis_lua/broker_history_add_stream.lua + addHistoryStreamSource string + + //go:embed internal/redis_lua/broker_history_list.lua + historyListSource string + + //go:embed internal/redis_lua/broker_history_stream.lua + historyStreamSource string ) func (b *RedisBroker) getShard(channel string) *shardWrapper { diff --git a/broker_redis_test.go b/broker_redis_test.go index cb48d931..9370cccc 100644 --- a/broker_redis_test.go +++ b/broker_redis_test.go @@ -1522,6 +1522,38 @@ func TestRedisHistoryIteration(t *testing.T) { } } +func TestRedisHistoryReversedNoMetaYet(t *testing.T) { + for _, tt := range redisTests { + t.Run(tt.Name, func(t *testing.T) { + node := testNode(t) + broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster) + defer func() { _ = node.Shutdown(context.Background()) }() + defer stopRedisBroker(broker) + pubs, sp, err := broker.History( + randString(10), + HistoryOptions{ + Filter: HistoryFilter{Limit: 10, Reverse: true}, + MetaTTL: 24 * time.Hour, + }, + ) + require.NoError(t, err) + require.Equal(t, uint64(0), sp.Offset) + require.Len(t, pubs, 0) + + pubs, sp, err = broker.History( + randString(10), + HistoryOptions{ + Filter: HistoryFilter{Limit: -1, Reverse: true}, + MetaTTL: 24 * time.Hour, + }, + ) + require.NoError(t, err) + require.Equal(t, uint64(0), sp.Offset) + require.Len(t, pubs, 0) + }) + } +} + func TestRedisHistoryIterationReverse(t *testing.T) { for _, tt := range redisTests { t.Run(tt.Name, func(t *testing.T) { diff --git a/internal/redis_lua/broker_history_add_list.lua b/internal/redis_lua/broker_history_add_list.lua new file mode 100644 index 00000000..a4959a4f --- /dev/null +++ b/internal/redis_lua/broker_history_add_list.lua @@ -0,0 +1,32 @@ +local list_key = KEYS[1] +local meta_key = KEYS[2] +local message_payload = ARGV[1] +local ltrim_right_bound = ARGV[2] +local list_ttl = ARGV[3] +local channel = ARGV[4] +local meta_expire = ARGV[5] +local new_epoch_if_empty = ARGV[6] +local publish_command = ARGV[7] + +local current_epoch = redis.call("hget", meta_key, "e") +if current_epoch == false then + current_epoch = new_epoch_if_empty + redis.call("hset", meta_key, "e", current_epoch) +end + +local top_offset = redis.call("hincrby", meta_key, "s", 1) + +if meta_expire ~= '0' then + redis.call("expire", meta_key, meta_expire) +end + +local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload +redis.call("lpush", list_key, payload) +redis.call("ltrim", list_key, 0, ltrim_right_bound) +redis.call("expire", list_key, list_ttl) + +if channel ~= '' then + redis.call(publish_command, channel, payload) +end + +return {top_offset, current_epoch} diff --git a/internal/redis_lua/broker_history_add_stream.lua b/internal/redis_lua/broker_history_add_stream.lua new file mode 100644 index 00000000..482707f2 --- /dev/null +++ b/internal/redis_lua/broker_history_add_stream.lua @@ -0,0 +1,31 @@ +local stream_key = KEYS[1] +local meta_key = KEYS[2] +local message_payload = ARGV[1] +local stream_size = ARGV[2] +local stream_ttl = ARGV[3] +local channel = ARGV[4] +local meta_expire = ARGV[5] +local new_epoch_if_empty = ARGV[6] +local publish_command = ARGV[7] + +local current_epoch = redis.call("hget", meta_key, "e") +if current_epoch == false then + current_epoch = new_epoch_if_empty + redis.call("hset", meta_key, "e", current_epoch) +end + +local top_offset = redis.call("hincrby", meta_key, "s", 1) + +if meta_expire ~= '0' then + redis.call("expire", meta_key, meta_expire) +end + +redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload) +redis.call("expire", stream_key, stream_ttl) + +if channel ~= '' then + local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload + redis.call(publish_command, channel, payload) +end + +return {top_offset, current_epoch} diff --git a/internal/redis_lua/broker_history_list.lua b/internal/redis_lua/broker_history_list.lua new file mode 100644 index 00000000..f9daee15 --- /dev/null +++ b/internal/redis_lua/broker_history_list.lua @@ -0,0 +1,26 @@ +local list_key = KEYS[1] +local meta_key = KEYS[2] +local include_publications = ARGV[1] +local list_right_bound = ARGV[2] +local meta_expire = ARGV[3] +local new_epoch_if_empty = ARGV[4] + +local stream_meta = redis.call("hmget", meta_key, "e", "s") +local current_epoch, top_offset = stream_meta[1], stream_meta[2] + +if current_epoch == false then + current_epoch = new_epoch_if_empty + top_offset = 0 + redis.call("hset", meta_key, "e", current_epoch, "s", "0") +end + +if meta_expire ~= '0' then + redis.call("expire", meta_key, meta_expire) +end + +local pubs = nil +if include_publications ~= "0" then + pubs = redis.call("lrange", list_key, 0, list_right_bound) +end + +return {top_offset, current_epoch, pubs} diff --git a/internal/redis_lua/broker_history_stream.lua b/internal/redis_lua/broker_history_stream.lua new file mode 100644 index 00000000..479bcdec --- /dev/null +++ b/internal/redis_lua/broker_history_stream.lua @@ -0,0 +1,49 @@ +local stream_key = KEYS[1] +local meta_key = KEYS[2] +local include_publications = ARGV[1] +local since_offset = ARGV[2] +local limit = ARGV[3] +local reverse = ARGV[4] +local meta_expire = ARGV[5] +local new_epoch_if_empty = ARGV[6] + +local stream_meta = redis.call("hmget", meta_key, "e", "s") +local current_epoch, top_offset = stream_meta[1], stream_meta[2] + +if current_epoch == false then + current_epoch = new_epoch_if_empty + top_offset = 0 + redis.call("hset", meta_key, "e", current_epoch, "s", "0") +end + +if meta_expire ~= '0' then + redis.call("expire", meta_key, meta_expire) +end + +local pubs = nil + +if include_publications ~= "0" then + if limit ~= "0" then + if reverse == "0" then + pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit) + else + local get_offset = top_offset + if since_offset ~= "0" then + get_offset = since_offset + end + pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit) + end + else + if reverse == "0" then + pubs = redis.call("xrange", stream_key, since_offset, "+") + else + local get_offset = top_offset + if since_offset ~= "0" then + get_offset = since_offset + end + pubs = redis.call("xrevrange", stream_key, get_offset, "-") + end + end +end + +return {top_offset, current_epoch, pubs}