diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 4a4e45b192f..3313977602e 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -141,6 +141,7 @@ type Engine struct { clock clockwork.Clock ratelimiter *ratelimiter.RateLimiter + meterReport *MeteringReport } func (e *Engine) Start(_ context.Context) error { @@ -545,6 +546,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) { // startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error { + e.meterReport = NewMeteringReport() + lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID) lggr.Debug("executing on a trigger event") ec := &store.WorkflowExecution{ @@ -616,6 +619,12 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow return err } + e.meterReport.AddStep(MeteringReportStepRef(stepUpdate.ExecutionID), MeteringReportStep{ + Peer2PeerID: "TODO", + SpendUnit: "TODO", + SpendValue: "TODO", + }) + if workflowIsFullyProcessed { switch status { case store.StatusTimeout: diff --git a/core/services/workflows/metering.go b/core/services/workflows/metering.go new file mode 100644 index 00000000000..d944ffd0213 --- /dev/null +++ b/core/services/workflows/metering.go @@ -0,0 +1,103 @@ +package workflows + +import ( + "errors" + "fmt" + "sort" + "sync" + + "github.com/shopspring/decimal" +) + +var ( + ErrInvalidMeteringSpendValue = errors.New("invalid metering spend value") +) + +type MeteringReportStepRef string + +type MeteringSpendUnit string + +func (s MeteringSpendUnit) String() string { + return string(s) +} + +func (s MeteringSpendUnit) DecimalToSpendValue(value decimal.Decimal) MeteringSpendValue { + return MeteringSpendValue(value.String()) +} + +type MeteringSpendValue string + +func (s MeteringSpendValue) String() string { + return string(s) +} + +type MeteringReportStep struct { + Peer2PeerID string + SpendUnit MeteringSpendUnit + SpendValue MeteringSpendValue +} + +func (s MeteringReportStep) Value() (decimal.Decimal, error) { + return decimal.NewFromString(s.SpendValue.String()) +} + +type MeteringReport struct { + mu sync.RWMutex + steps map[MeteringReportStepRef]MeteringReportStep +} + +func NewMeteringReport() *MeteringReport { + return &MeteringReport{ + steps: make(map[MeteringReportStepRef]MeteringReportStep), + } +} + +func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue { + r.mu.RLock() + defer r.mu.RUnlock() + + values := map[MeteringSpendUnit][]decimal.Decimal{} + medians := map[MeteringSpendUnit]MeteringSpendValue{} + + for _, step := range r.steps { + vals, ok := values[step.SpendUnit] + if !ok { + vals = []decimal.Decimal{} + } + + // ignoring the error here should be safe as long as AddStep verifies parsing + value, _ := step.Value() + + values[step.SpendUnit] = append(vals, value) + } + + for unit, set := range values { + sort.Slice(set, func(i, j int) bool { + return set[j].GreaterThan(set[i]) + }) + + if len(set)%2 > 0 { + medians[unit] = unit.DecimalToSpendValue(set[len(set)/2]) + + continue + } + + avg := set[len(set)/2-1].Add(set[len(set)/2]).Div(decimal.NewFromInt(2)) + medians[unit] = unit.DecimalToSpendValue(avg) + } + + return medians +} + +func (r *MeteringReport) AddStep(ref MeteringReportStepRef, step MeteringReportStep) error { + r.mu.Lock() + defer r.mu.Unlock() + + if _, err := step.Value(); err != nil { + return fmt.Errorf("%w: %w", ErrInvalidMeteringSpendValue, err) + } + + r.steps[ref] = step + + return nil +} diff --git a/core/services/workflows/metering_test.go b/core/services/workflows/metering_test.go new file mode 100644 index 00000000000..7211abd33c2 --- /dev/null +++ b/core/services/workflows/metering_test.go @@ -0,0 +1,117 @@ +package workflows_test + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" +) + +func TestMeteringReport(t *testing.T) { + t.Parallel() + + t.Run("AddStep returns error for invalid value", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + step := workflows.MeteringReportStep{ + Peer2PeerID: "abc", + SpendUnit: "a", + SpendValue: "not a value", + } + + err := report.AddStep(workflows.MeteringReportStepRef("42"), step) + + require.ErrorIs(t, err, workflows.ErrInvalidMeteringSpendValue) + }) + + t.Run("MedianSpend returns median for multiple spend units", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + {"xyz", "a", "2.0"}, + {"abc", "a", "3.0"}, + {"abc", "b", "0.1"}, + {"xyz", "b", "0.2"}, + {"abc", "b", "0.3"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2", + "b": "0.2", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median single spend value", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "1", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median odd number of spend values", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + {"abc", "a", "3.0"}, + {"xyz", "a", "2.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median as average for even number of spend values", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"xyz", "a", "42.0"}, + {"abc", "a", "1.0"}, + {"abc", "a", "3.0"}, + {"xyz", "a", "2.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2.5", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) +}