Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement billing events #24

Merged
merged 1 commit into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bin/dphp-linux-x86_64
Binary file not shown.
152 changes: 152 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"
)
Expand Down Expand Up @@ -209,6 +210,157 @@ func execute(args []string, options map[string]string) int {
}
}

if config.Extensions.Billing.Enabled {
if config.Extensions.Billing.Listen {

billings := sync.Map{}
billings.Store("e", 0)
billings.Store("o", 0)
billings.Store("a", 0*time.Minute)
billings.Store("ac", 0)

var incrementInt func(key string, amount int)
incrementInt = func(key string, amount int) {
var old interface{}
old, _ = billings.Load(key)
if !billings.CompareAndSwap(key, old, old.(int)+1) {
incrementInt(key, amount)
}
}

var incrementDur func(key string, amount time.Duration)
incrementDur = func(key string, amount time.Duration) {
var old interface{}
old, _ = billings.Load(key)
if !billings.CompareAndSwap(key, old, old.(time.Duration)+amount) {
incrementDur(key, amount)
}
}

outputBillingStatus := func() {
costC := func(num interface{}, basis int) float64 {
return float64(num.(int)) * float64(basis) / 10_000_000
}

costA := func(dur interface{}, basis int) float64 {
duration := dur.(time.Duration)
seconds := duration.Seconds()
return float64(basis) * seconds / 100_000
}

avg := func(dur interface{}, count interface{}) time.Duration {
seconds := dur.(time.Duration).Seconds()
return time.Duration(seconds/float64(count.(int))) * time.Second
}

e, _ := billings.Load("e")
o, _ := billings.Load("o")
ac, _ := billings.Load("ac")
a, _ := billings.Load("a")

ecost := costC(e, config.Extensions.Billing.Costs.Entities.Cost)
ocost := costC(o, config.Extensions.Billing.Costs.Orchestrations.Cost)
acost := costA(a, config.Extensions.Billing.Costs.Activities.Cost)

logger.Warn("Billing estimate",
zap.Any("launched entities", e),
zap.String("entity cost", fmt.Sprintf("$%.2f", ecost)),
zap.Any("launched orchestrations", o),
zap.String("orchestration cost", fmt.Sprintf("$%.2f", ocost)),
zap.Any("activity time", a),
zap.Any("activities launced", ac),
zap.Any("average activity time", avg(a, ac)),
zap.String("activity cost", fmt.Sprintf("$%.2f", acost)),
zap.String("total estimate", fmt.Sprintf("$%.2f", ecost+ocost+acost)),
)
}

go func() {
ticker := time.NewTicker(3 * time.Second)
for range ticker.C {
outputBillingStatus()
}
}()

billingStream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "billing",
Subjects: []string{
"billing." + config.Stream + ".>",
},
Storage: jetstream.FileStorage,
Retention: jetstream.LimitsPolicy,
MaxAge: 7 * 24 * time.Hour,
})
if err != nil {
panic(err)
}

entityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "entityAggregator",
FilterSubjects: []string{
"billing." + config.Stream + ".entities.>",
},
})
if err != nil {
panic(err)
}

consume, err := entityConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("e", 1)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()

orchestrationConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "orchestrationAggregator",
FilterSubject: "billing." + config.Stream + ".orchestrations.>",
})
if err != nil {
panic(err)
}

consume, err = orchestrationConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("o", 1)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()

activityConsumer, err := billingStream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "orchestrationAggregator",
FilterSubject: "billing." + config.Stream + ".activities.>",
})
if err != nil {
panic(err)
}

consume, err = activityConsumer.Consume(func(msg jetstream.Msg) {
incrementInt("ac", 1)
var ev lib.BillingEvent
err := json.Unmarshal(msg.Data(), &ev)
if err != nil {
panic(err)
}
incrementDur("a", ev.Duration)
msg.Ack()
})
if err != nil {
panic(err)
}
defer consume.Drain()
}

err := lib.StartBillingProcessor(ctx, config, js, logger)
if err != nil {
panic(err)
}
}

port := options["port"]
if port == "" {
port = "8080"
Expand Down
171 changes: 171 additions & 0 deletions cli/lib/billing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package lib

import (
"context"
"encoding/json"
"fmt"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
"time"
)

type BillingEvent struct {
Id string `json:"id"`
Duration time.Duration `json:"duration,omitempty"`
}

// StartBillingProcessor starts up a consumer on the history stream and waits for
// events that are billable events. It then fires a billing event to the billings stream.
func StartBillingProcessor(ctx context.Context, config *Config, js jetstream.JetStream, logger *zap.Logger) error {
if !config.Extensions.Billing.Enabled {
logger.Info("Billing events are disabled")
return nil
}

consumer, err := js.CreateOrUpdateConsumer(ctx, config.Stream+"_history", jetstream.ConsumerConfig{
Durable: "billing-consumer",
HeadersOnly: true,
})
if err != nil {
return err
}

activityTracker, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Storage: jetstream.MemoryStorage,
Bucket: "ActivityTracker",
TTL: 1 * time.Hour,
Compression: true,
})
if err != nil {
return err
}

