From 47c5cef177a232d37368a02b68c9a0f18ba74dd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Wed, 18 Mar 2020 15:46:00 -0400 Subject: [PATCH 1/6] Rewrite proxy client bus Previously, we created a new NATS subscription for each ARI subscription, which is inefficient for large numbers of similar subscriptions. (See Issue #26) With this change, we now retain a single NATS subscription for any number of ari subscriptions for which the same NATS subscription is sufficient, regardless of where in the tree of ARI clients it may be. To do this, we create a root-level (core) back-to-back event bus, with the NATS subscription being bound to a stdbus ARI bus. All ari-proxy clients subtend a new SubBus, which forwards all requests on to the core ARI bus, registering them along the way. When the SubBus is closed, the bound subscriptions will be closed, but the root-level bus itself will stay in existence. There is a remaining issue wherein long-running root-level ari-proxy clients will accumulate NATS subscriptions and not have them terminate even after all SubBuses are closed. This will need to be fixed in a later patch, but it should not affect most uses, for now. Fixes #26 --- client/bus/bus.go | 210 +++++++++++++++++++++-------------------- client/bus/bus_test.go | 52 ---------- client/client.go | 14 ++- 3 files changed, 117 insertions(+), 159 deletions(-) delete mode 100644 client/bus/bus_test.go diff --git a/client/bus/bus.go b/client/bus/bus.go index 677f5be..cbab820 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -5,14 +5,59 @@ import ( "sync" "github.com/CyCoreSystems/ari/v5" + "github.com/CyCoreSystems/ari/v5/stdbus" "github.com/inconshreveable/log15" + "github.com/pkg/errors" "github.com/nats-io/nats.go" ) -// EventChanBufferLength is the number of unhandled events which can be queued -// to the event channel buffer before further events are lost. -var EventChanBufferLength = 10 +// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus +type busWrapper struct { + subject string + + log log15.Logger + + sub *nats.Subscription + + bus ari.Bus +} + +func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) { + var err error + + w := &busWrapper{ + subject: subject, + log: log, + bus: stdbus.New(), + } + + w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) { + w.receive(m) + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to subscribe to NATS subject %s", subject) + } + + return w, nil +} + +func (w *busWrapper) receive(o *nats.Msg) { + e, err := ari.DecodeEvent(o.Data) + if err != nil { + w.log.Error("failed to convert received message to ari.Event", "error", err) + return + } + + w.bus.Send(e) +} + +func (w *busWrapper) Close() { + if err := w.sub.Unsubscribe(); err != nil { + w.log.Error("failed to unsubscribe when closing NATS subscription:", err) + } + w.bus.Close() +} // Bus provides an ari.Bus interface to NATS type Bus struct { @@ -21,14 +66,53 @@ type Bus struct { log log15.Logger nc *nats.EncodedConn + + subjectBuses map[string]*busWrapper + + mu sync.RWMutex } // New returns a new Bus func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { return &Bus{ - prefix: prefix, - log: log, - nc: nc, + prefix: prefix, + log: log, + nc: nc, + subjectBuses: make(map[string]*busWrapper), + } +} + +type subBus struct { + bus ari.Bus + subs []ari.Subscription +} + +func (b *subBus) Close() { + for _, s := range b.subs { + s.Cancel() + } + b.subs = nil + + // NOTE: we are NOT closing the parent bus here and now, since it could be used by any number of other clients + // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. +} + +func (b *subBus) Send(e ari.Event) { + b.bus.Send(e) +} + +func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { + sub := b.bus.Subscribe(key, eTypes...) + + b.subs = append(b.subs, sub) + + return sub +} + +// SubBus creates and returns a new ariBus which is subtended from this one +func (b *Bus) SubBus() ari.Bus { + return &subBus{ + bus: b, } } @@ -53,26 +137,15 @@ func (b *Bus) subjectFromKey(key *ari.Key) string { return subj + key.Node } -// Subscription represents an ari.Subscription over NATS -type Subscription struct { - key *ari.Key - - log log15.Logger - - subscription *nats.Subscription - - eventChan chan ari.Event - - events []string - - closed bool - - mu sync.RWMutex -} - // Close implements ari.Bus func (b *Bus) Close() { - // No-op + b.mu.Lock() + + for _, w := range b.subjectBuses { + w.Close() + } + + b.mu.Unlock() } // Send implements ari.Bus @@ -84,88 +157,19 @@ func (b *Bus) Send(e ari.Event) { func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription { var err error - s := &Subscription{ - key: key, - log: b.log, - eventChan: make(chan ari.Event, EventChanBufferLength), - events: n, - } - - s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) { - s.receive(m) - }) - if err != nil { - b.log.Error("failed to subscribe to NATS", "error", err) - return nil - } - return s -} - -// Events returns the channel on which events from this subscription will be sent -func (s *Subscription) Events() <-chan ari.Event { - return s.eventChan -} - -// Cancel destroys the subscription -func (s *Subscription) Cancel() { - if s == nil { - return - } + subject := b.subjectFromKey(key) - if s.subscription != nil { - err := s.subscription.Unsubscribe() + b.mu.Lock() + w, ok := b.subjectBuses[subject] + if !ok { + w, err = newBusWrapper(subject, b.nc, b.log) if err != nil { - s.log.Error("failed unsubscribe from NATS", "error", err) - } - } - - s.mu.Lock() - if !s.closed { - s.closed = true - close(s.eventChan) - } - s.mu.Unlock() -} - -func (s *Subscription) receive(o *nats.Msg) { - e, err := ari.DecodeEvent(o.Data) - if err != nil { - s.log.Error("failed to convert received message to ari.Event", "error", err) - return - } - - if s.matchEvent(e) { - s.mu.RLock() - if !s.closed { - s.eventChan <- e + b.log.Error("failed to create bus wrapper", "key", key, "error", err) + return nil } - s.mu.RUnlock() + b.subjectBuses[subject] = w } -} + b.mu.Unlock() -func (s *Subscription) matchEvent(o ari.Event) bool { - // First, filter by type - var match bool - for _, kind := range s.events { - if kind == o.GetType() || kind == ari.Events.All { - match = true - break - } - } - if !match { - return false - } - - // If we don't have a resource ID, we match everything - // Next, match the entity - if s.key == nil || s.key.ID == "" { - return true - } - - for _, k := range o.Keys() { - if s.key.Match(k) { - return true - } - } - return false + return w.bus.Subscribe(key, n...) } diff --git a/client/bus/bus_test.go b/client/bus/bus_test.go deleted file mode 100644 index b1e9dcd..0000000 --- a/client/bus/bus_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package bus - -import ( - "testing" - "time" - - "github.com/inconshreveable/log15" - - "github.com/CyCoreSystems/ari/v5" -) - -func TestMatchEvent(t *testing.T) { - key := &ari.Key{ - Kind: ari.ChannelKey, - ID: "testA", - Node: "0test0", - App: "testApp", - } - - e := &ari.StasisEnd{ - EventData: ari.EventData{ - Type: "StasisEnd", - Application: "testApp", - Node: "0test0", - Timestamp: ari.DateTime(time.Now()), - }, - Header: make(ari.Header), - Channel: ari.ChannelData{ - Key: nil, - ID: "testB", - Name: "Local/bozo", - State: "up", - Accountcode: "49er", - Dialplan: &ari.DialplanCEP{ - Context: "default", - Exten: "s", - Priority: 1, - }, - }, - } - - s := &Subscription{ - key: key, - log: log15.New(), - eventChan: make(chan ari.Event, EventChanBufferLength), - events: []string{"StasisEnd"}, - } - - if s.matchEvent(e) { - t.Error("matched incorrect event") - } -} diff --git a/client/client.go b/client/client.go index 8e3239a..fed72bd 100644 --- a/client/client.go +++ b/client/client.go @@ -44,6 +44,9 @@ var ErrNil = errors.New("Nil") // family of derived clients. It manages stateful elements such as the bus, // the NATS connection, and the cluster membership type core struct { + // bus is the core ari-proxy bus, which handles NATS subscription binding + bus *bus.Bus + // cluster describes the cluster of ARI proxies cluster *cluster.Cluster @@ -226,15 +229,18 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + + // Extract a SubBus from that core bus + c.bus = c.core.bus.SubBus() + // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } - // Create the bus - c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Call Close whenever the context is closed go func() { <-ctx.Done() @@ -262,7 +268,7 @@ func (c *Client) New(ctx context.Context) *Client { appName: c.appName, cancel: cancel, core: c.core, - bus: bus.New(c.core.prefix, c.core.nc, c.core.log), + bus: c.core.bus.SubBus(), } } From 2ad4facf079b26a78859765f0b42962dae744e33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Thu, 19 Mar 2020 10:20:20 -0400 Subject: [PATCH 2/6] create SubBus after Start --- client/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index fed72bd..d8629ab 100644 --- a/client/client.go +++ b/client/client.go @@ -232,15 +232,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { // Create the core bus c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Extract a SubBus from that core bus - c.bus = c.core.bus.SubBus() - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) + c.bus = c.core.bus.SubBus() + // Call Close whenever the context is closed go func() { <-ctx.Done() @@ -326,7 +326,7 @@ func WithURI(uri string) OptionFunc { // WithNATS binds an existing NATS connection func WithNATS(nc *nats.EncodedConn) OptionFunc { return func(c *Client) { - c.nc = nc + c.core.nc = nc } } From b59fa270726a6ed22c3e3cd1c31e87e9417acc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Thu, 19 Mar 2020 10:33:35 -0400 Subject: [PATCH 3/6] create Bus after Start, too --- client/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index d8629ab..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } - // Create the core bus - c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) c.bus = c.core.bus.SubBus() From 21fe6fb44e6ba5f1980f3198090d5788f4e43927 Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Thu, 2 Jul 2020 12:28:47 +0200 Subject: [PATCH 4/6] Attempt to fix leak using callback from stdbus --- client/bus/bus.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index cbab820..bcf81d6 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -97,13 +97,24 @@ func (b *subBus) Close() { // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. } +func (b *subBus) Cancel(s ari.Subscription) { + for i, si := range b.subs { + if s == si { + b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end + b.subs[len(b.subs)-1] = nil // remove the end + b.subs = b.subs[:len(b.subs)-1] // lop off the end + break + } + } +} + func (b *subBus) Send(e ari.Event) { b.bus.Send(e) } func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) - + sub.SetCallback(b.Cancel) b.subs = append(b.subs, sub) return sub From 8bee712917d4767d41057f34b97e594a330a32db Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Thu, 2 Jul 2020 14:45:19 +0200 Subject: [PATCH 5/6] Add locking --- client/bus/bus.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/bus/bus.go b/client/bus/bus.go index bcf81d6..4239ba7 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -85,6 +85,7 @@ func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { type subBus struct { bus ari.Bus subs []ari.Subscription + mu sync.Mutex } func (b *subBus) Close() { @@ -97,7 +98,9 @@ func (b *subBus) Close() { // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. } +// Used as callback from stdbus func (b *subBus) Cancel(s ari.Subscription) { + b.mu.Lock() for i, si := range b.subs { if s == si { b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end @@ -106,6 +109,7 @@ func (b *subBus) Cancel(s ari.Subscription) { break } } + b.mu.Unlock() } func (b *subBus) Send(e ari.Event) { @@ -115,7 +119,9 @@ func (b *subBus) Send(e ari.Event) { func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) sub.SetCallback(b.Cancel) + b.mu.Lock() b.subs = append(b.subs, sub) + b.mu.Unlock() return sub } From 56edf9574214c102843987d24da599b05db8475d Mon Sep 17 00:00:00 2001 From: Morten Tryfoss Date: Fri, 3 Jul 2020 08:59:34 +0200 Subject: [PATCH 6/6] Changed to fit new structure of ari module --- client/bus/bus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/bus/bus.go b/client/bus/bus.go index 4239ba7..a658fc5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -99,7 +99,7 @@ func (b *subBus) Close() { } // Used as callback from stdbus -func (b *subBus) Cancel(s ari.Subscription) { +func (b *subBus) Cancel(s interface{}) { b.mu.Lock() for i, si := range b.subs { if s == si { @@ -118,7 +118,7 @@ func (b *subBus) Send(e ari.Event) { func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { sub := b.bus.Subscribe(key, eTypes...) - sub.SetCallback(b.Cancel) + sub.AddCancelCallback(b.Cancel) b.mu.Lock() b.subs = append(b.subs, sub) b.mu.Unlock()