Skip to content

Commit

Permalink
tests for cache recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 10, 2024
1 parent acbb817 commit c0417e3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
38 changes: 26 additions & 12 deletions broker_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,19 +545,33 @@ type recoverTest struct {
Sleep int
Limit int
Recovered bool
RecoveryMode RecoveryMode
}

var clientRecoverTests = []recoverTest{
{"empty_stream", 10, 60, 0, 0, 0, 0, 0, true},
{"from_position", 10, 60, 10, 8, 2, 0, 0, true},
{"from_position_limited", 10, 60, 10, 5, 0, 0, 2, false},
{"from_position_with_server_limit", 10, 60, 10, 5, 0, 0, 1, false},
{"from_position_that_already_gone", 10, 60, 20, 8, 0, 0, 0, false},
{"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false},
{"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true},
{"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true},
{"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false},
{"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true},
{"empty_stream", 10, 60, 0, 0, 0, 0, 0, true, RecoveryModeStream},
{"from_position", 10, 60, 10, 8, 2, 0, 0, true, RecoveryModeStream},
{"from_position_limited", 10, 60, 10, 5, 0, 0, 2, false, RecoveryModeStream},
{"from_position_with_server_limit", 10, 60, 10, 5, 0, 0, 1, false, RecoveryModeStream},
{"from_position_that_already_gone", 10, 60, 20, 8, 0, 0, 0, false, RecoveryModeStream},
{"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false, RecoveryModeStream},
{"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeStream},
{"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true, RecoveryModeStream},
{"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeStream},
{"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream},
{"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream},

{"cache_empty_stream", 10, 60, 0, 0, 0, 0, 0, false, RecoveryModeCache},
{"cache_from_position", 10, 60, 10, 8, 1, 0, 0, true, RecoveryModeCache},
{"cache_from_position_limited", 10, 60, 10, 5, 1, 0, 2, true, RecoveryModeCache},
{"cache_from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, true, RecoveryModeCache},
{"cache_from_position_that_already_gone", 10, 60, 20, 8, 1, 0, 0, true, RecoveryModeCache},
{"cache_from_position_that_not_exist_yet", 10, 60, 20, 108, 1, 0, 0, true, RecoveryModeCache},
{"cache_same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeCache},
{"cache_empty_position_recover_expected", 10, 60, 4, 0, 1, 0, 0, true, RecoveryModeCache},
{"cache_from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeCache},
{"cache_from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeCache},
{"cache_from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeCache},
}

type recoverTestChannel struct {
Expand All @@ -576,7 +590,7 @@ func TestClientSubscribeRecover(t *testing.T) {
node.config.RecoveryMaxPublicationLimit = tt.Limit
node.OnConnect(func(client *Client) {
client.OnSubscribe(func(event SubscribeEvent, cb SubscribeCallback) {
opts := SubscribeOptions{EnableRecovery: true}
opts := SubscribeOptions{EnableRecovery: true, RecoveryMode: tt.RecoveryMode}
cb(SubscribeReply{Options: opts}, nil)
})
})
Expand Down Expand Up @@ -615,8 +629,8 @@ func TestClientSubscribeRecover(t *testing.T) {
require.Nil(t, disconnect)
require.Nil(t, rwWrapper.replies[0].Error)
res := extractSubscribeResult(rwWrapper.replies)
require.Equal(t, tt.NumRecovered, len(res.Publications))
require.Equal(t, tt.Recovered, res.Recovered)
require.Equal(t, tt.NumRecovered, len(res.Publications))
if len(res.Publications) > 1 {
require.True(t, res.Publications[0].Offset < res.Publications[1].Offset)
}
Expand Down
20 changes: 10 additions & 10 deletions broker_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,16 +1622,16 @@ func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bo
}

var brokerRecoverTests = []recoverTest{
{"empty_stream", 10, 60, 0, 0, 0, 0, 0, true},
{"from_position", 10, 60, 10, 8, 2, 0, 0, true},
{"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false},
{"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false},
{"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false},
{"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false},
{"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true},
{"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true},
{"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false},
{"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true},
{"empty_stream", 10, 60, 0, 0, 0, 0, 0, true, RecoveryModeStream},
{"from_position", 10, 60, 10, 8, 2, 0, 0, true, RecoveryModeStream},
{"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false, RecoveryModeStream},
{"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false, RecoveryModeStream},
{"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false, RecoveryModeStream},
{"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false, RecoveryModeStream},
{"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeStream},
{"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true, RecoveryModeStream},
{"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeStream},
{"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream},
}

func TestRedisClientSubscribeRecoverStreams(t *testing.T) {
Expand Down

0 comments on commit c0417e3

Please sign in to comment.