maybeSendActivityBilling := func(id *StateId) {
started, err := activityTracker.Get(ctx, id.toSubject().String()+"_start")
if err != nil {
return
}
ended, err := activityTracker.Get(ctx, id.toSubject().String()+"_end")
if err != nil {
return
}

start := time.Time{}
end := time.Time{}
err = start.UnmarshalText(started.Value())
if err != nil {
logger.Warn("Failed to parse start time", zap.String("id", id.String()), zap.Error(err))
return
}
err = end.UnmarshalText(ended.Value())
if err != nil {
logger.Warn("Failed to parse end time", zap.String("id", id.String()), zap.Error(err))
return
}

duration := end.Sub(start)
event, err := json.Marshal(BillingEvent{Id: id.String(), Duration: duration})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
}

_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.activities.%s.duration", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.Error(err))
return
}
}

entityRegistry, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Storage: jetstream.FileStorage,
Bucket: "EntityRegistry",
Compression: true,
})

consume, err := consumer.Consume(func(msg jetstream.Msg) {
targetType := msg.Headers().Get(string(HeaderTargetType))
eventType := msg.Headers().Get(string(HeaderEventType))
id := ParseStateId(msg.Headers().Get(string(HeaderStateId)))
nowBytes := []byte(msg.Headers().Get(string(HeaderEmittedAt)))
emittedBy := ParseStateId(msg.Headers().Get(string(HeaderEmittedBy)))

switch targetType {
case "Activity":
switch eventType {
case "ScheduleTask":
// an activity has been started
_, err := activityTracker.Put(ctx, id.toSubject().String()+"_start", nowBytes)
if err != nil {
panic(err)
}

maybeSendActivityBilling(id)
}
case "Orchestration":
switch eventType {
case "StartExecution":
event, err := json.Marshal(BillingEvent{Id: id.String()})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
return
}
_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.orchestrations.%s.started", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.String("id", id.String()), zap.Error(err))
return
}
case "TaskCompleted":
fallthrough
case "TaskFailed":
_, err := activityTracker.Put(ctx, emittedBy.toSubject().String()+"_end", nowBytes)
if err != nil {
panic(err)
}
maybeSendActivityBilling(emittedBy)
}
case "Entity":
switch eventType {
case "AwaitResult":
fallthrough
case "RaiseEvent":
_, err := entityRegistry.Get(ctx, id.toSubject().String())
if err != nil {
_, _ = entityRegistry.Put(ctx, id.toSubject().String(), []byte{1})
event, err := json.Marshal(BillingEvent{Id: id.String()})
if err != nil {
logger.Warn("Failed to create billing event", zap.String("id", id.String()), zap.Error(err))
return
}
_, err = js.Publish(ctx, fmt.Sprintf("billing.%s.entities.%s.started", config.Stream, id.toSubject().String()), event)
if err != nil {
logger.Warn("Failed to publish billing event", zap.String("id", id.String()), zap.Error(err))
}
logger.Warn("Billed for one entity")
}
case "TaskCompleted":
fallthrough
case "TaskFailed":
_, err := activityTracker.Put(ctx, emittedBy.toSubject().String()+"_end", nowBytes)
if err != nil {
panic(err)
}
maybeSendActivityBilling(emittedBy)
}
}

err := msg.Ack()
if err != nil {
logger.Warn("Failed to ack historical message", zap.Error(err))
}
})
if err != nil {
return err
}

go func() {
<-ctx.Done()
consume.Drain()
}()

return nil
}
14 changes: 14 additions & 0 deletions cli/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
//go:embed defaultConfiguration.json
var defaultConfig string

type BillingUnit struct {
FreeLimit int `json:"freeLimit"`
MaxFree int `json:"maxFree"`
Cost int `json:"cost"`
Limit int `json:"limit"`
}

type Config struct {
Stream string `json:"project"`
Bootstrap string `json:"bootstrap"`
Expand All @@ -29,6 +36,13 @@ type Config struct {
Billing struct {
Enabled bool `json:"enabled"`
Listen bool `json:"listen"`
Costs struct {
Orchestrations BillingUnit `json:"orchestrations"`
Activities BillingUnit `json:"activities"`
Entities BillingUnit `json:"entities"`
ObjectStorage BillingUnit `json:"objectStorage"`
FileStorage BillingUnit `json:"fileStorage"`
} `json:"costs"`
} `json:"billing,omitempty"`
Search struct {
Url string `json:"url"`
Expand Down
2 changes: 1 addition & 1 deletion cli/lib/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func processMsg(ctx context.Context, logger *zap.Logger, msg jetstream.Msg, js j
env["EVENT"] = string(msg.Data())
env["STATE_ID"] = msg.Headers().Get(string(HeaderStateId))

msgs, headers, _ := glu.execute(ctx, headers, logger, env, js)
msgs, headers, _ := glu.execute(ctx, headers, logger, env, js, id)

// now update the stored state, if this fails due to optimistic concurrency, we immediately nak and fail
err := update()
Expand Down
Loading
Loading