Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis reversed history fix when meta does not exist #328

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 14 additions & 144 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync/atomic"
"time"

_ "embed"

"github.com/centrifugal/centrifuge/internal/convert"

"github.com/centrifugal/protocol"
Expand Down Expand Up @@ -219,150 +221,18 @@ func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) {
return b, nil
}

const (
// Add to history and optionally publish.
// KEYS[1] - history LIST key
// KEYS[2] - history meta HASH key
// ARGV[1] - message payload
// ARGV[2] - history size ltrim right bound
// ARGV[3] - history lifetime
// ARGV[4] - channel to publish message to if needed
// ARGV[5] - history meta key expiration time
// ARGV[6] - new epoch value if no epoch set yet
// ARGV[7] - command to publish (publish or spublish)
addHistoryListSource = `
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
local offset = redis.call("hincrby", KEYS[2], "s", 1)
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1]
redis.call("lpush", KEYS[1], payload)
redis.call("ltrim", KEYS[1], 0, ARGV[2])
redis.call("expire", KEYS[1], ARGV[3])
if ARGV[4] ~= '' then
redis.call(ARGV[7], ARGV[4], payload)
end
return {offset, epoch}
`

// addHistoryStreamSource contains a Lua script to save data to Redis stream and
// publish it into channel.
// KEYS[1] - history STREAM key
// KEYS[2] - stream meta HASH key
// ARGV[1] - message payload
// ARGV[2] - stream size
// ARGV[3] - stream lifetime
// ARGV[4] - channel to publish message to if needed
// ARGV[5] - history meta key expiration time
// ARGV[6] - new epoch value if no epoch set yet
// ARGV[7] - command to publish (publish or spublish)
addHistoryStreamSource = `
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
local offset = redis.call("hincrby", KEYS[2], "s", 1)
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
redis.call("xadd", KEYS[1], "MAXLEN", ARGV[2], offset, "d", ARGV[1])
redis.call("expire", KEYS[1], ARGV[3])
if ARGV[4] ~= '' then
local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1]
redis.call(ARGV[7], ARGV[4], payload)
end
return {offset, epoch}
`

// Retrieve channel history information from LIST.
// KEYS[1] - history LIST key
// KEYS[2] - list meta HASH key
// ARGV[1] - include publications into response
// ARGV[2] - publications list right bound
// ARGV[3] - list meta hash key expiration time
// ARGV[4] - new epoch value if no epoch set yet
historyListSource = `
local offset = redis.call("hget", KEYS[2], "s")
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[4]
redis.call("hset", KEYS[2], "e", epoch)
end
if ARGV[3] ~= '0' then
redis.call("expire", KEYS[2], ARGV[3])
end
local pubs = nil
if ARGV[1] ~= "0" then
pubs = redis.call("lrange", KEYS[1], 0, ARGV[2])
end
return {offset, epoch, pubs}
`

// Retrieve channel history information from STREAM.
// KEYS[1] - history STREAM key
// KEYS[2] - stream meta HASH key
// ARGV[1] - include publications into response
// ARGV[2] - offset
// ARGV[3] - limit
// ARGV[4] - reverse
// ARGV[5] - stream meta hash key expiration time
// ARGV[6] - new epoch value if no epoch set yet
historyStreamSource = `
local offset = redis.call("hget", KEYS[2], "s")
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
local pubs = nil
if ARGV[1] ~= "0" then
if ARGV[3] ~= "0" then
if ARGV[4] == '0' then
pubs = redis.call("xrange", KEYS[1], ARGV[2], "+", "COUNT", ARGV[3])
else
local getOffset = offset
local incomingOffset = tonumber(ARGV[2])
if incomingOffset ~= 0 then
getOffset = incomingOffset
end
pubs = redis.call("xrevrange", KEYS[1], getOffset, "-", "COUNT", ARGV[3])
end
else
if ARGV[4] == '0' then
pubs = redis.call("xrange", KEYS[1], ARGV[2], "+")
else
local getOffset = offset
local incomingOffset = tonumber(ARGV[2])
if incomingOffset ~= 0 then
getOffset = incomingOffset
end
pubs = redis.call("xrevrange", KEYS[1], getOffset, "-")
end
end
end
return {offset, epoch, pubs}
`
var (
//go:embed internal/redis_lua/broker_history_add_list.lua
addHistoryListSource string

//go:embed internal/redis_lua/broker_history_add_stream.lua
addHistoryStreamSource string

//go:embed internal/redis_lua/broker_history_list.lua
historyListSource string

//go:embed internal/redis_lua/broker_history_stream.lua
historyStreamSource string
)

func (b *RedisBroker) getShard(channel string) *shardWrapper {
Expand Down
32 changes: 32 additions & 0 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,38 @@ func TestRedisHistoryIteration(t *testing.T) {
}
}

func TestRedisHistoryReversedNoMetaYet(t *testing.T) {
for _, tt := range redisTests {
t.Run(tt.Name, func(t *testing.T) {
node := testNode(t)
broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(broker)
pubs, sp, err := broker.History(
randString(10),
HistoryOptions{
Filter: HistoryFilter{Limit: 10, Reverse: true},
MetaTTL: 24 * time.Hour,
},
)
require.NoError(t, err)
require.Equal(t, uint64(0), sp.Offset)
require.Len(t, pubs, 0)

pubs, sp, err = broker.History(
randString(10),
HistoryOptions{
Filter: HistoryFilter{Limit: -1, Reverse: true},
MetaTTL: 24 * time.Hour,
},
)
require.NoError(t, err)
require.Equal(t, uint64(0), sp.Offset)
require.Len(t, pubs, 0)
})
}
}

func TestRedisHistoryIterationReverse(t *testing.T) {
for _, tt := range redisTests {
t.Run(tt.Name, func(t *testing.T) {
Expand Down
32 changes: 32 additions & 0 deletions internal/redis_lua/broker_history_add_list.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
local list_key = KEYS[1]
local meta_key = KEYS[2]
local message_payload = ARGV[1]
local ltrim_right_bound = ARGV[2]
local list_ttl = ARGV[3]
local channel = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]
local publish_command = ARGV[7]

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
redis.call("lpush", list_key, payload)
redis.call("ltrim", list_key, 0, ltrim_right_bound)
redis.call("expire", list_key, list_ttl)

if channel ~= '' then
redis.call(publish_command, channel, payload)
end

return {top_offset, current_epoch}
31 changes: 31 additions & 0 deletions internal/redis_lua/broker_history_add_stream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
local stream_key = KEYS[1]
local meta_key = KEYS[2]
local message_payload = ARGV[1]
local stream_size = ARGV[2]
local stream_ttl = ARGV[3]
local channel = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]
local publish_command = ARGV[7]

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload)
redis.call("expire", stream_key, stream_ttl)

if channel ~= '' then
local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
redis.call(publish_command, channel, payload)
end

return {top_offset, current_epoch}
26 changes: 26 additions & 0 deletions internal/redis_lua/broker_history_list.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
local list_key = KEYS[1]
local meta_key = KEYS[2]
local include_publications = ARGV[1]
local list_right_bound = ARGV[2]
local meta_expire = ARGV[3]
local new_epoch_if_empty = ARGV[4]

local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch, "s", "0")
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil
if include_publications ~= "0" then
pubs = redis.call("lrange", list_key, 0, list_right_bound)
end

return {top_offset, current_epoch, pubs}
49 changes: 49 additions & 0 deletions internal/redis_lua/broker_history_stream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
local stream_key = KEYS[1]
local meta_key = KEYS[2]
local include_publications = ARGV[1]
local since_offset = ARGV[2]
local limit = ARGV[3]
local reverse = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]

local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch, "s", "0")
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil

if include_publications ~= "0" then
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
end
else
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
end

return {top_offset, current_epoch, pubs}
Loading