Skip to content

Commit

Permalink
Merge pull request #1 from mtryfoss/issue-26-fixleak-callback
Browse files Browse the repository at this point in the history
Fix performance issue
  • Loading branch information
mtryfoss authored Sep 21, 2020
2 parents 1a170bf + 56edf95 commit 457de47
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 159 deletions.
227 changes: 124 additions & 103 deletions client/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,59 @@ import (
"sync"

"github.com/CyCoreSystems/ari/v5"
"github.com/CyCoreSystems/ari/v5/stdbus"
"github.com/inconshreveable/log15"
"github.com/pkg/errors"

"github.com/nats-io/nats.go"
)

// EventChanBufferLength is the number of unhandled events which can be queued
// to the event channel buffer before further events are lost.
var EventChanBufferLength = 10
// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus
type busWrapper struct {
subject string

log log15.Logger

sub *nats.Subscription

bus ari.Bus
}

func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) {
var err error

w := &busWrapper{
subject: subject,
log: log,
bus: stdbus.New(),
}

w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) {
w.receive(m)
})
if err != nil {
return nil, errors.Wrapf(err, "failed to subscribe to NATS subject %s", subject)
}

return w, nil
}

func (w *busWrapper) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
w.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

w.bus.Send(e)
}

func (w *busWrapper) Close() {
if err := w.sub.Unsubscribe(); err != nil {
w.log.Error("failed to unsubscribe when closing NATS subscription:", err)
}
w.bus.Close()
}

// Bus provides an ari.Bus interface to NATS
type Bus struct {
Expand All @@ -21,14 +66,70 @@ type Bus struct {
log log15.Logger

nc *nats.EncodedConn

subjectBuses map[string]*busWrapper

mu sync.RWMutex
}

// New returns a new Bus
func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus {
return &Bus{
prefix: prefix,
log: log,
nc: nc,
prefix: prefix,
log: log,
nc: nc,
subjectBuses: make(map[string]*busWrapper),
}
}

type subBus struct {
bus ari.Bus
subs []ari.Subscription
mu sync.Mutex
}

func (b *subBus) Close() {
for _, s := range b.subs {
s.Cancel()
}
b.subs = nil

// NOTE: we are NOT closing the parent bus here and now, since it could be used by any number of other clients
// TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed.
}

// Used as callback from stdbus
func (b *subBus) Cancel(s interface{}) {
b.mu.Lock()
for i, si := range b.subs {
if s == si {
b.subs[i] = b.subs[len(b.subs)-1] // replace the current with the end
b.subs[len(b.subs)-1] = nil // remove the end
b.subs = b.subs[:len(b.subs)-1] // lop off the end
break
}
}
b.mu.Unlock()
}

func (b *subBus) Send(e ari.Event) {
b.bus.Send(e)
}

func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription {
sub := b.bus.Subscribe(key, eTypes...)
sub.AddCancelCallback(b.Cancel)
b.mu.Lock()
b.subs = append(b.subs, sub)
b.mu.Unlock()

return sub
}

// SubBus creates and returns a new ariBus which is subtended from this one
func (b *Bus) SubBus() ari.Bus {
return &subBus{
bus: b,
}
}

Expand All @@ -53,26 +154,15 @@ func (b *Bus) subjectFromKey(key *ari.Key) string {
return subj + key.Node
}

// Subscription represents an ari.Subscription over NATS
type Subscription struct {
key *ari.Key

log log15.Logger

subscription *nats.Subscription

eventChan chan ari.Event

events []string

closed bool

mu sync.RWMutex
}

// Close implements ari.Bus
func (b *Bus) Close() {
// No-op
b.mu.Lock()

for _, w := range b.subjectBuses {
w.Close()
}

b.mu.Unlock()
}

// Send implements ari.Bus
Expand All @@ -84,88 +174,19 @@ func (b *Bus) Send(e ari.Event) {
func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
var err error

s := &Subscription{
key: key,
log: b.log,
eventChan: make(chan ari.Event, EventChanBufferLength),
events: n,
}

s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) {
s.receive(m)
})
if err != nil {
b.log.Error("failed to subscribe to NATS", "error", err)
return nil
}
return s
}

// Events returns the channel on which events from this subscription will be sent
func (s *Subscription) Events() <-chan ari.Event {
return s.eventChan
}

// Cancel destroys the subscription
func (s *Subscription) Cancel() {
if s == nil {
return
}
subject := b.subjectFromKey(key)

if s.subscription != nil {
err := s.subscription.Unsubscribe()
b.mu.Lock()
w, ok := b.subjectBuses[subject]
if !ok {
w, err = newBusWrapper(subject, b.nc, b.log)
if err != nil {
s.log.Error("failed unsubscribe from NATS", "error", err)
b.log.Error("failed to create bus wrapper", "key", key, "error", err)
return nil
}
b.subjectBuses[subject] = w
}
b.mu.Unlock()

s.mu.Lock()
if !s.closed {
s.closed = true
close(s.eventChan)
}
s.mu.Unlock()
}

func (s *Subscription) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
s.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

if s.matchEvent(e) {
s.mu.RLock()
if !s.closed {
s.eventChan <- e
}
s.mu.RUnlock()
}
}

func (s *Subscription) matchEvent(o ari.Event) bool {
// First, filter by type
var match bool
for _, kind := range s.events {
if kind == o.GetType() || kind == ari.Events.All {
match = true
break
}
}
if !match {
return false
}

// If we don't have a resource ID, we match everything
// Next, match the entity
if s.key == nil || s.key.ID == "" {
return true
}

for _, k := range o.Keys() {
if s.key.Match(k) {
return true
}
}
return false
return w.bus.Subscribe(key, n...)
}
52 changes: 0 additions & 52 deletions client/bus/bus_test.go

This file was deleted.

14 changes: 10 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var ErrNil = errors.New("Nil")
// family of derived clients. It manages stateful elements such as the bus,
// the NATS connection, and the cluster membership
type core struct {
// bus is the core ari-proxy bus, which handles NATS subscription binding
bus *bus.Bus

// cluster describes the cluster of ARI proxies
cluster *cluster.Cluster

Expand Down Expand Up @@ -232,8 +235,11 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) {
return nil, errors.Wrap(err, "failed to start core")
}

// Create the bus
c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)
// Create the core bus
c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists)
c.bus = c.core.bus.SubBus()

// Call Close whenever the context is closed
go func() {
Expand Down Expand Up @@ -262,7 +268,7 @@ func (c *Client) New(ctx context.Context) *Client {
appName: c.appName,
cancel: cancel,
core: c.core,
bus: bus.New(c.core.prefix, c.core.nc, c.core.log),
bus: c.core.bus.SubBus(),
}
}

Expand Down Expand Up @@ -320,7 +326,7 @@ func WithURI(uri string) OptionFunc {
// WithNATS binds an existing NATS connection
func WithNATS(nc *nats.EncodedConn) OptionFunc {
return func(c *Client) {
c.nc = nc
c.core.nc = nc
}
}

Expand Down

0 comments on commit 457de47

Please sign in to comment.