diff --git a/internal/events/event_enrich.go b/internal/events/event_enrich.go index 2036b202d..9b986ae00 100644 --- a/internal/events/event_enrich.go +++ b/internal/events/event_enrich.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -44,6 +44,18 @@ func newEventEnricher(ns string, di database.Plugin, dm data.Manager, om operati } } +func (em *eventEnricher) enrichEvents(ctx context.Context, events []*core.Event) ([]*core.EnrichedEvent, error) { + enriched := make([]*core.EnrichedEvent, len(events)) + for i, event := range events { + enrichedEvent, err := em.enrichEvent(ctx, event) + if err != nil { + return nil, err + } + enriched[i] = enrichedEvent + } + return enriched, nil +} + func (em *eventEnricher) enrichEvent(ctx context.Context, event *core.Event) (*core.EnrichedEvent, error) { e := &core.EnrichedEvent{ Event: *event, diff --git a/internal/events/event_enrich_test.go b/internal/events/event_enrich_test.go index 4c708cf7a..d98e7773f 100644 --- a/internal/events/event_enrich_test.go +++ b/internal/events/event_enrich_test.go @@ -70,6 +70,33 @@ func TestEnrichMessageConfirmed(t *testing.T) { assert.Equal(t, ref1, enriched.Message.Header.ID) } +func TestEnrichEventsMessageConfirmed(t *testing.T) { + em := newTestEventManager(t) + defer em.cleanup(t) + ctx := context.Background() + + // Setup the IDs + ref1 := fftypes.NewUUID() + ev1 := fftypes.NewUUID() + + // Setup enrichment + em.mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{ + Header: core.MessageHeader{ID: ref1}, + }, nil, true, nil) + + event := []*core.Event{ + { + ID: ev1, + Type: core.EventTypeMessageConfirmed, + Reference: ref1, + }, + } + + enriched, err := em.EnrichEvents(ctx, event) + assert.NoError(t, err) + assert.Equal(t, ref1, enriched[0].Message.Header.ID) +} + func TestEnrichMessageFail(t *testing.T) { em := newTestEventEnricher() ctx := context.Background() @@ -613,3 +640,63 @@ func TestEnrichOperationFail(t *testing.T) { _, err := em.enrichEvent(ctx, event) assert.EqualError(t, err, "pop") } + +func TestEnrichEventsFails(t *testing.T) { + em := newTestEventEnricher() + ctx := context.Background() + + ev1 := fftypes.NewUUID() + ev2 := fftypes.NewUUID() + ref1 := fftypes.NewUUID() + + // Setup enrichment + mom := em.operations.(*operationmocks.Manager) + mom.On("GetOperationByIDCached", mock.Anything, mock.Anything).Return(&core.Operation{ + ID: ref1, + }, nil).Once() + mom.On("GetOperationByIDCached", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")) + + events := []*core.Event{ + { + ID: ev1, + Type: core.EventTypeApprovalOpFailed, + }, + { + ID: ev2, + Type: core.EventTypeApprovalOpFailed, + }, + } + + _, err := em.enrichEvents(ctx, events) + assert.EqualError(t, err, "pop") +} + +func TestEnrichEventsOK(t *testing.T) { + em := newTestEventEnricher() + ctx := context.Background() + + ev1 := fftypes.NewUUID() + ev2 := fftypes.NewUUID() + ref1 := fftypes.NewUUID() + + // Setup enrichment + mom := em.operations.(*operationmocks.Manager) + mom.On("GetOperationByIDCached", mock.Anything, mock.Anything).Return(&core.Operation{ + ID: ref1, + }, nil) + + events := []*core.Event{ + { + ID: ev1, + Type: core.EventTypeApprovalOpFailed, + }, + { + ID: ev2, + Type: core.EventTypeApprovalOpFailed, + }, + } + + result, err := em.enrichEvents(ctx, events) + assert.Nil(t, err) + assert.Equal(t, 2, len(result)) +} diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 8573c3c14..9a57bda10 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -211,10 +211,6 @@ func (em *eventManager) DeletedSubscriptions() chan<- *fftypes.UUID { return em.subManager.deletedSubscriptions } -func (em *eventManager) ParseSubscriptionDef(ctx context.Context, sub *core.Subscription) (*subscription, error) { - return em.subManager.parseSubscriptionDef(ctx, sub) -} - func (em *eventManager) ResolveTransportAndCapabilities(ctx context.Context, transportName string) (string, *events.Capabilities, error) { if transportName == "" { transportName = em.defaultTransport @@ -307,15 +303,7 @@ func (em *eventManager) EnrichEvent(ctx context.Context, event *core.Event) (*co } func (em *eventManager) EnrichEvents(ctx context.Context, events []*core.Event) ([]*core.EnrichedEvent, error) { - enriched := make([]*core.EnrichedEvent, len(events)) - for i, event := range events { - enrichedEvent, err := em.EnrichEvent(ctx, event) - if err != nil { - return nil, err - } - enriched[i] = enrichedEvent - } - return enriched, nil + return em.enricher.enrichEvents(ctx, events) } func (em *eventManager) QueueBatchRewind(batchID *fftypes.UUID) { diff --git a/internal/orchestrator/subscriptions_test.go b/internal/orchestrator/subscriptions_test.go index 24eccd9d7..169f254ff 100644 --- a/internal/orchestrator/subscriptions_test.go +++ b/internal/orchestrator/subscriptions_test.go @@ -359,6 +359,28 @@ func TestGetSGetSubscriptionsByIDWithStatusUnknownSub(t *testing.T) { assert.Nil(t, subWithStatus) } +func generateFakeEvents(eventCount int) ([]*core.Event, []*core.EnrichedEvent) { + baseEvents := []*core.Event{} + enrichedEvents := []*core.EnrichedEvent{} + baseEvent := &core.Event{ + Type: core.EventTypeIdentityConfirmed, + Topic: "Topic1", + } + enrichedEvent := &core.EnrichedEvent{ + Event: *baseEvent, + BlockchainEvent: &core.BlockchainEvent{ + Namespace: "ns1", + }, + } + + for i := 0; i < eventCount; i++ { + baseEvents = append(baseEvents, baseEvent) + enrichedEvents = append(enrichedEvents, enrichedEvent) + } + + return baseEvents, enrichedEvents +} + func TestGetHistoricalEventsForSubscription(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) @@ -449,28 +471,6 @@ func TestGetHistoricalEventsForSubscriptionBadQueryFilter(t *testing.T) { assert.NotNil(t, err) } -func generateFakeEvents(eventCount int) ([]*core.Event, []*core.EnrichedEvent) { - baseEvents := []*core.Event{} - enrichedEvents := []*core.EnrichedEvent{} - baseEvent := &core.Event{ - Type: core.EventTypeIdentityConfirmed, - Topic: "Topic1", - } - enrichedEvent := &core.EnrichedEvent{ - Event: *baseEvent, - BlockchainEvent: &core.BlockchainEvent{ - Namespace: "ns1", - }, - } - - for i := 0; i < eventCount; i++ { - baseEvents = append(baseEvents, baseEvent) - enrichedEvents = append(enrichedEvents, enrichedEvent) - } - - return baseEvents, enrichedEvents -} - func TestGetHistoricalEventsForSubscriptionGettingHistoricalEventsThrows(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t)