Skip to content

Commit cbe55e4

Browse files
committed
FIX: Listener shutdown testing.
1 parent aac2c8d commit cbe55e4

File tree

1 file changed

+31
-30
lines changed

1 file changed

+31
-30
lines changed

pubsub_test.go

+31-30
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ type listenerCall struct {
1717
err error
1818
}
1919

20+
func testListenerClose(t *testing.T, l *Listener, calls <-chan *listenerCall) {
21+
if err := l.Close(); err != nil {
22+
t.Error("Listener Close error:", err)
23+
}
24+
for call := range calls {
25+
if call.err != nil {
26+
t.Error("unexpected Listener error:", call.err)
27+
} else {
28+
t.Errorf("unexpected message %q on %q", call.message, call.channel)
29+
}
30+
}
31+
}
32+
2033
// newTestListener closes the channel upon ErrClosed, or test-time-out.
2134
func newTestListener(t *testing.T) (*Listener, <-chan *listenerCall) {
2235
record := make(chan *listenerCall, 99)
@@ -43,32 +56,19 @@ func newTestListener(t *testing.T) (*Listener, <-chan *listenerCall) {
4356
for {
4457
select {
4558
case <-timeout.C:
46-
t.Fatal("Listener recording time-out")
59+
t.Error("Listener recording time-out")
60+
return
61+
4762
case call := <-record:
4863
if call.err == ErrClosed {
49-
t.Log("Listener recording stop on ErrClosed")
50-
go func() {
51-
for call := range record {
52-
if call.err != nil {
53-
t.Error("Listener error after ErrClosed:", call.err)
54-
} else {
55-
t.Errorf("Listener message %q on %q after ErrClosed", call.message, call.channel)
56-
}
57-
}
58-
}()
64+
time.Sleep(10 * time.Millisecond)
65+
if len(record) != 0 {
66+
t.Errorf("got %d Listener invocations after ErrClosed", len(record))
67+
}
5968
return
6069
}
6170

62-
select {
63-
case <-timeout.C:
64-
if call.err != nil {
65-
t.Fatal("Unwanted Listener error:", call.err)
66-
} else {
67-
t.Fatalf("Unwanted Listener message %q on %q", call.message, call.channel)
68-
}
69-
case out <- call:
70-
break
71-
}
71+
out <- call
7272
}
7373
}
7474
}()
@@ -111,7 +111,7 @@ func TestSubscribe(t *testing.T) {
111111
}()
112112

113113
l, calls := newTestListener(t)
114-
defer l.Close()
114+
defer testListenerClose(t, l, calls)
115115

116116
l.SUBSCRIBE(channel)
117117
call1 := <-calls
@@ -131,8 +131,8 @@ func TestSubscribe(t *testing.T) {
131131
func TestUnsubscribe(t *testing.T) {
132132
t.Parallel()
133133

134-
l, _ := newTestListener(t)
135-
defer l.Close()
134+
l, calls := newTestListener(t)
135+
defer testListenerClose(t, l, calls)
136136

137137
channel := randomKey("channel")
138138
l.SUBSCRIBE(channel)
@@ -153,8 +153,8 @@ func TestUnsubscribe(t *testing.T) {
153153
func TestUnsubscribeRace(t *testing.T) {
154154
t.Parallel()
155155

156-
l, _ := newTestListener(t)
157-
defer l.Close()
156+
l, calls := newTestListener(t)
157+
defer testListenerClose(t, l, calls)
158158

159159
channel := randomKey("channel")
160160
l.SUBSCRIBE(channel)
@@ -174,8 +174,8 @@ func TestUnsubscribeRace(t *testing.T) {
174174
func TestSubscriptionConcurrency(t *testing.T) {
175175
t.Parallel()
176176

177-
l, _ := newTestListener(t)
178-
defer l.Close()
177+
l, calls := newTestListener(t)
178+
defer testListenerClose(t, l, calls)
179179

180180
channels := make([]string, 9)
181181
for i := range channels {
@@ -204,7 +204,8 @@ func TestSubscriptionConcurrency(t *testing.T) {
204204
func TestListenerClose(t *testing.T) {
205205
t.Parallel()
206206

207-
l, _ := newTestListener(t)
207+
l, calls := newTestListener(t)
208+
defer testListenerClose(t, l, calls)
208209

209210
channel1 := randomKey("channel")
210211
channel2 := randomKey("channel")
@@ -238,7 +239,7 @@ func TestListenerBufferLimit(t *testing.T) {
238239
t.Parallel()
239240

240241
l, calls := newTestListener(t)
241-
defer l.Close()
242+
defer testListenerClose(t, l, calls)
242243

243244
channel := randomKey("channel")
244245
l.SUBSCRIBE(channel)

0 commit comments

Comments
 (0)