Skip to content

Commit

Permalink
Redis reversed history fix when meta does not exist (#328)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 30, 2023
1 parent 763e1f0 commit 91778bc
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 144 deletions.
158 changes: 14 additions & 144 deletions broker_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync/atomic"
"time"

_ "embed"

"github.com/centrifugal/centrifuge/internal/convert"

"github.com/centrifugal/protocol"
Expand Down Expand Up @@ -219,150 +221,18 @@ func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) {
return b, nil
}

const (
// Add to history and optionally publish.
// KEYS[1] - history LIST key
// KEYS[2] - history meta HASH key
// ARGV[1] - message payload
// ARGV[2] - history size ltrim right bound
// ARGV[3] - history lifetime
// ARGV[4] - channel to publish message to if needed
// ARGV[5] - history meta key expiration time
// ARGV[6] - new epoch value if no epoch set yet
// ARGV[7] - command to publish (publish or spublish)
addHistoryListSource = `
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
local offset = redis.call("hincrby", KEYS[2], "s", 1)
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1]
redis.call("lpush", KEYS[1], payload)
redis.call("ltrim", KEYS[1], 0, ARGV[2])
redis.call("expire", KEYS[1], ARGV[3])
if ARGV[4] ~= '' then
redis.call(ARGV[7], ARGV[4], payload)
end
return {offset, epoch}
`

// addHistoryStreamSource contains a Lua script to save data to Redis stream and
// publish it into channel.
// KEYS[1] - history STREAM key
// KEYS[2] - stream meta HASH key
// ARGV[1] - message payload
// ARGV[2] - stream size
// ARGV[3] - stream lifetime
// ARGV[4] - channel to publish message to if needed
// ARGV[5] - history meta key expiration time
// ARGV[6] - new epoch value if no epoch set yet
// ARGV[7] - command to publish (publish or spublish)
addHistoryStreamSource = `
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
local offset = redis.call("hincrby", KEYS[2], "s", 1)
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
redis.call("xadd", KEYS[1], "MAXLEN", ARGV[2], offset, "d", ARGV[1])
redis.call("expire", KEYS[1], ARGV[3])
if ARGV[4] ~= '' then
local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1]
redis.call(ARGV[7], ARGV[4], payload)
end
return {offset, epoch}
`

// Retrieve channel history information from LIST.
// KEYS[1] - history LIST key
// KEYS[2] - list meta HASH key
// ARGV[1] - include publications into response
// ARGV[2] - publications list right bound
// ARGV[3] - list meta hash key expiration time
// ARGV[4] - new epoch value if no epoch set yet
historyListSource = `
local offset = redis.call("hget", KEYS[2], "s")
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[4]
redis.call("hset", KEYS[2], "e", epoch)
end
if ARGV[3] ~= '0' then
redis.call("expire", KEYS[2], ARGV[3])
end
local pubs = nil
if ARGV[1] ~= "0" then
pubs = redis.call("lrange", KEYS[1], 0, ARGV[2])
end
return {offset, epoch, pubs}
`

// Retrieve channel history information from STREAM.
// KEYS[1] - history STREAM key
// KEYS[2] - stream meta HASH key
// ARGV[1] - include publications into response
// ARGV[2] - offset
// ARGV[3] - limit
// ARGV[4] - reverse
// ARGV[5] - stream meta hash key expiration time
// ARGV[6] - new epoch value if no epoch set yet
historyStreamSource = `
local offset = redis.call("hget", KEYS[2], "s")
local epoch
if redis.call('exists', KEYS[2]) ~= 0 then
epoch = redis.call("hget", KEYS[2], "e")
end
if epoch == false or epoch == nil then
epoch = ARGV[6]
redis.call("hset", KEYS[2], "e", epoch)
end
if ARGV[5] ~= '0' then
redis.call("expire", KEYS[2], ARGV[5])
end
local pubs = nil
if ARGV[1] ~= "0" then
if ARGV[3] ~= "0" then
if ARGV[4] == '0' then
pubs = redis.call("xrange", KEYS[1], ARGV[2], "+", "COUNT", ARGV[3])
else
local getOffset = offset
local incomingOffset = tonumber(ARGV[2])
if incomingOffset ~= 0 then
getOffset = incomingOffset
end
pubs = redis.call("xrevrange", KEYS[1], getOffset, "-", "COUNT", ARGV[3])
end
else
if ARGV[4] == '0' then
pubs = redis.call("xrange", KEYS[1], ARGV[2], "+")
else
local getOffset = offset
local incomingOffset = tonumber(ARGV[2])
if incomingOffset ~= 0 then
getOffset = incomingOffset
end
pubs = redis.call("xrevrange", KEYS[1], getOffset, "-")
end
end
end
return {offset, epoch, pubs}
`
var (
//go:embed internal/redis_lua/broker_history_add_list.lua
addHistoryListSource string

//go:embed internal/redis_lua/broker_history_add_stream.lua
addHistoryStreamSource string

//go:embed internal/redis_lua/broker_history_list.lua
historyListSource string

//go:embed internal/redis_lua/broker_history_stream.lua
historyStreamSource string
)

func (b *RedisBroker) getShard(channel string) *shardWrapper {
Expand Down
32 changes: 32 additions & 0 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,38 @@ func TestRedisHistoryIteration(t *testing.T) {
}
}

func TestRedisHistoryReversedNoMetaYet(t *testing.T) {
for _, tt := range redisTests {
t.Run(tt.Name, func(t *testing.T) {
node := testNode(t)
broker := newTestRedisBroker(t, node, tt.UseStreams, tt.UseCluster)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(broker)
pubs, sp, err := broker.History(
randString(10),
HistoryOptions{
Filter: HistoryFilter{Limit: 10, Reverse: true},
MetaTTL: 24 * time.Hour,
},
)
require.NoError(t, err)
require.Equal(t, uint64(0), sp.Offset)
require.Len(t, pubs, 0)

pubs, sp, err = broker.History(
randString(10),
HistoryOptions{
Filter: HistoryFilter{Limit: -1, Reverse: true},
MetaTTL: 24 * time.Hour,
},
)
require.NoError(t, err)
require.Equal(t, uint64(0), sp.Offset)
require.Len(t, pubs, 0)
})
}
}

func TestRedisHistoryIterationReverse(t *testing.T) {
for _, tt := range redisTests {
t.Run(tt.Name, func(t *testing.T) {
Expand Down
32 changes: 32 additions & 0 deletions internal/redis_lua/broker_history_add_list.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
local list_key = KEYS[1]
local meta_key = KEYS[2]
local message_payload = ARGV[1]
local ltrim_right_bound = ARGV[2]
local list_ttl = ARGV[3]
local channel = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]
local publish_command = ARGV[7]

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
redis.call("lpush", list_key, payload)
redis.call("ltrim", list_key, 0, ltrim_right_bound)
redis.call("expire", list_key, list_ttl)

if channel ~= '' then
redis.call(publish_command, channel, payload)
end

return {top_offset, current_epoch}
31 changes: 31 additions & 0 deletions internal/redis_lua/broker_history_add_stream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
local stream_key = KEYS[1]
local meta_key = KEYS[2]
local message_payload = ARGV[1]
local stream_size = ARGV[2]
local stream_ttl = ARGV[3]
local channel = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]
local publish_command = ARGV[7]

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload)
redis.call("expire", stream_key, stream_ttl)

if channel ~= '' then
local payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
redis.call(publish_command, channel, payload)
end

return {top_offset, current_epoch}
26 changes: 26 additions & 0 deletions internal/redis_lua/broker_history_list.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
local list_key = KEYS[1]
local meta_key = KEYS[2]
local include_publications = ARGV[1]
local list_right_bound = ARGV[2]
local meta_expire = ARGV[3]
local new_epoch_if_empty = ARGV[4]

local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch, "s", "0")
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil
if include_publications ~= "0" then
pubs = redis.call("lrange", list_key, 0, list_right_bound)
end

return {top_offset, current_epoch, pubs}
49 changes: 49 additions & 0 deletions internal/redis_lua/broker_history_stream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
local stream_key = KEYS[1]
local meta_key = KEYS[2]
local include_publications = ARGV[1]
local since_offset = ARGV[2]
local limit = ARGV[3]
local reverse = ARGV[4]
local meta_expire = ARGV[5]
local new_epoch_if_empty = ARGV[6]

local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch, "s", "0")
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil

if include_publications ~= "0" then
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
end
else
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
end

return {top_offset, current_epoch, pubs}

0 comments on commit 91778bc

Please sign in to comment.