Skip to content

Commit

Permalink
Rewrite proxy client bus
Browse files Browse the repository at this point in the history
Previously, we created a new NATS subscription for each ARI
subscription, which is inefficient for large numbers of similar
subscriptions.  (See Issue #26)

With this change, we now retain a single NATS subscription for any
number of ari subscriptions for which the same NATS subscription is
sufficient, regardless of where in the tree of ARI clients it may be.

To do this, we create a root-level (core) back-to-back event bus, with
the NATS subscription being bound to a stdbus ARI bus.  All ari-proxy
clients subtend a new SubBus, which forwards all requests on to the core
ARI bus, registering them along the way.  When the SubBus is closed, the
bound subscriptions will be closed, but the root-level bus itself will
stay in existence.

There is a remaining issue wherein long-running root-level ari-proxy
clients will accumulate NATS subscriptions and not have them terminate
even after all SubBuses are closed.  This will need to be fixed in a
later patch, but it should not affect most uses, for now.

Fixes #26
  • Loading branch information
Ulexus committed Mar 18, 2020
1 parent edbc162 commit 47c5cef
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 159 deletions.
210 changes: 107 additions & 103 deletions client/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,59 @@ import (
"sync"

"github.com/CyCoreSystems/ari/v5"
"github.com/CyCoreSystems/ari/v5/stdbus"
"github.com/inconshreveable/log15"
"github.com/pkg/errors"

"github.com/nats-io/nats.go"
)

// EventChanBufferLength is the number of unhandled events which can be queued
// to the event channel buffer before further events are lost.
var EventChanBufferLength = 10
// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus
type busWrapper struct {
subject string

log log15.Logger

sub *nats.Subscription

bus ari.Bus
}

func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) {
var err error

w := &busWrapper{
subject: subject,
log: log,
bus: stdbus.New(),
}

w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) {
w.receive(m)
})
if err != nil {
return nil, errors.Wrapf(err, "failed to subscribe to NATS subject %s", subject)
}

return w, nil
}

func (w *busWrapper) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
w.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

w.bus.Send(e)
}

func (w *busWrapper) Close() {
if err := w.sub.Unsubscribe(); err != nil {
w.log.Error("failed to unsubscribe when closing NATS subscription:", err)
}
w.bus.Close()
}

// Bus provides an ari.Bus interface to NATS
type Bus struct {
Expand All @@ -21,14 +66,53 @@ type Bus struct {
log log15.Logger

nc *nats.EncodedConn

subjectBuses map[string]*busWrapper

mu sync.RWMutex
}

// New returns a new Bus
func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus {
return &Bus{
prefix: prefix,
log: log,
nc: nc,
prefix: prefix,
log: log,
nc: nc,
subjectBuses: make(map[string]*busWrapper),
}
}

type subBus struct {
bus ari.Bus
subs []ari.Subscription
}

func (b *subBus) Close() {
for _, s := range b.subs {
s.Cancel()
}
b.subs = nil

// NOTE: we are NOT closing the parent bus here and now, since it could be used by any number of other clients
// TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed.
}

func (b *subBus) Send(e ari.Event) {
b.bus.Send(e)
}

func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription {
sub := b.bus.Subscribe(key, eTypes...)

b.subs = append(b.subs, sub)

return sub
}

// SubBus creates and returns a new ariBus which is subtended from this one
func (b *Bus) SubBus() ari.Bus {
return &subBus{
bus: b,
}
}

Expand All @@ -53,26 +137,15 @@ func (b *Bus) subjectFromKey(key *ari.Key) string {
return subj + key.Node
}

// Subscription represents an ari.Subscription over NATS
type Subscription struct {
key *ari.Key

log log15.Logger

subscription *nats.Subscription

eventChan chan ari.Event

events []string

closed bool

mu sync.RWMutex
}

// Close implements ari.Bus
func (b *Bus) Close() {
// No-op
b.mu.Lock()

for _, w := range b.subjectBuses {
w.Close()
}

b.mu.Unlock()
}

// Send implements ari.Bus
Expand All @@ -84,88 +157,19 @@ func (b *Bus) Send(e ari.Event) {
func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
var err error

s := &Subscription{
key: key,
log: b.log,
eventChan: make(chan ari.Event, EventChanBufferLength),
events: n,
}

s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) {
s.receive(m)
})
if err != nil {
b.log.Error("failed to subscribe to NATS", "error", err)
return nil
}
return s
}

// Events returns the channel on which events from this subscription will be sent
func (s *Subscription) Events() <-chan ari.Event {
return s.eventChan
}

// Cancel destroys the subscription
func (s *Subscription) Cancel() {
if s == nil {
return
}
subject := b.subjectFromKey(key)

if s.subscription != nil {
err := s.subscription.Unsubscribe()
b.mu.Lock()
w, ok := b.subjectBuses[subject]
if !ok {
w, err = newBusWrapper(subject, b.nc, b.log)
if err != nil {
s.log.Error("failed unsubscribe from NATS", "error", err)
}
}

s.mu.Lock()
if !s.closed {
s.closed = true
close(s.eventChan)
}
s.mu.Unlock()
}

func (s *Subscription) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
s.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

if s.matchEvent(e) {
s.mu.RLock()
if !s.closed {
s.eventChan <- e
b.log.Error("failed to create bus wrapper", "key", key, "error", err)
return nil
}
s.mu.RUnlock()
b.subjectBuses[subject] = w
}
}
b.mu.Unlock()

func (s *Subscription) matchEvent(o ari.Event) bool {
// First, filter by type
var match bool
for _, kind := range s.events {
if kind == o.GetType() || kind == ari.Events.All {
match = true
break
}
}
if !match {
return false
}

// If we don't have a resource ID, we match everything
// Next, match the entity
if s.key == nil || s.key.ID == "" {
return true
}

for _, k := range o.Keys() {
if s.key.Match(k) {
return true
}
}
return false
return w.bus.Subscribe(key, n...)
}
52 changes: 0 additions & 52 deletions client/bus/bus_test.go

This file was deleted.

14 changes: 10 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var ErrNil = errors.New("Nil")
// family of derived clients. It manages stateful elements such as the bus,
// the NATS connection, and the cluster membership
type core struct {
// bus is the core ari-proxy bus, which handles NATS subscription binding
bus *bus.Bus

// cluster describes the cluster of ARI proxies
cluster *cluster.Cluster

Expand Down Expand Up @@ -226,15 +229,18 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) {
opt(c)
}

// Create the core bus
c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Extract a SubBus from that core bus
c.bus = c.core.bus.SubBus()

// Start the core, if it is not already started
err := c.core.Start()
if err != nil {
return nil, errors.Wrap(err, "failed to start core")
}

// Create the bus
c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Call Close whenever the context is closed
go func() {
<-ctx.Done()
Expand Down Expand Up @@ -262,7 +268,7 @@ func (c *Client) New(ctx context.Context) *Client {
appName: c.appName,
cancel: cancel,
core: c.core,
bus: bus.New(c.core.prefix, c.core.nc, c.core.log),
bus: c.core.bus.SubBus(),
}
}

Expand Down

0 comments on commit 47c5cef

Please sign in to comment.