Skip to content

Commit 8bee712

Browse files
committed
Add locking
1 parent 21fe6fb commit 8bee712

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

client/bus/bus.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus {
8585
type subBus struct {
8686
bus ari.Bus
8787
subs []ari.Subscription
88+
mu sync.Mutex
8889
}
8990

9091
func (b *subBus) Close() {
@@ -97,7 +98,9 @@ func (b *subBus) Close() {
9798
// TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed.
9899
}
99100

101+
// Used as callback from stdbus
100102
func (b *subBus) Cancel(s ari.Subscription) {
103+
b.mu.Lock()
101104
for i, si := range b.subs {
102105
if s == si {
103106
b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end
@@ -106,6 +109,7 @@ func (b *subBus) Cancel(s ari.Subscription) {
106109
break
107110
}
108111
}
112+
b.mu.Unlock()
109113
}
110114

111115
func (b *subBus) Send(e ari.Event) {
@@ -115,7 +119,9 @@ func (b *subBus) Send(e ari.Event) {
115119
func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription {
116120
sub := b.bus.Subscribe(key, eTypes...)
117121
sub.SetCallback(b.Cancel)
122+
b.mu.Lock()
118123
b.subs = append(b.subs, sub)
124+
b.mu.Unlock()
119125

120126
return sub
121127
}

0 commit comments

Comments
 (0)