From 67de36167984fa13ee713e7bda99f094ca0ae84d Mon Sep 17 00:00:00 2001 From: dwertent Date: Wed, 14 Aug 2024 17:16:45 -0400 Subject: [PATCH 1/4] Refactor maybePersistBlockchainEvent to return a boolean indicating if the event was created Signed-off-by: dwertent --- internal/events/blockchain_event.go | 9 ++-- internal/events/blockchain_event_test.go | 60 ++++++++++++++++-------- internal/events/event_manager_test.go | 2 +- internal/events/token_pool_created.go | 7 ++- internal/events/tokens_approved.go | 7 ++- internal/events/tokens_transferred.go | 7 ++- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index 8708c9cd31..55e6ba5bb5 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -71,20 +71,21 @@ func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context, return l, nil } -func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error { +// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate +func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) (bool, error) { existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) if err != nil { - return err + return false, err } if existing != nil { log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID) // Return the ID of the existing event chainEvent.ID = existing.ID - return nil + return false, nil } topic := em.getTopicForChainListener(listener) ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic) - return em.database.InsertEvent(ctx, ffEvent) + return true, em.database.InsertEvent(ctx, ffEvent) } func (em *eventManager) getChainListenerCached(cacheKey string, getter func() (*core.ContractListener, error)) (*core.ContractListener, error) { diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index 67c293a5cd..f35c80dfeb 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -152,31 +152,53 @@ func TestContractEventWrongNS(t *testing.T) { } func TestPersistBlockchainEventDuplicate(t *testing.T) { - em := newTestEventManager(t) - defer em.cleanup(t) + type testCase struct { + name string + event *core.BlockchainEvent + existingID *fftypes.UUID + expectedID *fftypes.UUID + expectedError error + expectedCreated bool + } - ev := &core.BlockchainEvent{ - ID: fftypes.NewUUID(), - Name: "Changed", - Namespace: "ns1", - ProtocolID: "10/20/30", - Output: fftypes.JSONObject{ - "value": "1", - }, - Info: fftypes.JSONObject{ - "blockNumber": "10", + testCases := []testCase{ + { + name: "Event already exists", + event: &core.BlockchainEvent{ + ID: fftypes.NewUUID(), + Name: "Changed", + Namespace: "ns1", + ProtocolID: "10/20/30", + Output: fftypes.JSONObject{ + "value": "1", + }, + Info: fftypes.JSONObject{ + "blockNumber": "10", + }, + Listener: fftypes.NewUUID(), + }, + existingID: fftypes.NewUUID(), + expectedID: fftypes.NewUUID(), + expectedError: nil, + expectedCreated: false, }, - Listener: fftypes.NewUUID(), + // TODO: Add test case for event not existing } - existingID := fftypes.NewUUID() - em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev). - Return(&core.BlockchainEvent{ID: existingID}, nil) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) - err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) - assert.NoError(t, err) - assert.Equal(t, existingID, ev.ID) + em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, tc.event). + Return(&core.BlockchainEvent{ID: tc.existingID}, tc.expectedError) + created, err := em.maybePersistBlockchainEvent(em.ctx, tc.event, nil) + assert.NoError(t, err) + assert.Equal(t, tc.existingID, tc.event.ID) + assert.Equal(t, tc.expectedCreated, created) + }) + } } func TestGetTopicForChainListenerFallback(t *testing.T) { diff --git a/internal/events/event_manager_test.go b/internal/events/event_manager_test.go index f634108c42..ac06c88c08 100644 --- a/internal/events/event_manager_test.go +++ b/internal/events/event_manager_test.go @@ -681,7 +681,7 @@ func TestEventFilterOnSubscriptionMatchesEventType(t *testing.T) { filteredEvents, _ = em.FilterHistoricalEventsOnSubscription(context.Background(), events, subscription) assert.NotNil(t, filteredEvents) assert.Equal(t, 1, len(filteredEvents)) - + listenerUuid := fftypes.NewUUID() events[0].Event.Topic = "" diff --git a/internal/events/token_pool_created.go b/internal/events/token_pool_created.go index dec69f4468..2fb4c1ea2c 100644 --- a/internal/events/token_pool_created.go +++ b/internal/events/token_pool_created.go @@ -62,10 +62,13 @@ func (em *eventManager) confirmPool(ctx context.Context, pool *core.TokenPool, e Type: pool.TX.Type, BlockchainID: blockchainID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return err } - em.emitBlockchainEventMetric(ev) + if created { + em.emitBlockchainEventMetric(ev) + } } if _, err := em.txHelper.PersistTransaction(ctx, pool.TX.ID, pool.TX.Type, blockchainID); err != nil { return err diff --git a/internal/events/tokens_approved.go b/internal/events/tokens_approved.go index f93b719681..a5f5275db7 100644 --- a/internal/events/tokens_approved.go +++ b/internal/events/tokens_approved.go @@ -97,10 +97,13 @@ func (em *eventManager) persistTokenApproval(ctx context.Context, approval *toke Type: approval.TX.Type, BlockchainID: approval.Event.BlockchainTXID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return false, err } - em.emitBlockchainEventMetric(approval.Event) + if created { + em.emitBlockchainEventMetric(approval.Event) + } approval.BlockchainEvent = chainEvent.ID fb := database.TokenApprovalQueryFactory.NewFilter(ctx) diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 74cccc89de..9cf98b872f 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -89,10 +89,13 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke Type: transfer.TX.Type, BlockchainID: transfer.Event.BlockchainTXID, }) - if err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil); err != nil { + created, err := em.maybePersistBlockchainEvent(ctx, chainEvent, nil) + if err != nil { return false, err } - em.emitBlockchainEventMetric(transfer.Event) + if created { + em.emitBlockchainEventMetric(transfer.Event) + } transfer.BlockchainEvent = chainEvent.ID // This is a no-op if we've already persisted this token transfer From 0ae11b91679dd9827cf2a34728fc82cd073c17ef Mon Sep 17 00:00:00 2001 From: David Wertenteil Date: Thu, 15 Aug 2024 10:09:22 -0400 Subject: [PATCH 2/4] Update internal/events/blockchain_event.go Co-authored-by: Enrique Lacal Signed-off-by: David Wertenteil Signed-off-by: dwertent --- internal/events/blockchain_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index 55e6ba5bb5..337cadcff2 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -71,7 +71,7 @@ func (em *eventManager) getChainListenerByProtocolIDCached(ctx context.Context, return l, nil } -// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate +// handleBlockchainBatchPinEvent handles a blockchain event, returning true if the event was created, false if it was a duplicate along with an error if any failures occur func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) (bool, error) { existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) if err != nil { From 55cf0592eaa48d0ff7058a93bc91438695855916 Mon Sep 17 00:00:00 2001 From: dwertent Date: Thu, 15 Aug 2024 10:19:58 -0400 Subject: [PATCH 3/4] Reverting test structure change Signed-off-by: dwertent --- internal/events/blockchain_event_test.go | 62 ++++++++---------------- 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index f35c80dfeb..4823eff538 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -151,54 +151,34 @@ func TestContractEventWrongNS(t *testing.T) { } +// TODO: Add test case for event not existing func TestPersistBlockchainEventDuplicate(t *testing.T) { - type testCase struct { - name string - event *core.BlockchainEvent - existingID *fftypes.UUID - expectedID *fftypes.UUID - expectedError error - expectedCreated bool - } + em := newTestEventManager(t) + defer em.cleanup(t) - testCases := []testCase{ - { - name: "Event already exists", - event: &core.BlockchainEvent{ - ID: fftypes.NewUUID(), - Name: "Changed", - Namespace: "ns1", - ProtocolID: "10/20/30", - Output: fftypes.JSONObject{ - "value": "1", - }, - Info: fftypes.JSONObject{ - "blockNumber": "10", - }, - Listener: fftypes.NewUUID(), - }, - existingID: fftypes.NewUUID(), - expectedID: fftypes.NewUUID(), - expectedError: nil, - expectedCreated: false, + ev := &core.BlockchainEvent{ + ID: fftypes.NewUUID(), + Name: "Changed", + Namespace: "ns1", + ProtocolID: "10/20/30", + Output: fftypes.JSONObject{ + "value": "1", + }, + Info: fftypes.JSONObject{ + "blockNumber": "10", }, - // TODO: Add test case for event not existing + Listener: fftypes.NewUUID(), } + existingID := fftypes.NewUUID() - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - em := newTestEventManager(t) - defer em.cleanup(t) + em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev). + Return(&core.BlockchainEvent{ID: existingID}, nil) - em.mth.On("InsertOrGetBlockchainEvent", mock.Anything, tc.event). - Return(&core.BlockchainEvent{ID: tc.existingID}, tc.expectedError) + created, err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) + assert.NoError(t, err) + assert.Equal(t, existingID, ev.ID) + assert.False(t, created) - created, err := em.maybePersistBlockchainEvent(em.ctx, tc.event, nil) - assert.NoError(t, err) - assert.Equal(t, tc.existingID, tc.event.ID) - assert.Equal(t, tc.expectedCreated, created) - }) - } } func TestGetTopicForChainListenerFallback(t *testing.T) { From a6ae64c55bc5859dd00d4e9fc0e3295fa980a45c Mon Sep 17 00:00:00 2001 From: dwertent Date: Fri, 16 Aug 2024 09:51:48 -0400 Subject: [PATCH 4/4] Bump copyright year Signed-off-by: dwertent --- internal/events/tokens_approved.go | 2 +- internal/events/tokens_transferred.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/events/tokens_approved.go b/internal/events/tokens_approved.go index a5f5275db7..272641f546 100644 --- a/internal/events/tokens_approved.go +++ b/internal/events/tokens_approved.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/internal/events/tokens_transferred.go b/internal/events/tokens_transferred.go index 9cf98b872f..c4048611f3 100644 --- a/internal/events/tokens_transferred.go +++ b/internal/events/tokens_transferred.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 //