-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcast_test.go
131 lines (113 loc) · 2.75 KB
/
broadcast_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"sync"
"testing"
"time"
)
// TestBroadcaster exercises Broadcaster[int] through independent subtests:
// listener registration and removal, delivery to one or many listeners, and
// concurrent AddListener/Broadcast calls. Every subtest constructs its own
// broadcaster, so no state is shared between them.
func TestBroadcaster(t *testing.T) {
	// want is the payload used by every single-message subtest.
	const want = 42

	// expectOnListener waits up to one second for want to arrive on ch and
	// reports either a wrong value or a timeout against listener idx.
	expectOnListener := func(t *testing.T, idx int, ch <-chan int) {
		t.Helper()
		select {
		case got := <-ch:
			if got != want {
				t.Errorf("Listener %d: Expected 42, got %d", idx, got)
			}
		case <-time.After(time.Second):
			t.Errorf("Timeout waiting for message on listener %d", idx)
		}
	}

	// Adding a listener registers it; calling the returned remove function
	// unregisters it and leaves its channel closed.
	t.Run("AddListener", func(t *testing.T) {
		bc := NewBroadcaster[int]()
		recv, unsubscribe := bc.AddListener()

		if got := len(bc.listeners); got != 1 {
			t.Errorf("Expected 1 listener, got %d", got)
		}

		unsubscribe()

		if got := len(bc.listeners); got != 0 {
			t.Errorf("Expected 0 listeners after removal, got %d", got)
		}
		// A receive on a closed channel yields ok == false.
		if _, open := <-recv; open {
			t.Error("Channel should be closed after removal")
		}
	})

	// A single broadcast reaches both registered listeners.
	t.Run("Broadcast", func(t *testing.T) {
		bc := NewBroadcaster[int]()
		first, _ := bc.AddListener()
		second, _ := bc.AddListener()

		bc.Broadcast(want)

		for idx, recv := range []<-chan int{first, second} {
			expectOnListener(t, idx, recv)
		}
	})

	// Listeners are added from many goroutines, then many broadcasts are
	// issued from many goroutines. No listener channel is read here, so the
	// subtest only passes if all Broadcast calls return within one second.
	t.Run("ConcurrentOperations", func(t *testing.T) {
		const (
			numListeners = 100
			numMessages  = 100
		)
		bc := NewBroadcaster[int]()

		// Register all listeners in parallel and wait for every goroutine.
		var adders sync.WaitGroup
		adders.Add(numListeners)
		for n := 0; n < numListeners; n++ {
			go func() {
				defer adders.Done()
				bc.AddListener()
			}()
		}
		adders.Wait()

		if got := len(bc.listeners); got != numListeners {
			t.Errorf("Expected %d listeners, got %d", numListeners, got)
		}

		// finished is closed once every Broadcast call has returned.
		finished := make(chan struct{})
		go func() {
			defer close(finished)

			var senders sync.WaitGroup
			senders.Add(numMessages)
			for m := 0; m < numMessages; m++ {
				// The payload is passed as an argument so each goroutine
				// gets its own copy of the loop value.
				go func(payload int) {
					defer senders.Done()
					bc.Broadcast(payload)
				}(m)
			}
			senders.Wait()
		}()

		select {
		case <-finished:
			// All broadcasts returned in time.
		case <-time.After(time.Second):
			t.Fatal("Broadcasting timed out after 1 second")
		}
	})

	// After one listener is removed, the remaining listener still receives
	// broadcasts.
	t.Run("RemoveListener", func(t *testing.T) {
		bc := NewBroadcaster[int]()
		_, dropFirst := bc.AddListener()
		survivor, _ := bc.AddListener()

		dropFirst()
		bc.Broadcast(want)

		select {
		case got := <-survivor:
			if got != want {
				t.Errorf("Expected 42, got %d on ch2", got)
			}
		case <-time.After(time.Second):
			t.Error("Timeout waiting for message on ch2")
		}
	})

	// One broadcast is delivered to each of ten listeners.
	t.Run("AllListenersReceiveMessage", func(t *testing.T) {
		const numListeners = 10
		bc := NewBroadcaster[int]()

		subscribers := make([]<-chan int, numListeners)
		for idx := range subscribers {
			subscribers[idx], _ = bc.AddListener()
		}

		bc.Broadcast(want)

		for idx, recv := range subscribers {
			expectOnListener(t, idx, recv)
		}
	})
}