Skip to content

Commit

Permalink
process and output billing events
Browse files Browse the repository at this point in the history
  • Loading branch information
withinboredom committed Mar 16, 2024
1 parent 9b8eda1 commit 5a3cbca
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 18 deletions.
Binary file modified bin/dphp-linux-x86_64
Binary file not shown.
152 changes: 152 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"
)
Expand Down Expand Up @@ -209,6 +210,157 @@ func execute(args []string, options map[string]string) int {
}
}

if config.Extensions.Billing.Enabled {
if config.Extensions.Billing.Listen {

billings := sync.Map{}
billings.Store("e", 0)
billings.Store("o", 0)
billings.Store("a", 0*time.Minute)
billings.Store("ac", 0)

var incrementInt func(key string, amount int)
incrementInt = func(key string, amount int) {
var old interface{}
old, _ = billings.Load(key)
if !billings.CompareAndSwap(key, old, old.(int)+1) {
incrementInt(key, amount)
}
}

var incrementDur func(key string, amount time.Duration)
incrementDur = func(key string, amount time.Duration) {
var old interface{}
old, _ = billings.Load(key)
if !billings.CompareAndSwap(key, old, old.(time.Duration)+amount) {
incrementDur(key, amount)
}
}

outputBillingStatus := func() {
costC := func(num interface{}, basis int) float64 {
return float64(num.(int)) * float64(basis) / 10_000_000
}

costA := func(dur interface{}, basis int) float64 {
duration := dur.(time.Duration)
seconds := duration.Seconds()
return float64(basis) * seconds / 100_000
}

avg := func(dur interface{}, count interface{}) time.Duration {
seconds := dur.(time.Duration).Seconds()
return time.Duration(seconds/float64(count.(int))) * time.Second
}

e, _ := billings.Load("e")
o, _ := billings.Load("o")
ac, _ := billings.Load("ac")
a, _ := billings.Load("a")

ecost := costC(e, config.Extensions.Billing.Costs.Entities.Cost)
ocost := costC(o, config.Extensions.Billing.Costs.Orchestrations.Cost)
acost := costA(a, config.Extensions.Billing.Costs.Activities.Cost)

logger.Warn("Billing estimate",
zap.Any("launched entities", e),
zap.String("entity cost", fmt.Sprintf("$%.2f", ecost)),
zap.Any("launched orchestrations", o),
zap.String("orchestration cost", fmt.Sprintf("$%.2f", ocost)),
zap.Any("activity time", a),
zap.Any("activities launced", ac),
zap.Any("average activity time", avg(a, ac)),
zap.String("activity cost", fmt.Sprintf("$%.2f", acost)),
zap.String("total estimate", fmt.Sprintf("$%.2f", ecost+ocost+acost)),
)
}

go func() {
ticker := time.NewTicker(3 * time.Second)
for range ticker.C {
outputBillingStatus()
}
}()

billingStream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "billing",
Subjects: []string{
"billing." + config.Stream + ".>",
},
Storage: jetstream.FileStorage,
Retention: jetstream.LimitsPolicy,
MaxAge: 7 * 24 * time.Hour,
})
if err != nil {
panic(err)
}

entityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "entityAggregator",
FilterSubjects: []string{
"billing." + config.Stream + ".entities.>",
},
})
if err != nil {
panic(err)
}

consume, err := entityConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("e", 1)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()

orchestrationConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "orchestrationAggregator",
FilterSubject: "billing." + config.Stream + ".orchestrations.>",
})
if err != nil {
panic(err)
}

consume, err = orchestrationConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("o", 1)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()

activityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "orchestrationAggregator",
FilterSubject: "billing." + config.Stream + ".activities.>",
})
if err != nil {
panic(err)
}

consume, err = activityConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("ac", 1)
var ev lib.BillingEvent
err := json.Unmarshal(msg.Data(), &ev)
if err != nil {
panic(err)
}
incrementDur("a", ev.Duration)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()
}

err := lib.StartBillingProcessor(ctx, config, js, logger)
if err != nil {
panic(err)
}
}

port := options["port"]
if port == "" {
port = "8080"
Expand Down
171 changes: 171 additions & 0 deletions cli/lib/billing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package lib

import (
"context"
"encoding/json"
"fmt"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
"time"
)

type BillingEvent struct {
Id string `json:"id"`
Duration time.Duration `json:"duration,omitempty"`
}

// StartBillingProcessor starts up a consumer on the history stream and waits for
// events that are billable events. It then fires a billing event to the billings stream.
func StartBillingProcessor(ctx context.Context, config *Config, js jetstream.JetStream, logger *zap.Logger) error {
if !config.Extensions.Billing.Enabled {
logger.Info("Billing events are disabled")
return nil
}

consumer, err := js.CreateOrUpdateConsumer(ctx, config.Stream+"_history", jetstream.ConsumerConfig{
Durable: "billing-consumer",
HeadersOnly: true,
})
if err != nil {
return err
}

activityTracker, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Storage: jetstream.MemoryStorage,
Bucket: "ActivityTracker",
TTL: 1 * time.Hour,
Compression: true,
})
if err != nil {
return err
}

