Skip to content

Commit

Permalink
update mathematical operations on metering spend value
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Feb 25, 2025
1 parent 16f124f commit bfbfcf2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 66 deletions.
6 changes: 0 additions & 6 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,6 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
return err
}

e.meterReport.AddStep(MeteringReportStepRef(stepUpdate.ExecutionID), MeteringReportStep{
Peer2PeerID: "TODO",
SpendUnit: "TODO",
SpendValue: "TODO",
})

if workflowIsFullyProcessed {
switch status {
case store.StatusTimeout:
Expand Down
61 changes: 34 additions & 27 deletions core/services/workflows/metering.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package workflows

import (
"errors"
"fmt"
"sort"
"sync"

"github.com/shopspring/decimal"
)

var (
ErrInvalidMeteringSpendValue = errors.New("invalid metering spend value")
)

type MeteringReportStepRef string

type MeteringSpendUnit string
Expand All @@ -22,13 +16,38 @@ func (s MeteringSpendUnit) String() string {
}

func (s MeteringSpendUnit) DecimalToSpendValue(value decimal.Decimal) MeteringSpendValue {
return MeteringSpendValue(value.String())
return MeteringSpendValue{value: value, roundingPlace: 18}
}

type MeteringSpendValue string
func (s MeteringSpendUnit) IntToSpendValue(value int64) MeteringSpendValue {
return MeteringSpendValue{value: decimal.NewFromInt(value), roundingPlace: 18}
}

func (s MeteringSpendValue) String() string {
return string(s)
type MeteringSpendValue struct {
value decimal.Decimal
roundingPlace uint8
}

func (v MeteringSpendValue) Add(value MeteringSpendValue) MeteringSpendValue {
return MeteringSpendValue{
value: v.value.Add(value.value),
roundingPlace: v.roundingPlace,
}
}

func (v MeteringSpendValue) Div(value MeteringSpendValue) MeteringSpendValue {
return MeteringSpendValue{
value: v.value.Div(value.value),
roundingPlace: v.roundingPlace,
}
}

func (v MeteringSpendValue) GreaterThan(value MeteringSpendValue) bool {
return v.value.GreaterThan(v.value)
}

func (v MeteringSpendValue) String() string {
return v.value.StringFixedBank(int32(v.roundingPlace))
}

type MeteringReportStep struct {
Expand All @@ -37,10 +56,6 @@ type MeteringReportStep struct {
SpendValue MeteringSpendValue
}

func (s MeteringReportStep) Value() (decimal.Decimal, error) {
return decimal.NewFromString(s.SpendValue.String())
}

type MeteringReport struct {
mu sync.RWMutex
steps map[MeteringReportStepRef]MeteringReportStep
Expand All @@ -56,19 +71,16 @@ func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue
r.mu.RLock()
defer r.mu.RUnlock()

values := map[MeteringSpendUnit][]decimal.Decimal{}
values := map[MeteringSpendUnit][]MeteringSpendValue{}
medians := map[MeteringSpendUnit]MeteringSpendValue{}

for _, step := range r.steps {
vals, ok := values[step.SpendUnit]
if !ok {
vals = []decimal.Decimal{}
vals = []MeteringSpendValue{}
}

// ignoring the error here should be safe as long as AddStep verifies parsing
value, _ := step.Value()

values[step.SpendUnit] = append(vals, value)
values[step.SpendUnit] = append(vals, step.SpendValue)
}

for unit, set := range values {
Expand All @@ -77,13 +89,12 @@ func (r *MeteringReport) MedianSpend() map[MeteringSpendUnit]MeteringSpendValue
})

if len(set)%2 > 0 {
medians[unit] = unit.DecimalToSpendValue(set[len(set)/2])
medians[unit] = set[len(set)/2]

continue
}

avg := set[len(set)/2-1].Add(set[len(set)/2]).Div(decimal.NewFromInt(2))
medians[unit] = unit.DecimalToSpendValue(avg)
medians[unit] = set[len(set)/2-1].Add(set[len(set)/2]).Div(unit.IntToSpendValue(2))
}

return medians
Expand All @@ -93,10 +104,6 @@ func (r *MeteringReport) AddStep(ref MeteringReportStepRef, step MeteringReportS
r.mu.Lock()
defer r.mu.Unlock()

if _, err := step.Value(); err != nil {
return fmt.Errorf("%w: %w", ErrInvalidMeteringSpendValue, err)
}

r.steps[ref] = step

return nil
Expand Down
55 changes: 22 additions & 33 deletions core/services/workflows/metering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strconv"
"testing"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -13,41 +14,29 @@ import (
func TestMeteringReport(t *testing.T) {
t.Parallel()

t.Run("AddStep returns error for invalid value", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
step := workflows.MeteringReportStep{
Peer2PeerID: "abc",
SpendUnit: "a",
SpendValue: "not a value",
}

err := report.AddStep(workflows.MeteringReportStepRef("42"), step)

require.ErrorIs(t, err, workflows.ErrInvalidMeteringSpendValue)
})
testUnitA := workflows.MeteringSpendUnit("a")
testUnitB := workflows.MeteringSpendUnit("b")

t.Run("MedianSpend returns median for multiple spend units", func(t *testing.T) {
t.Parallel()

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
{"xyz", "a", "2.0"},
{"abc", "a", "3.0"},
{"abc", "b", "0.1"},
{"xyz", "b", "0.2"},
{"abc", "b", "0.3"},
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
{"abc", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.1))},
{"xyz", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.2))},
{"abc", testUnitB, testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.3))},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2",
"b": "0.2",
testUnitA: testUnitB.IntToSpendValue(2),
testUnitB: testUnitB.DecimalToSpendValue(decimal.NewFromFloat(0.2)),
}

assert.Equal(t, expected, report.MedianSpend())
Expand All @@ -58,15 +47,15 @@ func TestMeteringReport(t *testing.T) {

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "1",
testUnitA: testUnitA.IntToSpendValue(1),
}

assert.Equal(t, expected, report.MedianSpend())
Expand All @@ -77,17 +66,17 @@ func TestMeteringReport(t *testing.T) {

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"abc", "a", "1.0"},
{"abc", "a", "3.0"},
{"xyz", "a", "2.0"},
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2",
testUnitA: testUnitA.IntToSpendValue(2),
}

assert.Equal(t, expected, report.MedianSpend())
Expand All @@ -98,18 +87,18 @@ func TestMeteringReport(t *testing.T) {

report := workflows.NewMeteringReport()
steps := []workflows.MeteringReportStep{
{"xyz", "a", "42.0"},
{"abc", "a", "1.0"},
{"abc", "a", "3.0"},
{"xyz", "a", "2.0"},
{"xyz", testUnitA, testUnitA.IntToSpendValue(42)},
{"abc", testUnitA, testUnitA.IntToSpendValue(1)},
{"abc", testUnitA, testUnitA.IntToSpendValue(3)},
{"xyz", testUnitA, testUnitA.IntToSpendValue(2)},
}

for idx, step := range steps {
require.NoError(t, report.AddStep(workflows.MeteringReportStepRef(strconv.Itoa(idx)), step))
}

expected := map[workflows.MeteringSpendUnit]workflows.MeteringSpendValue{
"a": "2.5",
testUnitA: testUnitA.DecimalToSpendValue(decimal.NewFromFloat(2.5)),
}

assert.Equal(t, expected, report.MedianSpend())
Expand Down

0 comments on commit bfbfcf2

Please sign in to comment.