-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy patheventbus.go
241 lines (216 loc) · 6.23 KB
/
eventbus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package eventbus
import (
"reflect"
"sync"
)
// channel is a struct representing a topic and its associated handlers.
type channel struct {
	// The embedded lock guards the closed flag: subscribe, unsubscribe,
	// publish and publishSync take the read lock, close takes the write lock.
	sync.RWMutex
	// bufferSize is the size requested in newChannel; a value <= 0 yields an
	// unbuffered message channel.
	bufferSize int
	// topic is the topic name passed to every handler as its first argument.
	topic string
	// channel carries asynchronously published payloads to loop().
	channel chan any
	// handlers maps a handler's function pointer to its *reflect.Value.
	handlers *CowMap
	// closed is set by close(); once set, subscribe, unsubscribe, publish and
	// publishSync return ErrChannelClosed.
	closed bool
	// stopCh is used by close() to tell loop() to return.
	stopCh chan any
}
// newChannel builds the channel for topic and starts its delivery goroutine.
// A bufferSize <= 0 produces an unbuffered message channel; any other value is
// used as the message channel's capacity. The handler set is initialised with
// NewCowMap, and c.loop() is launched in a goroutine before the channel is returned.
func newChannel(topic string, bufferSize int) *channel {
	// make(chan T, 0) is an unbuffered channel, so clamping negative sizes to
	// zero covers the unbuffered case without a separate branch.
	capacity := bufferSize
	if capacity < 0 {
		capacity = 0
	}

	c := new(channel)
	c.topic = topic
	c.bufferSize = bufferSize
	c.channel = make(chan any, capacity)
	c.handlers = NewCowMap()
	c.stopCh = make(chan any)

	go c.loop()
	return c
}
// transfer invokes every registered handler with the channel's topic and the
// given payload. The topic parameter is not consulted; handlers always receive
// c.topic as their first argument.
func (c *channel) transfer(topic string, payload any) {
	topicArg := reflect.ValueOf(c.topic)

	c.handlers.Range(func(_ any, entry any) bool {
		fn := entry.(*reflect.Value)

		var payloadArg reflect.Value
		if payload != nil {
			payloadArg = reflect.ValueOf(payload)
		} else {
			// A nil payload carries no type information, so hand the handler
			// the zero value of its own second parameter type instead.
			payloadArg = reflect.New(fn.Type().In(1)).Elem()
		}

		fn.Call([]reflect.Value{topicArg, payloadArg})
		return true
	})
}
// loop delivers asynchronously published payloads to the handlers.
// It receives payloads from the message channel and passes each one to
// transfer. It returns when a stop signal arrives on stopCh or when the
// message channel has been closed.
func (c *channel) loop() {
	for {
		select {
		case payload, ok := <-c.channel:
			if !ok {
				// close() closes c.channel. A receive from a closed channel
				// never blocks, so without this check a goroutine that did not
				// observe the stop signal would spin here forever.
				return
			}
			c.transfer(c.topic, payload)
		case <-c.stopCh:
			return
		}
	}
}
// subscribe adds a handler to the channel and returns ErrChannelClosed if the
// channel is closed. The handler is stored under its function pointer, so
// registering a handler with the same pointer again replaces the earlier entry
// rather than adding a second one (assuming Store overwrites existing keys).
func (c *channel) subscribe(handler any) error {
	c.RLock()
	defer c.RUnlock()

	if !c.closed {
		v := reflect.ValueOf(handler)
		c.handlers.Store(v.Pointer(), &v)
		return nil
	}
	return ErrChannelClosed
}
// publishSync calls the channel's handlers with payload on the caller's
// goroutine, bypassing the message channel. It returns ErrChannelClosed if the
// channel is closed. The read lock is held while the handlers run.
func (c *channel) publishSync(payload any) error {
	c.RLock()
	defer c.RUnlock()

	var err error
	if c.closed {
		err = ErrChannelClosed
	} else {
		c.transfer(c.topic, payload)
	}
	return err
}
// publish hands payload to the delivery goroutine through the message channel,
// so the handlers run asynchronously. It returns ErrChannelClosed if the
// channel is closed. The send is performed while the read lock is held and
// blocks until the payload has been received or buffered.
func (c *channel) publish(payload any) error {
	c.RLock()
	defer c.RUnlock()

	switch {
	case c.closed:
		return ErrChannelClosed
	default:
		c.channel <- payload
		return nil
	}
}
// unsubscribe removes a handler from the channel.
// It returns ErrChannelClosed if the channel is closed and ErrHandlerIsNotFunc
// if handler is nil or not a function.
func (c *channel) unsubscribe(handler any) error {
	c.RLock()
	defer c.RUnlock()
	if c.closed {
		return ErrChannelClosed
	}
	fn := reflect.ValueOf(handler)
	// Value.Pointer panics on the zero Value (nil handler) and on kinds that
	// are not pointer-like. Only functions can ever have been subscribed, so
	// reject everything else instead of panicking.
	if fn.Kind() != reflect.Func {
		return ErrHandlerIsNotFunc
	}
	c.handlers.Delete(fn.Pointer())
	return nil
}
// close shuts the channel down. Only the first call has any effect.
// Under the write lock it marks the channel closed, signals the delivery
// goroutine to stop, clears the handlers and closes the message channel.
func (c *channel) close() {
	c.Lock()
	defer c.Unlock()
	if c.closed {
		return
	}
	c.closed = true
	// Closing stopCh, rather than sending a single value on it, never blocks
	// and is observed by every goroutine running loop(). A blocking send here
	// would hang while holding the write lock whenever the loop goroutine
	// cannot receive, e.g. when close is reached from a handler that is
	// running on that goroutine.
	close(c.stopCh)
	c.handlers.Clear()
	close(c.channel)
}
// EventBus is a container for event topics.
// Each topic corresponds to a channel. `eventbus.Publish()` pushes a message to the channel,
// and the handler in `eventbus.Subscribe()` will process the message coming out of the channel.
type EventBus struct {
	// channels maps a topic name to its *channel.
	channels *CowMap
	// bufferSize is passed to newChannel for every channel this bus creates.
	bufferSize int
	// once ensures Close performs its shutdown at most once.
	once sync.Once
}
// NewBuffered returns a new EventBus whose topic channels are buffered.
// bufferSize is the buffer length used for each channel; a value <= 0 is
// replaced by 1.
func NewBuffered(bufferSize int) *EventBus {
	size := 1
	if bufferSize > 0 {
		size = bufferSize
	}

	bus := &EventBus{channels: NewCowMap()}
	bus.bufferSize = size
	return bus
}
// New returns a new EventBus with no topics. Its bufferSize is -1, which makes
// newChannel create unbuffered channels for this bus.
func New() *EventBus {
	bus := new(EventBus)
	bus.bufferSize = -1
	bus.channels = NewCowMap()
	return bus
}
// Unsubscribe removes a handler registered for a topic.
// It returns ErrNoSubscriber if no channel exists for the topic; otherwise it
// returns the result of removing the handler from that topic's channel.
func (e *EventBus) Unsubscribe(topic string, handler any) error {
	if entry, found := e.channels.Load(topic); found {
		return entry.(*channel).unsubscribe(handler)
	}
	return ErrNoSubscriber
}
// Subscribe registers handler for a topic, creating the topic's channel if it
// does not exist yet.
//
// The handler must be a non-nil function with exactly two parameters: the
// first must be a string (it receives the topic), and the type of the second
// must be consistent with the type of the payload passed to `Publish()`.
//
// It returns ErrHandlerIsNotFunc if handler is nil or not a function,
// ErrHandlerParamNum if it does not take two parameters, ErrHandlerFirstParam
// if its first parameter is not a string, and ErrChannelClosed if the topic's
// channel has been closed.
func (e *EventBus) Subscribe(topic string, handler any) error {
	typ := reflect.TypeOf(handler)
	// reflect.TypeOf returns nil for a nil interface value; calling Kind on
	// it would panic.
	if typ == nil || typ.Kind() != reflect.Func {
		return ErrHandlerIsNotFunc
	}
	// A typed nil func passes the kind check but would panic when it is
	// eventually called, so reject it up front.
	if reflect.ValueOf(handler).IsNil() {
		return ErrHandlerIsNotFunc
	}
	if typ.NumIn() != 2 {
		return ErrHandlerParamNum
	}
	if typ.In(0).Kind() != reflect.String {
		return ErrHandlerFirstParam
	}

	// NOTE(review): this Load followed by Store is not a single atomic step;
	// confirm whether concurrent first-time subscribers to one topic matter.
	ch, ok := e.channels.Load(topic)
	if !ok {
		ch = newChannel(topic, e.bufferSize)
		e.channels.Store(topic, ch)
	}
	return ch.(*channel).subscribe(handler)
}
// Publish triggers the handlers subscribed to topic asynchronously.
// The payload is sent through the topic's channel and passed to the handlers
// by the channel's delivery goroutine. If the topic has no channel yet, one is
// created first.
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
// It returns ErrChannelClosed if the topic's channel has been closed.
func (e *EventBus) Publish(topic string, payload any) error {
	ch, ok := e.channels.Load(topic)
	if !ok {
		// newChannel already starts the delivery goroutine. Starting a second
		// loop() here would leave one goroutine that never receives the
		// single stop signal sent on close.
		ch = newChannel(topic, e.bufferSize)
		e.channels.Store(topic, ch)
	}
	return ch.(*channel).publish(payload)
}
// PublishSync triggers the handlers subscribed to topic synchronously.
// The payload is passed to the handlers directly on the caller's goroutine,
// without going through the topic's message channel. If the topic has no
// channel yet, one is created first.
// It returns ErrChannelClosed if the topic's channel has been closed.
func (e *EventBus) PublishSync(topic string, payload any) error {
	ch, ok := e.channels.Load(topic)
	if !ok {
		// newChannel already starts the delivery goroutine. Starting a second
		// loop() here would leave one goroutine that never receives the
		// single stop signal sent on close.
		ch = newChannel(topic, e.bufferSize)
		e.channels.Store(topic, ch)
	}
	return ch.(*channel).publishSync(payload)
}
// Close closes every channel held by the event bus. The shutdown is guarded by
// a sync.Once, so calling Close more than once is safe and later calls do nothing.
func (e *EventBus) Close() {
	closeAll := func() {
		e.channels.Range(func(_ any, entry any) bool {
			entry.(*channel).close()
			return true
		})
	}
	e.once.Do(closeAll)
}