Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jul 11, 2024
1 parent 1b95e05 commit 2bc1734
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3804,11 +3804,20 @@ func asyncSubscribeClient(t testing.TB, client *Client, ch string) {
require.NoError(t, err)
}

// Not looking at unsubscribe result - just execute subscribe command.
func asyncUnsubscribeClient(t testing.TB, client *Client, ch string) {
rwWrapper := testReplyWriterWrapper()
err := client.handleUnsubscribe(&protocol.UnsubscribeRequest{
Channel: ch,
}, &protocol.Command{Id: 1}, time.Now(), rwWrapper.rw)
require.NoError(t, err)
}

func TestClientUnsubscribeDuringSubscribe(t *testing.T) {
t.Parallel()
node := defaultNodeNoHandlers()
subscribedCh := make(chan struct{}, 2)
unsubscribedCh := make(chan struct{}, 2)
subscribedCh := make(chan struct{}, 1)
unsubscribedCh := make(chan struct{}, 1)
node.OnConnect(func(client *Client) {
client.OnSubscribe(func(e SubscribeEvent, cb SubscribeCallback) {
go func() {
Expand All @@ -3828,16 +3837,12 @@ func TestClientUnsubscribeDuringSubscribe(t *testing.T) {
client := newTestClient(t, node, "42")
connectClientV2(t, client)
asyncSubscribeClient(t, client, "test")
client.Unsubscribe("test")
asyncUnsubscribeClient(t, client, "test")
client.mu.Lock()
_, ok := client.channels["test"]
client.mu.Unlock()
require.False(t, ok)
waitWithTimeout(t, unsubscribedCh)
asyncSubscribeClient(t, client, "test")
err := client.close(DisconnectForceNoReconnect)
waitWithTimeout(t, unsubscribedCh)
require.NoError(t, err)
}

func TestClientUnsubscribeDuringSubscribeWithError(t *testing.T) {
Expand All @@ -3862,7 +3867,7 @@ func TestClientUnsubscribeDuringSubscribeWithError(t *testing.T) {
client := newTestClient(t, node, "42")
connectClientV2(t, client)
asyncSubscribeClient(t, client, "test")
client.Unsubscribe("test")
asyncUnsubscribeClient(t, client, "test")
client.mu.Lock()
_, ok := client.channels["test"]
client.mu.Unlock()
Expand All @@ -3871,3 +3876,33 @@ func TestClientUnsubscribeDuringSubscribeWithError(t *testing.T) {
err := client.close(DisconnectForceNoReconnect)
require.NoError(t, err)
}

func TestClientUnsubscribeDuringSubscribeCorrectChannels(t *testing.T) {
t.Parallel()
node := defaultNodeNoHandlers()
subscribedCh := make(chan struct{})
node.OnConnect(func(client *Client) {
client.OnSubscribe(func(e SubscribeEvent, cb SubscribeCallback) {
go func() {
time.Sleep(1000 * time.Millisecond)
cb(SubscribeReply{}, nil)
close(subscribedCh)
}()
})
client.OnUnsubscribe(func(e UnsubscribeEvent) {
})
})
defer func() { _ = node.Shutdown(context.Background()) }()
client := newTestClient(t, node, "42")
connectClientV2(t, client)
asyncSubscribeClient(t, client, "test")
asyncUnsubscribeClient(t, client, "test")
client.mu.Lock()
_, ok := client.channels["test"]
client.mu.Unlock()
require.False(t, ok)
<-subscribedCh
require.Equal(t, 0, node.Hub().NumChannels())
err := client.close(DisconnectForceNoReconnect)
require.NoError(t, err)
}

0 comments on commit 2bc1734

Please sign in to comment.