From 16f124f7a208f4039eef19d5f609ef5d8ac8be42 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Thu, 13 Feb 2025 13:32:58 -0600 Subject: [PATCH] Add Basic Metering Report to Workflow Engine This commit adds a metering report struct to the workflow engine such that each step can be individually added to the report. Metering units and values are generally treated as string values, but post values are verified to be valid numeric. The metering report can provide a calculated median value for all posted units. --- core/services/workflows/engine.go | 9 ++ core/services/workflows/metering.go | 103 ++++++++++++++++++++ core/services/workflows/metering_test.go | 117 +++++++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 core/services/workflows/metering.go create mode 100644 core/services/workflows/metering_test.go diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 4a4e45b192f..3313977602e 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -141,6 +141,7 @@ type Engine struct { clock clockwork.Clock ratelimiter *ratelimiter.RateLimiter + meterReport *MeteringReport } func (e *Engine) Start(_ context.Context) error { @@ -545,6 +546,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) { // startExecution kicks off a new workflow execution when a trigger event is received. func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error { + e.meterReport = NewMeteringReport() + lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID) lggr.Debug("executing on a trigger event") ec := &store.WorkflowExecution{ @@ -616,6 +619,12 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow return err } + e.meterReport.AddStep(MeteringReportStepRef(stepUpdate.ExecutionID), MeteringReportStep{ + Peer2PeerID: "TODO", + SpendUnit: "TODO", + SpendValue: "TODO", + }) + if workflowIsFullyProcessed { switch status { case store.StatusTimeout: diff --git a/core/services/workflows/metering.go b/core/services/workflows/metering.go new file mode 100644 index 00000000000..d944ffd0213 --- /dev/null +++ b/core/services/workflows/metering.go @@ -0,0 +1,103 @@ +package workflows + +import ( + "errors" + "fmt" + "sort" + "sync" + + "github.com/shopspring/decimal" +) + +var ( + ErrInvalidMeteringSpendValue = errors.New("invalid metering spend value") +) + +type MeteringReportStepRef string + +type MeteringSpendUnit string + +func (s MeteringSpendUnit) String() string { + return string(s) +} + +func (s MeteringSpendUnit) DecimalToSpendValue(value decimal.Decimal) MeteringSpendValue { + return MeteringSpendValue(value.String()) +} + +type MeteringSpendValue string + +func (s MeteringSpendValue) String() string { + return string(s) +} + +type MeteringReportStep struct { + Peer2PeerID string + SpendUnit MeteringSpendUnit + SpendValue MeteringSpendValue +} + +func (s MeteringReportStep) Value() (decimal.Decimal, error) { + return decimal.NewFromString(s.SpendValue.String()) +} + +type MeteringReport struct { + mu sync.RWMutex + steps map[MeteringReportStepRef]MeteringReportStep +} + +func NewMeteringReport() *MeteringReport { + return &MeteringReport{ + steps: make(map[MeteringReportStepRef]MeteringReportStep), + } +} + +func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue { + r.mu.RLock() + defer r.mu.RUnlock() + + values := map[MeteringSpendUnit][]decimal.Decimal{} + medians := map[MeteringSpendUnit]MeteringSpendValue{} + + for _, step := range r.steps { + vals, ok := values[step.SpendUnit] + if !ok { + vals = []decimal.Decimal{} + } + + // ignoring the error here should be safe as long as AddStep verifies parsing + value, _ := step.Value() + + values[step.SpendUnit] = append(vals, value) + } + + for unit, set := range values { + sort.Slice(set, func(i, j int) bool { + return set[j].GreaterThan(set[i]) + }) + + if len(set)%2 > 0 { + medians[unit] = unit.DecimalToSpendValue(set[len(set)/2]) + + continue + } + + avg := set[len(set)/2-1].Add(set[len(set)/2]).Div(decimal.NewFromInt(2)) + medians[unit] = unit.DecimalToSpendValue(avg) + } + + return medians +} + +func (r *MeteringReport) AddStep(ref MeteringReportStepRef, step MeteringReportStep) error { + r.mu.Lock() + defer r.mu.Unlock() + + if _, err := step.Value(); err != nil { + return fmt.Errorf("%w: %w", ErrInvalidMeteringSpendValue, err) + } + + r.steps[ref] = step + + return nil +} diff --git a/core/services/workflows/metering_test.go b/core/services/workflows/metering_test.go new file mode 100644 index 00000000000..7211abd33c2 --- /dev/null +++ b/core/services/workflows/metering_test.go @@ -0,0 +1,117 @@ +package workflows_test + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" +) + +func TestMeteringReport(t *testing.T) { + t.Parallel() + + t.Run("AddStep returns error for invalid value", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + step := workflows.MeteringReportStep{ + Peer2PeerID: "abc", + SpendUnit: "a", + SpendValue: "not a value", + } + + err := report.AddStep(workflows.MeteringReportStepRef("42"), step) + + require.ErrorIs(t, err, workflows.ErrInvalidMeteringSpendValue) + }) + + t.Run("MedianSpend returns median for multiple spend units", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + {"xyz", "a", "2.0"}, + {"abc", "a", "3.0"}, + {"abc", "b", "0.1"}, + {"xyz", "b", "0.2"}, + {"abc", "b", "0.3"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2", + "b": "0.2", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median single spend value", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "1", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median odd number of spend values", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"abc", "a", "1.0"}, + {"abc", "a", "3.0"}, + {"xyz", "a", "2.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) + + t.Run("MedianSpend returns median as average for even number of spend values", func(t *testing.T) { + t.Parallel() + + report := workflows.NewMeteringReport() + steps := []workflows.MeteringReportStep{ + {"xyz", "a", "42.0"}, + {"abc", "a", "1.0"}, + {"abc", "a", "3.0"}, + {"xyz", "a", "2.0"}, + } + + for idx, step := range steps { + require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step)) + } + + expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{ + "a": "2.5", + } + + assert.Equal(t, expected, report.MedianSpend()) + }) +}