Skip to content

Commit

Permalink
Add Basic Metering Report to Workflow Engine
Browse files Browse the repository at this point in the history
This commit adds a metering report struct to the workflow engine such that each step can be individually added to the
report. Metering units and values are generally treated as string values, but post values are verified to be valid
numeric.

The metering report can provide a calculated median value for all posted units.
  • Loading branch information
EasterTheBunny committed Feb 13, 2025
1 parent f9c3869 commit 16f124f
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 0 deletions.
9 changes: 9 additions & 0 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type Engine struct {

clock clockwork.Clock
ratelimiter *ratelimiter.RateLimiter
meterReport *MeteringReport
}

func (e *Engine) Start(_ context.Context) error {
Expand Down Expand Up @@ -545,6 +546,8 @@ func generateExecutionID(workflowID, eventID string) (string, error) {

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, event *values.Map) error {
e.meterReport = NewMeteringReport()

lggr := e.logger.With("event", event, platform.KeyWorkflowExecutionID, executionID)
lggr.Debug("executing on a trigger event")
ec := &store.WorkflowExecution{
Expand Down Expand Up @@ -616,6 +619,12 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
return err
}

e.meterReport.AddStep(MeteringReportStepRef(stepUpdate.ExecutionID), MeteringReportStep{
Peer2PeerID: "TODO",
SpendUnit: "TODO",
SpendValue: "TODO",

Check failure on line 625 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

Error return value of `e.meterReport.AddStep` is not checked (errcheck)
})

if workflowIsFullyProcessed {
switch status {
case store.StatusTimeout:
Expand Down
103 changes: 103 additions & 0 deletions core/services/workflows/metering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package workflows

import (
"errors"
"fmt"
"sort"
"sync"

"github.com/shopspring/decimal"
)

var (
ErrInvalidMeteringSpendValue = errors.New("invalid metering spend value")
)

type MeteringReportStepRef string

type MeteringSpendUnit string

func (s MeteringSpendUnit) String() string {
return string(s)
}

func (s MeteringSpendUnit) DecimalToSpendValue(value decimal.Decimal) MeteringSpendValue {
return MeteringSpendValue(value.String())
}

type MeteringSpendValue string

func (s MeteringSpendValue) String() string {
return string(s)
}

type MeteringReportStep struct {
Peer2PeerID string
SpendUnit MeteringSpendUnit
SpendValue MeteringSpendValue
}

func (s MeteringReportStep) Value() (decimal.Decimal, error) {
return decimal.NewFromString(s.SpendValue.String())
}

type MeteringReport struct {
mu sync.RWMutex
steps map[MeteringReportStepRef]MeteringReportStep
}

func NewMeteringReport() *MeteringReport {
return &MeteringReport{
steps: make(map[MeteringReportStepRef]MeteringReportStep),
}
}

func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue {
r.mu.RLock()
defer r.mu.RUnlock()

values := map[MeteringSpendUnit][]decimal.Decimal{}
medians := map[MeteringSpendUnit]MeteringSpendValue{}

for _, step := range r.steps {
vals, ok := values[step.SpendUnit]
if !ok {
vals = []decimal.Decimal{}
}

// ignoring the error here should be safe as long as AddStep verifies parsing
value, _ := step.Value()

values[step.SpendUnit] = append(vals, value)
}

for unit, set := range values {
sort.Slice(set, func(i, j int) bool {
return set[j].GreaterThan(set[i])
})

if len(set)%2 > 0 {
medians[unit] = unit.DecimalToSpendValue(set[len(set)/2])

continue
}

avg := set[len(set)/2-1].Add(set[len(set)/2]).Div(decimal.NewFromInt(2))
medians[unit] = unit.DecimalToSpendValue(avg)
}

return medians
}

func (r *MeteringReport) AddStep(ref MeteringReportStepRef, step MeteringReportStep) error {
r.mu.Lock()
defer r.mu.Unlock()

if _, err := step.Value(); err != nil {
return fmt.Errorf("%w: %w", ErrInvalidMeteringSpendValue, err)
}

r.steps[ref] = step

return nil
}
117 changes: 117 additions & 0 deletions core/services/workflows/metering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package workflows_test

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
)

func TestMeteringReport(t *testing.T) {
t.Parallel()

t.Run("AddStep returns error for invalid value", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
step := workflows.MeteringReportStep{
Peer2PeerID: "abc",
SpendUnit: "a",
SpendValue: "not a value",
}

err := report.AddStep(workflows.MeteringReportStepRef("42"), step)

require.ErrorIs(t, err, workflows.ErrInvalidMeteringSpendValue)
})

t.Run("MedianSpend returns median for multiple spend units", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
{"xyz", "a", "2.0"},
{"abc", "a", "3.0"},
{"abc", "b", "0.1"},
{"xyz", "b", "0.2"},
{"abc", "b", "0.3"},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2",
"b": "0.2",
}

assert.Equal(t, expected, report.MedianSpend())
})

t.Run("MedianSpend returns median single spend value", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "1",
}

assert.Equal(t, expected, report.MedianSpend())
})

t.Run("MedianSpend returns median odd number of spend values", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
{"abc", "a", "3.0"},
{"xyz", "a", "2.0"},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2",
}

assert.Equal(t, expected, report.MedianSpend())
})

t.Run("MedianSpend returns median as average for even number of spend values", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"xyz", "a", "42.0"},
{"abc", "a", "1.0"},
{"abc", "a", "3.0"},
{"xyz", "a", "2.0"},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2.5",
}

assert.Equal(t, expected, report.MedianSpend())
})
}

0 comments on commit 16f124f

Please sign in to comment.