Skip to content

Commit

Permalink
[CAPPL-523] Filter events by DON ID
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Feb 17, 2025
1 parent 4c51787 commit dd23def
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,83 @@ func Test_SecretsWorker(t *testing.T) {
}, tests.WaitTimeout(t), time.Second)
}

func Test_RegistrySyncer_SkipsEventsNotBelongingToDON(t *testing.T) {
var (
lggr = logger.TestLogger(t)
backendTH = testutils.NewEVMBackendTH(t)

giveTicker = time.NewTicker(500 * time.Millisecond)
giveBinaryURL = "https://original-url.com"
donID = uint32(1)
otherDonID = uint32(2)
skippedWorkflow = RegisterWorkflowCMD{
Name: "test-wf2",
DonID: otherDonID,
Status: uint8(1),
BinaryURL: giveBinaryURL,
}
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(1),
BinaryURL: "someurl",
}
wantContents = "updated contents"
)

defer giveTicker.Stop()

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

from := [20]byte(backendTH.ContractsOwner.From)
id, err := pkgworkflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte(""), "")
require.NoError(t, err)
giveWorkflow.ID = id

from = [20]byte(backendTH.ContractsOwner.From)
id, err = pkgworkflows.GenerateWorkflowID(from[:], "test-wf", []byte(wantContents), []byte("dummy config"), "")
require.NoError(t, err)
skippedWorkflow.ID = id

handler := newTestEvtHandler()

worker := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
},
syncer.WithTicker(giveTicker.C),
)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID, otherDonID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

servicetest.Run(t, worker)

// generate a log event
registerWorkflow(t, backendTH, wfRegistryC, skippedWorkflow)
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

require.Eventually(t, func() bool {
// we process events in order, and should only receive 1 event
// the first is skipped as it belongs to another don.
return len(handler.GetEvents()) == 1
}, tests.WaitTimeout(t), time.Second)
}

func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
var (
ctx = coretestutils.Context(t)
Expand Down
29 changes: 26 additions & 3 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type WorkflowRegistryEvent struct {
Data any
EventType WorkflowRegistryEventType
Head Head
DonID *uint32
}

func (we WorkflowRegistryEvent) GetEventType() WorkflowRegistryEventType {
Expand Down Expand Up @@ -231,7 +232,7 @@ func (w *workflowRegistry) Start(_ context.Context) error {
}
}

w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height)
w.readRegistryEvents(ctx, don, reader, loadWorkflowsHead.Height)
}()

return nil
Expand Down Expand Up @@ -259,7 +260,7 @@ func (w *workflowRegistry) Name() string {
}

// readRegistryEvents polls the contract for events and send them to the events channel.
func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader ContractReader, lastReadBlockNumber string) {
func (w *workflowRegistry) readRegistryEvents(ctx context.Context, don capabilities.DON, reader ContractReader, lastReadBlockNumber string) {
ticker := w.getTicker()

var keyQueries = make([]types.ContractKeyFilter, 0, len(w.eventTypes))
Expand Down Expand Up @@ -325,7 +326,24 @@ func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader Contra
continue
}

events = append(events, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr))
event := toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr)

if event.Event.DonID == nil {
// event is missing a DonID, so don't filter it out;
// it applies to all Dons
events = append(events, event)
} else if *event.Event.DonID == don.ID {
// event has a DonID and matches, so it applies to this DON.
events = append(events, event)
} else {
// event doesn't match, let's skip it
donID := "MISSING_DON_ID"
if event.Event.DonID != nil {
donID = fmt.Sprintf("%d", *event.Event.DonID)
}
w.lggr.Debugw("event belongs to a different don, skipping...", "don", don.ID, "gotDON", donID)
}

cursor = log.Sequence.Cursor
}

Expand Down Expand Up @@ -540,6 +558,7 @@ func toWorkflowRegistryEventResponse(
return resp
}
resp.Event.Data = data
resp.Event.DonID = &data.DonID
case WorkflowUpdatedEvent:
var data WorkflowRegistryWorkflowUpdatedV1
if err := dataAsValuesMap.UnwrapTo(&data); err != nil {
Expand All @@ -549,6 +568,7 @@ func toWorkflowRegistryEventResponse(
return resp
}
resp.Event.Data = data
resp.Event.DonID = &data.DonID
case WorkflowPausedEvent:
var data WorkflowRegistryWorkflowPausedV1
if err := dataAsValuesMap.UnwrapTo(&data); err != nil {
Expand All @@ -558,6 +578,7 @@ func toWorkflowRegistryEventResponse(
return resp
}
resp.Event.Data = data
resp.Event.DonID = &data.DonID
case WorkflowActivatedEvent:
var data WorkflowRegistryWorkflowActivatedV1
if err := dataAsValuesMap.UnwrapTo(&data); err != nil {
Expand All @@ -567,6 +588,7 @@ func toWorkflowRegistryEventResponse(
return resp
}
resp.Event.Data = data
resp.Event.DonID = &data.DonID
case WorkflowDeletedEvent:
var data WorkflowRegistryWorkflowDeletedV1
if err := dataAsValuesMap.UnwrapTo(&data); err != nil {
Expand All @@ -576,6 +598,7 @@ func toWorkflowRegistryEventResponse(
return resp
}
resp.Event.Data = data
resp.Event.DonID = &data.DonID
default:
lggr.Errorf("unknown event type: %s", evt)
resp.Event = nil
Expand Down

0 comments on commit dd23def

Please sign in to comment.