diff --git a/client/bus/bus.go b/client/bus/bus.go index 677f5be..a658fc5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -5,14 +5,59 @@ import ( "sync" "github.com/CyCoreSystems/ari/v5" + "github.com/CyCoreSystems/ari/v5/stdbus" "github.com/inconshreveable/log15" + "github.com/pkg/errors" "github.com/nats-io/nats.go" ) -// EventChanBufferLength is the number of unhandled events which can be queued -// to the event channel buffer before further events are lost. -var EventChanBufferLength = 10 +// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus +type busWrapper struct { + subject string + + log log15.Logger + + sub *nats.Subscription + + bus ari.Bus +} + +func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) { + var err error + + w := &busWrapper{ + subject: subject, + log: log, + bus: stdbus.New(), + } + + w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) { + w.receive(m) + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to subscribe to NATS subject %s", subject) + } + + return w, nil +} + +func (w *busWrapper) receive(o *nats.Msg) { + e, err := ari.DecodeEvent(o.Data) + if err != nil { + w.log.Error("failed to convert received message to ari.Event", "error", err) + return + } + + w.bus.Send(e) +} + +func (w *busWrapper) Close() { + if err := w.sub.Unsubscribe(); err != nil { + w.log.Error("failed to unsubscribe when closing NATS subscription:", err) + } + w.bus.Close() +} // Bus provides an ari.Bus interface to NATS type Bus struct { @@ -21,14 +66,70 @@ type Bus struct { log log15.Logger nc *nats.EncodedConn + + subjectBuses map[string]*busWrapper + + mu sync.RWMutex } // New returns a new Bus func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus { return &Bus{ - prefix: prefix, - log: log, - nc: nc, + prefix: prefix, + log: log, + nc: nc, + subjectBuses: make(map[string]*busWrapper), + } +} + +type subBus struct { + bus ari.Bus + subs []ari.Subscription + mu sync.Mutex +} + +func (b *subBus) Close() { + for _, s := range b.subs { + s.Cancel() + } + b.subs = nil + + // NOTE: we are NOT closing the parent bus here and now, since it could be used by any number of other clients + // TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed. +} + +// Used as callback from stdbus +func (b *subBus) Cancel(s interface{}) { + b.mu.Lock() + for i, si := range b.subs { + if s == si { + b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end + b.subs[len(b.subs)-1] = nil // remove the end + b.subs = b.subs[:len(b.subs)-1] // lop off the end + break + } + } + b.mu.Unlock() +} + +func (b *subBus) Send(e ari.Event) { + b.bus.Send(e) +} + +func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { + sub := b.bus.Subscribe(key, eTypes...) + sub.AddCancelCallback(b.Cancel) + b.mu.Lock() + b.subs = append(b.subs, sub) + b.mu.Unlock() + + return sub +} + +// SubBus creates and returns a new ariBus which is subtended from this one +func (b *Bus) SubBus() ari.Bus { + return &subBus{ + bus: b, } } @@ -53,26 +154,15 @@ func (b *Bus) subjectFromKey(key *ari.Key) string { return subj + key.Node } -// Subscription represents an ari.Subscription over NATS -type Subscription struct { - key *ari.Key - - log log15.Logger - - subscription *nats.Subscription - - eventChan chan ari.Event - - events []string - - closed bool - - mu sync.RWMutex -} - // Close implements ari.Bus func (b *Bus) Close() { - // No-op + b.mu.Lock() + + for _, w := range b.subjectBuses { + w.Close() + } + + b.mu.Unlock() } // Send implements ari.Bus @@ -84,88 +174,19 @@ func (b *Bus) Send(e ari.Event) { func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription { var err error - s := &Subscription{ - key: key, - log: b.log, - eventChan: make(chan ari.Event, EventChanBufferLength), - events: n, - } - - s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) { - s.receive(m) - }) - if err != nil { - b.log.Error("failed to subscribe to NATS", "error", err) - return nil - } - return s -} - -// Events returns the channel on which events from this subscription will be sent -func (s *Subscription) Events() <-chan ari.Event { - return s.eventChan -} - -// Cancel destroys the subscription -func (s *Subscription) Cancel() { - if s == nil { - return - } + subject := b.subjectFromKey(key) - if s.subscription != nil { - err := s.subscription.Unsubscribe() + b.mu.Lock() + w, ok := b.subjectBuses[subject] + if !ok { + w, err = newBusWrapper(subject, b.nc, b.log) if err != nil { - s.log.Error("failed unsubscribe from NATS", "error", err) + b.log.Error("failed to create bus wrapper", "key", key, "error", err) + return nil } + b.subjectBuses[subject] = w } + b.mu.Unlock() - s.mu.Lock() - if !s.closed { - s.closed = true - close(s.eventChan) - } - s.mu.Unlock() -} - -func (s *Subscription) receive(o *nats.Msg) { - e, err := ari.DecodeEvent(o.Data) - if err != nil { - s.log.Error("failed to convert received message to ari.Event", "error", err) - return - } - - if s.matchEvent(e) { - s.mu.RLock() - if !s.closed { - s.eventChan <- e - } - s.mu.RUnlock() - } -} - -func (s *Subscription) matchEvent(o ari.Event) bool { - // First, filter by type - var match bool - for _, kind := range s.events { - if kind == o.GetType() || kind == ari.Events.All { - match = true - break - } - } - if !match { - return false - } - - // If we don't have a resource ID, we match everything - // Next, match the entity - if s.key == nil || s.key.ID == "" { - return true - } - - for _, k := range o.Keys() { - if s.key.Match(k) { - return true - } - } - return false + return w.bus.Subscribe(key, n...) } diff --git a/client/bus/bus_test.go b/client/bus/bus_test.go deleted file mode 100644 index b1e9dcd..0000000 --- a/client/bus/bus_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package bus - -import ( - "testing" - "time" - - "github.com/inconshreveable/log15" - - "github.com/CyCoreSystems/ari/v5" -) - -func TestMatchEvent(t *testing.T) { - key := &ari.Key{ - Kind: ari.ChannelKey, - ID: "testA", - Node: "0test0", - App: "testApp", - } - - e := &ari.StasisEnd{ - EventData: ari.EventData{ - Type: "StasisEnd", - Application: "testApp", - Node: "0test0", - Timestamp: ari.DateTime(time.Now()), - }, - Header: make(ari.Header), - Channel: ari.ChannelData{ - Key: nil, - ID: "testB", - Name: "Local/bozo", - State: "up", - Accountcode: "49er", - Dialplan: &ari.DialplanCEP{ - Context: "default", - Exten: "s", - Priority: 1, - }, - }, - } - - s := &Subscription{ - key: key, - log: log15.New(), - eventChan: make(chan ari.Event, EventChanBufferLength), - events: []string{"StasisEnd"}, - } - - if s.matchEvent(e) { - t.Error("matched incorrect event") - } -} diff --git a/client/client.go b/client/client.go index 8e3239a..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -44,6 +44,9 @@ var ErrNil = errors.New("Nil") // family of derived clients. It manages stateful elements such as the bus, // the NATS connection, and the cluster membership type core struct { + // bus is the core ari-proxy bus, which handles NATS subscription binding + bus *bus.Bus + // cluster describes the cluster of ARI proxies cluster *cluster.Cluster @@ -232,8 +235,11 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { return nil, errors.Wrap(err, "failed to start core") } - // Create the bus - c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) + c.bus = c.core.bus.SubBus() // Call Close whenever the context is closed go func() { @@ -262,7 +268,7 @@ func (c *Client) New(ctx context.Context) *Client { appName: c.appName, cancel: cancel, core: c.core, - bus: bus.New(c.core.prefix, c.core.nc, c.core.log), + bus: c.core.bus.SubBus(), } } @@ -320,7 +326,7 @@ func WithURI(uri string) OptionFunc { // WithNATS binds an existing NATS connection func WithNATS(nc *nats.EncodedConn) OptionFunc { return func(c *Client) { - c.nc = nc + c.core.nc = nc } }