maybeSendActivityBilling := func(id *StateId) {
started, err := activityTracker.Get(ctx, id.toSubject().String()+"_start")
if err != nil {
return
}
ended, err := activityTracker.Get(ctx, id.toSubject().String()+"_end")
if err != nil {
return
}

start := time.Time{}
end := time.Time{}
err = start.UnmarshalText(started.Value())
if err != nil {
logger.Warn("Failed to parse start time", zap.String("id", id.String()), zap.Error(err))
return
}
err = end.UnmarshalText(ended.Value())
if err != nil {
logger.Warn("Failed to parse end time", zap.String("id", id.String()), zap.Error(err))
return
}

duration := end.Sub(start)
event, err := json.Marshal(BillingEvent{Id: id.String(), Duration: duration})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
}

_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.activities.%s.duration", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.Error(err))
return
}
}

entityRegistry, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Storage: jetstream.FileStorage,
Bucket: "EntityRegistry",
Compression: true,
})

consume, err := consumer.Consume(func(msg jetstream.Msg) {
targetType := msg.Headers().Get(string(HeaderTargetType))
eventType := msg.Headers().Get(string(HeaderEventType))
id := ParseStateId(msg.Headers().Get(string(HeaderStateId)))
nowBytes := []byte(msg.Headers().Get(string(HeaderEmittedAt)))
emittedBy := ParseStateId(msg.Headers().Get(string(HeaderEmittedBy)))

switch targetType {
case "Activity":
switch eventType {
case "ScheduleTask":
// an activity has been started
_, err := activityTracker.Put(ctx, id.toSubject().String()+"_start", nowBytes)
if err != nil {
panic(err)
}

maybeSendActivityBilling(id)
}
case "Orchestration":
switch eventType {
case "StartExecution":
event, err := json.Marshal(BillingEvent{Id: id.String()})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
return
}
_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.orchestrations.%s.started", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.String("id", id.String()), zap.Error(err))
return
}
case "TaskCompleted":
fallthrough
case "TaskFailed":
_, err := activityTracker.Put(ctx, emittedBy.toSubject().String()+"_end", nowBytes)
if err != nil {
panic(err)
}
maybeSendActivityBilling(emittedBy)
}
case "Entity":
switch eventType {
case "AwaitResult":
fallthrough
case "RaiseEvent":
_, err := entityRegistry.Get(ctx, id.toSubject().String())
if err != nil {
_, _ = entityRegistry.Put(ctx, id.toSubject().String(), []byte{1})
event, err := json.Marshal(BillingEvent{Id: id.String()})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
return
}
_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.entities.%s.started", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.String("id", id.String()), zap.Error(err))
}
logger.Warn("Billed for one entity")
}
case "TaskCompleted":
fallthrough
case "TaskFailed":
_, err := activityTracker.Put(ctx, emittedBy.toSubject().String()+"_end", nowBytes)
if err != nil {
panic(err)
}
maybeSendActivityBilling(emittedBy)
}
}

err := msg.Ack()
if err != nil {
logger.Warn("Failed to ack historical message", zap.Error(err))
}
})
if err != nil {
return err
}

go func() {
<-ctx.Done()
consume.Drain()
}()

return nil
}
14 changes: 14 additions & 0 deletions cli/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
//go:embed defaultConfiguration.json
var defaultConfig string

type BillingUnit struct {
FreeLimit int `json:"freeLimit"`
MaxFree int `json:"maxFree"`
Cost int `json:"cost"`
Limit int `json:"limit"`
}

type Config struct {
Stream string `json:"project"`
Bootstrap string `json:"bootstrap"`
Expand All @@ -29,6 +36,13 @@ type Config struct {
Billing struct {
Enabled bool `json:"enabled"`
Listen bool `json:"listen"`
Costs struct {
Orchestrations BillingUnit `json:"orchestrations"`
Activities BillingUnit `json:"activities"`
Entities BillingUnit `json:"entities"`
ObjectStorage BillingUnit `json:"objectStorage"`
FileStorage BillingUnit `json:"fileStorage"`
} `json:"costs"`
} `json:"billing,omitempty"`
Search struct {
Url string `json:"url"`
Expand Down
2 changes: 1 addition & 1 deletion cli/lib/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func processMsg(ctx context.Context, logger *zap.Logger, msg jetstream.Msg, js j
env["EVENT"] = string(msg.Data())
env["STATE_ID"] = msg.Headers().Get(string(HeaderStateId))

msgs, headers, _ := glu.execute(ctx, headers, logger, env, js)
msgs, headers, _ := glu.execute(ctx, headers, logger, env, js, id)

// now update the stored state, if this fails due to optimistic concurrency, we immediately nak and fail
err := update()
Expand Down
Loading

0 comments on commit 5a3cbca

Please sign in to comment.