diff --git a/broker_memory_test.go b/broker_memory_test.go index 8e1ca4d7..ba4361eb 100644 --- a/broker_memory_test.go +++ b/broker_memory_test.go @@ -545,19 +545,33 @@ type recoverTest struct { Sleep int Limit int Recovered bool + RecoveryMode RecoveryMode } var clientRecoverTests = []recoverTest{ - {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true}, - {"from_position", 10, 60, 10, 8, 2, 0, 0, true}, - {"from_position_limited", 10, 60, 10, 5, 0, 0, 2, false}, - {"from_position_with_server_limit", 10, 60, 10, 5, 0, 0, 1, false}, - {"from_position_that_already_gone", 10, 60, 20, 8, 0, 0, 0, false}, - {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false}, - {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true}, - {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true}, - {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false}, - {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true}, + {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true, RecoveryModeStream}, + {"from_position", 10, 60, 10, 8, 2, 0, 0, true, RecoveryModeStream}, + {"from_position_limited", 10, 60, 10, 5, 0, 0, 2, false, RecoveryModeStream}, + {"from_position_with_server_limit", 10, 60, 10, 5, 0, 0, 1, false, RecoveryModeStream}, + {"from_position_that_already_gone", 10, 60, 20, 8, 0, 0, 0, false, RecoveryModeStream}, + {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false, RecoveryModeStream}, + {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeStream}, + {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true, RecoveryModeStream}, + {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeStream}, + {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream}, + {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream}, + + {"cache_empty_stream", 10, 60, 0, 0, 0, 0, 0, false, RecoveryModeCache}, + {"cache_from_position", 10, 60, 10, 8, 1, 0, 0, true, RecoveryModeCache}, + {"cache_from_position_limited", 10, 60, 10, 5, 1, 0, 2, true, RecoveryModeCache}, + {"cache_from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, true, RecoveryModeCache}, + {"cache_from_position_that_already_gone", 10, 60, 20, 8, 1, 0, 0, true, RecoveryModeCache}, + {"cache_from_position_that_not_exist_yet", 10, 60, 20, 108, 1, 0, 0, true, RecoveryModeCache}, + {"cache_same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeCache}, + {"cache_empty_position_recover_expected", 10, 60, 4, 0, 1, 0, 0, true, RecoveryModeCache}, + {"cache_from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeCache}, + {"cache_from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeCache}, + {"cache_from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeCache}, } type recoverTestChannel struct { @@ -576,7 +590,7 @@ func TestClientSubscribeRecover(t *testing.T) { node.config.RecoveryMaxPublicationLimit = tt.Limit node.OnConnect(func(client *Client) { client.OnSubscribe(func(event SubscribeEvent, cb SubscribeCallback) { - opts := SubscribeOptions{EnableRecovery: true} + opts := SubscribeOptions{EnableRecovery: true, RecoveryMode: tt.RecoveryMode} cb(SubscribeReply{Options: opts}, nil) }) }) @@ -615,8 +629,8 @@ func TestClientSubscribeRecover(t *testing.T) { require.Nil(t, disconnect) require.Nil(t, rwWrapper.replies[0].Error) res := extractSubscribeResult(rwWrapper.replies) - require.Equal(t, tt.NumRecovered, len(res.Publications)) require.Equal(t, tt.Recovered, res.Recovered) + require.Equal(t, tt.NumRecovered, len(res.Publications)) if len(res.Publications) > 1 { require.True(t, res.Publications[0].Offset < res.Publications[1].Offset) } diff --git a/broker_redis_test.go b/broker_redis_test.go index ca34cff4..92cc845e 100644 --- a/broker_redis_test.go +++ b/broker_redis_test.go @@ -1622,16 +1622,16 @@ func testRedisClientSubscribeRecover(t *testing.T, tt recoverTest, useStreams bo } var brokerRecoverTests = []recoverTest{ - {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true}, - {"from_position", 10, 60, 10, 8, 2, 0, 0, true}, - {"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false}, - {"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false}, - {"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false}, - {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false}, - {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true}, - {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true}, - {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false}, - {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true}, + {"empty_stream", 10, 60, 0, 0, 0, 0, 0, true, RecoveryModeStream}, + {"from_position", 10, 60, 10, 8, 2, 0, 0, true, RecoveryModeStream}, + {"from_position_limited", 10, 60, 10, 5, 2, 0, 2, false, RecoveryModeStream}, + {"from_position_with_server_limit", 10, 60, 10, 5, 1, 0, 1, false, RecoveryModeStream}, + {"from_position_that_already_gone", 10, 60, 20, 8, 10, 0, 0, false, RecoveryModeStream}, + {"from_position_that_not_exist_yet", 10, 60, 20, 108, 0, 0, 0, false, RecoveryModeStream}, + {"same_position_no_pubs_expected", 10, 60, 7, 7, 0, 0, 0, true, RecoveryModeStream}, + {"empty_position_recover_expected", 10, 60, 4, 0, 4, 0, 0, true, RecoveryModeStream}, + {"from_position_in_expired_stream", 10, 1, 10, 8, 0, 3, 0, false, RecoveryModeStream}, + {"from_same_position_in_expired_stream", 10, 1, 1, 1, 0, 3, 0, true, RecoveryModeStream}, } func TestRedisClientSubscribeRecoverStreams(t *testing.T) {