Skip to content

Commit

Permalink
send notification from DAG with AND without payload (#1112) (#1125)
Browse files Browse the repository at this point in the history
* send notification from DAG with AND without payload
* separate observers for dag.STate
* add integration test for tree updates for priate tx
  • Loading branch information
woutslakhorst authored May 18, 2022
1 parent 43004a0 commit ef9f678
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 47 deletions.
14 changes: 10 additions & 4 deletions network/dag/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type State interface {
// Deprecated: remove with V1 protocol
ReadManyPayloads(ctx context.Context, consumer func(context.Context, PayloadReader) error) error

// Add a transactions to the DAG. If it can't be added an error is returned.
// Add a transaction to the DAG. If it can't be added an error is returned.
// If the transaction already exists, nothing is added and no observers are notified.
// The payload may be passed as well. Allowing for better notification of observers
Add(ctx context.Context, transactions Transaction, payload []byte) error
Expand All @@ -61,9 +61,12 @@ type State interface {
IsPresent(context.Context, hash.SHA256Hash) (bool, error)
// PayloadHashes applies the visitor function to the payload hashes of all transactions, in random order.
PayloadHashes(ctx context.Context, visitor func(payloadHash hash.SHA256Hash) error) error
// RegisterObserver allows observers to be notified when a transaction is added to the DAG.
// RegisterTransactionObserver allows observers to be notified when a transaction is added to the DAG.
// If the observer needs to be called within the transaction, transactional must be true.
RegisterObserver(observer Observer, transactional bool)
RegisterTransactionObserver(observer Observer, transactional bool)
// RegisterPayloadObserver allows observers to be notified when a payload is written to the store.
// If the observer needs to be called within the transaction, transactional must be true.
RegisterPayloadObserver(observer PayloadObserver, transactional bool)
// Subscribe lets an application subscribe to a specific type of transaction. When a new transaction is received
// the `receiver` function is called. If an asterisk (`*`) is specified as `payloadType` the receiver is subscribed
// to all payload types.
Expand Down Expand Up @@ -159,7 +162,10 @@ type PayloadReader interface {
}

// Observer defines the signature of an observer which can be called by an Observable.
type Observer func(ctx context.Context, transaction Transaction, payload []byte) error
type Observer func(ctx context.Context, transaction Transaction) error

// PayloadObserver defines the signature of an observer which can be called by an Observable.
type PayloadObserver func(ctx context.Context, transaction Transaction, payload []byte) error

// MinTime returns the minimum value for time.Time
func MinTime() time.Time {
Expand Down
24 changes: 18 additions & 6 deletions network/dag/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions network/dag/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,18 @@ type replayingDAGPublisher struct {

func (s *replayingDAGPublisher) ConfigureCallbacks(state State) {
// the publisher only signals the VDR, VCR and transaction state. These need to be called after the bbolt transaction is completed.
state.RegisterObserver(func(ctx context.Context, transaction Transaction, payload []byte) error {
state.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error {
s.publishMux.Lock()
defer s.publishMux.Unlock()

if transaction != nil {
s.transactionAdded(ctx, transaction, payload)
}
if payload != nil {
s.payloadWritten(ctx, transaction, payload)
}
return nil
return s.transactionAdded(ctx, transaction, nil)
}, false)

state.RegisterPayloadObserver(func(ctx context.Context, transaction Transaction, payload []byte) error {
s.publishMux.Lock()
defer s.publishMux.Unlock()

return s.payloadWritten(ctx, transaction, payload)
}, false)
}

Expand Down
77 changes: 59 additions & 18 deletions network/dag/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ const (

// State has references to the DAG and the payload store.
type state struct {
db *bbolt.DB
graph *bboltDAG
payloadStore PayloadStore
transactionalObservers []Observer
nonTransactionalObservers []Observer
keyResolver types.KeyResolver
publisher Publisher
txVerifiers []Verifier
xorTree *bboltTree
ibltTree *bboltTree
db *bbolt.DB
graph *bboltDAG
payloadStore PayloadStore
transactionalObservers []Observer
nonTransactionalObservers []Observer
transactionalPayloadObservers []PayloadObserver
nonTransactionalPayloadObservers []PayloadObserver
keyResolver types.KeyResolver
publisher Publisher
txVerifiers []Verifier
xorTree *bboltTree
ibltTree *bboltTree
}

// NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions.
Expand Down Expand Up @@ -88,23 +90,36 @@ func NewState(dataDir string, verifiers ...Verifier) (State, error) {
newState.publisher = publisher

xorTree := newBBoltTreeStore(db, "xorBucket", tree.New(tree.NewXor(), PageSize))
newState.RegisterObserver(xorTree.dagObserver, true)
newState.xorTree = xorTree

ibltTree := newBBoltTreeStore(db, "ibltBucket", tree.New(tree.NewIblt(IbltNumBuckets), PageSize))
newState.RegisterObserver(ibltTree.dagObserver, true)
newState.ibltTree = ibltTree
newState.RegisterTransactionObserver(newState.treeObserver, true)

return newState, nil
}

func (s *state) RegisterObserver(observer Observer, transactional bool) {
func (s *state) RegisterTransactionObserver(observer Observer, transactional bool) {
if transactional {
s.transactionalObservers = append(s.transactionalObservers, observer)
} else {
s.nonTransactionalObservers = append(s.nonTransactionalObservers, observer)
}
}

func (s *state) RegisterPayloadObserver(observer PayloadObserver, transactional bool) {
if transactional {
s.transactionalPayloadObservers = append(s.transactionalPayloadObservers, observer)
} else {
s.nonTransactionalPayloadObservers = append(s.nonTransactionalPayloadObservers, observer)
}
}

func (s *state) treeObserver(ctx context.Context, transaction Transaction) error {
if err := s.ibltTree.dagObserver(ctx, transaction, nil); err != nil {
return err
}
return s.xorTree.dagObserver(ctx, transaction, nil)
}

func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte) error {
Expand All @@ -125,15 +140,15 @@ func (s *state) Add(ctx context.Context, transaction Transaction, payload []byte
if !transaction.PayloadHash().Equals(payloadHash) {
return errors.New("tx.PayloadHash does not match hash of payload")
}
if err := s.payloadStore.WritePayload(contextWithTX, payloadHash, payload); err != nil {
if err := s.WritePayload(contextWithTX, transaction, payloadHash, payload); err != nil {
return err
}
}
if err := s.graph.Add(contextWithTX, transaction); err != nil {
return err
}

return s.notifyObservers(contextWithTX, transaction, payload)
return s.notifyObservers(contextWithTX, transaction)
})
}

Expand Down Expand Up @@ -175,7 +190,7 @@ func (s *state) WritePayload(ctx context.Context, transaction Transaction, paylo
err := s.payloadStore.WritePayload(contextWithTX, payloadHash, data)
if err == nil {
// ctx passed with bbolt transaction
return s.notifyObservers(contextWithTX, transaction, data)
return s.notifyPayloadObservers(contextWithTX, transaction, data)
}
return err
})
Expand Down Expand Up @@ -311,16 +326,42 @@ func (s *state) Walk(ctx context.Context, visitor Visitor, startAt hash.SHA256Ha
}

// notifyObservers is called from a transactional context. The transactional observers need to be called with the TX context, the other observers after the commit.
func (s *state) notifyObservers(ctx context.Context, transaction Transaction, payload []byte) error {
func (s *state) notifyObservers(ctx context.Context, transaction Transaction) error {
// apply TX context observers
for _, observer := range s.transactionalObservers {
if err := observer(ctx, transaction, payload); err != nil {
if err := observer(ctx, transaction); err != nil {
return fmt.Errorf("observer notification failed: %w", err)
}
}

notifyNonTXObservers := func() {
for _, observer := range s.nonTransactionalObservers {
if err := observer(context.Background(), transaction); err != nil {
log.Logger().Errorf("observer notification failed: %v", err)
}
}
}
// check if there's an active transaction
tx, txIsActive := storage.BBoltTX(ctx)
if txIsActive { // sanity check because there should always be a transaction
tx.OnCommit(notifyNonTXObservers)
} else {
notifyNonTXObservers()
}
return nil
}

// notifyObservers is called from a transactional context. The transactional observers need to be called with the TX context, the other observers after the commit.
func (s *state) notifyPayloadObservers(ctx context.Context, transaction Transaction, payload []byte) error {
// apply TX context observers
for _, observer := range s.transactionalPayloadObservers {
if err := observer(ctx, transaction, payload); err != nil {
return fmt.Errorf("observer notification failed: %w", err)
}
}

notifyNonTXObservers := func() {
for _, observer := range s.nonTransactionalPayloadObservers {
if err := observer(context.Background(), transaction, payload); err != nil {
log.Logger().Errorf("observer notification failed: %v", err)
}
Expand Down
37 changes: 33 additions & 4 deletions network/dag/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestState_Observe(t *testing.T) {
ctx := context.Background()
txState := createState(t)
var actual bool
txState.RegisterObserver(func(ctx context.Context, transaction Transaction, _ []byte) error {
txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error {
_, actual = storage.BBoltTX(ctx)
return nil
}, expected)
Expand All @@ -187,7 +187,7 @@ func TestState_Observe(t *testing.T) {
ctx := context.Background()
txState := createState(t)
var actual Transaction
txState.RegisterObserver(func(ctx context.Context, transaction Transaction, _ []byte) error {
txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error {
actual = transaction
return nil
}, false)
Expand All @@ -203,8 +203,11 @@ func TestState_Observe(t *testing.T) {
txState := createState(t)
var actualTX Transaction
var actualPayload []byte
txState.RegisterObserver(func(ctx context.Context, transaction Transaction, payload []byte) error {
txState.RegisterTransactionObserver(func(ctx context.Context, transaction Transaction) error {
actualTX = transaction
return nil
}, false)
txState.RegisterPayloadObserver(func(ctx context.Context, transaction Transaction, payload []byte) error {
actualPayload = payload
return nil
}, false)
Expand All @@ -229,7 +232,7 @@ func TestState_Observe(t *testing.T) {
ctx := context.Background()
txState := createState(t)
var actual []byte
txState.RegisterObserver(func(ctx context.Context, tx Transaction, payload []byte) error {
txState.RegisterPayloadObserver(func(ctx context.Context, tx Transaction, payload []byte) error {
actual = payload
return nil
}, false)
Expand Down Expand Up @@ -363,6 +366,32 @@ func TestState_IBLT(t *testing.T) {
})
}

func TestState_treeObserver(t *testing.T) {
setup := func(t *testing.T) State {
txState := createState(t)
err := txState.Start()
if !assert.NoError(t, err) {
t.Fatal(err)
}
return txState
}
ctx := context.Background()

t.Run("callback for public transaction without payload", func(t *testing.T) {
txState := setup(t)
tx := CreateTestTransactionWithJWK(1)

err := txState.Add(ctx, tx, nil)

if !assert.NoError(t, err) {
return
}

xor, _ := txState.XOR(ctx, 1)
assert.False(t, hash.EmptyHash().Equals(xor))
})
}

func createState(t *testing.T, verifier ...Verifier) State {
testDir := io.TestDirectory(t)
s, _ := NewState(testDir, verifier...)
Expand Down
4 changes: 2 additions & 2 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func (n *Network) Configure(config core.ServerConfig) error {
)
}

// register callback from DAG to other engines.
n.state.RegisterObserver(n.emitEvents, true)
// register callback from DAG to other engines, with payload only.
n.state.RegisterPayloadObserver(n.emitEvents, true)

return nil
}
Expand Down
5 changes: 5 additions & 0 deletions network/network_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ func TestNetworkIntegration_PrivateTransaction(t *testing.T) {
return
}
waitForTransaction(t, tx, "node2")

// assert not only TX is transfered, but state is updates as well
xor1, _ := node1.state.XOR(context.Background(), math.MaxUint32)
xor2, _ := node2.state.XOR(context.Background(), math.MaxUint32)
assert.Equal(t, xor1.String(), xor2.String())
})

t.Run("event received", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions network/transport/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (p *protocol) Configure(_ transport.PeerID) error {
p.gManager.RegisterSender(p.sendGossip)

// called after DAG is committed
p.state.RegisterObserver(p.gossipTransaction, false)
p.state.RegisterTransactionObserver(p.gossipTransaction, false)

return nil
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (p *protocol) connectionStateCallback(peer transport.Peer, state transport.
}

// gossipTransaction is called when a transaction is added to the DAG
func (p *protocol) gossipTransaction(ctx context.Context, tx dag.Transaction, _ []byte) error {
func (p *protocol) gossipTransaction(ctx context.Context, tx dag.Transaction) error {
if tx != nil { // can happen when payload is written for private TX
xor, clock := p.state.XOR(ctx, math.MaxUint32)
p.gManager.TransactionRegistered(tx.Ref(), xor, clock)
Expand Down
6 changes: 3 additions & 3 deletions network/transport/v2/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestDefaultConfig(t *testing.T) {
func TestProtocol_Configure(t *testing.T) {
testDID, _ := did.ParseDID("did:nuts:123")
p, mocks := newTestProtocol(t, testDID)
mocks.State.EXPECT().RegisterObserver(gomock.Any(), false)
mocks.State.EXPECT().RegisterTransactionObserver(gomock.Any(), false)

assert.NoError(t, p.Configure(""))
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestProtocol_gossipTransaction(t *testing.T) {
t.Run("ok - no transaction", func(t *testing.T) {
proto, _ := newTestProtocol(t, nil)

proto.gossipTransaction(context.Background(), nil, nil)
proto.gossipTransaction(context.Background(), nil)
})

t.Run("ok - to gossipManager", func(t *testing.T) {
Expand All @@ -262,7 +262,7 @@ func TestProtocol_gossipTransaction(t *testing.T) {
mocks.State.EXPECT().XOR(context.Background(), uint32(math.MaxUint32))
mocks.Gossip.EXPECT().TransactionRegistered(tx.Ref(), hash.EmptyHash(), uint32(0))

proto.gossipTransaction(context.Background(), tx, nil)
proto.gossipTransaction(context.Background(), tx)
})
}

Expand Down

0 comments on commit ef9f678

Please sign in to comment.