Skip to content

Commit

Permalink
Fixed: Races caught by backend integration testing (#89)
Browse files Browse the repository at this point in the history
* Fixed: Races within manipvortex

* Fixed: Another race

* Temp: Point to fork

* Fixed: Protect from encoding race condition

* Fixed: More read protection

* Update: Enable typecheck

* Revert "Temp: Point to fork"

This reverts commit e17a9e7.

* Update: Make errors distinct to help identify what is full

* Attempt: Only queue events we care about

* Update: Remove a lock and copystructure and rearrange one other lock

* Update: Move dropping of messages into debug territory

* Update: Remove changes to allow for others to tackle

* Update: Remove changes to expose data race

* Fixed: Use copy of object in manipmemory

* Update: Address review comments
  • Loading branch information
Eric Powers authored and primalmotion committed Jul 25, 2019
1 parent 2a9a3c7 commit 6ef3cdd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
7 changes: 2 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ lint:
--enable=misspell \
--enable=prealloc \
--enable=nakedret \
--enable=typecheck \
./...

.PHONY: test
test:
@ echo 'mode: atomic' > unit_coverage.cov
@ for d in $(shell go list ./... | grep -v vendor); do \
go test -race -coverprofile=profile.out -covermode=atomic "$$d"; \
if [ -f profile.out ]; then tail -q -n +2 profile.out >> unit_coverage.cov; rm -f profile.out; fi; \
done;
@ go test ./... -race -cover -covermode=atomic -coverprofile=unit_coverage.cov

coverage_aggregate:
@ mkdir -p artifacts
Expand Down
31 changes: 27 additions & 4 deletions manipmemory/manipulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/globalsign/mgo/bson"
memdb "github.com/hashicorp/go-memdb"
"github.com/mitchellh/copystructure"
"go.aporeto.io/elemental"
"go.aporeto.io/manipulate"
)
Expand Down Expand Up @@ -111,7 +112,12 @@ func (m *memdbManipulator) Retrieve(mctx manipulate.Context, object elemental.Id
return manipulate.NewErrObjectNotFound("cannot find the object for the given ID")
}

reflect.ValueOf(object).Elem().Set(reflect.ValueOf(raw).Elem())
cp, err := copystructure.Copy(raw)
if err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}

reflect.ValueOf(object).Elem().Set(reflect.ValueOf(cp).Elem())

return nil
}
Expand All @@ -133,7 +139,12 @@ func (m *memdbManipulator) Create(mctx manipulate.Context, object elemental.Iden
object.SetIdentifier(bson.NewObjectId().Hex())
}

if err := txn.Insert(object.Identity().Category, object); err != nil {
cp, err := copystructure.Copy(object)
if err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}

if err := txn.Insert(object.Identity().Category, cp); err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}

Expand All @@ -160,7 +171,12 @@ func (m *memdbManipulator) Update(mctx manipulate.Context, object elemental.Iden
return manipulate.NewErrObjectNotFound("Cannot find object with given ID")
}

if err := txn.Insert(object.Identity().Category, object); err != nil {
cp, err := copystructure.Copy(object)
if err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}

if err := txn.Insert(object.Identity().Category, cp); err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}

Expand Down Expand Up @@ -403,7 +419,14 @@ func (m *memdbManipulator) retrieveIntersection(identity string, k string, value
raw := iterator.Next()

for raw != nil {
obj := raw.(elemental.Identifiable)
o, err := copystructure.Copy(raw)
if err != nil {
return manipulate.NewErrCannotExecuteQuery(err.Error())
}
obj, ok := o.(elemental.Identifiable)
if !ok {
return manipulate.NewErrCannotExecuteQuery("stored object is not an identifiable")
}
if _, ok := existingItems[obj.Identifier()]; ok || fullquery {
combinedItems[obj.Identifier()] = obj
}
Expand Down
34 changes: 21 additions & 13 deletions manipvortex/manipulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync"
"time"

"github.com/mitchellh/copystructure"
"go.aporeto.io/elemental"
"go.aporeto.io/manipulate"
"go.uber.org/zap"
Expand Down Expand Up @@ -334,8 +333,8 @@ func (m *vortexManipulator) registerSubscriber(s manipulate.Subscriber) {
// UpdateFilter updates the current filter.
func (m *vortexManipulator) updateFilter() {

m.RLock()
defer m.RUnlock()
m.Lock()
defer m.Unlock()

if m.upstreamSubscriber == nil {
return
Expand Down Expand Up @@ -697,40 +696,49 @@ func (m *vortexManipulator) monitor(ctx context.Context) {

func (m *vortexManipulator) pushEvent(evt *elemental.Event) {

m.RLock()
defer m.RUnlock()

for _, s := range m.subscribers {
sevent, err := copystructure.Copy(evt)
if err != nil {
zap.L().Error("failed to copy event", zap.Error(err))
continue
}

if !s.filter.IsFilteredOut(evt.Identity, evt.Type) {
s.RLock()
isFiltered := s.filter.IsFilteredOut(evt.Identity, evt.Type)
s.RUnlock()

if !isFiltered {
select {
case s.subscriberEventChannel <- sevent.(*elemental.Event):
case s.subscriberEventChannel <- evt.Duplicate():
default:
zap.L().Error("Subscriber channel is full")
zap.L().Error("Subscriber event channel is full")
}
}
}
}

func (m *vortexManipulator) pushStatus(status manipulate.SubscriberStatus) {

m.RLock()
defer m.RUnlock()

for _, s := range m.subscribers {
select {
case s.subscriberStatusChannel <- status:
default:
zap.L().Error("Subscriber channel is full")
zap.L().Error("Subscriber status channel is full", zap.Int("status", int(status)))
}
}
}

func (m *vortexManipulator) pushErrors(err error) {

m.RLock()
defer m.RUnlock()

for _, s := range m.subscribers {
select {
case s.subscriberErrorChannel <- err:
default:
zap.L().Error("Subscriber channel is full")
zap.L().Error("Subscriber error channel is full", zap.Error(err))
}
}
}
Expand Down

0 comments on commit 6ef3cdd

Please sign in to comment.