Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add global job timeout #512

Merged
merged 19 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/lilypad/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func runJob(cmd *cobra.Command, options jobcreator.JobCreatorOptions, network st
case "JobOfferCancelled":
desc = "Job cancelled..."
emoji = "😭"
case "JobTimedOut":
desc = "Job timed out..."
emoji = "⏱️"
default:
desc = st
emoji = "🌟"
Expand Down
7 changes: 6 additions & 1 deletion pkg/data/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var AgreementState = []string{
"TimeoutJudgeResults",
"TimeoutMediateResults",
"JobOfferCancelled",
"JobTimedOut",
}

// PaymentReason corresponds to PaymentReason in TypeScript
Expand Down Expand Up @@ -84,7 +85,11 @@ func IsActiveAgreementState(itemType uint8) bool {
}

// IsTerminalAgreementState reports whether itemType equals the index of one
// of the agreement states treated as terminal here: JobOfferCancelled,
// JobTimedOut, ResultsAccepted, MediationAccepted, or MediationRejected.
//
// The previous single-line return (which lacked JobTimedOut) has been
// removed; leaving it in place ahead of this expression made the JobTimedOut
// check unreachable.
func IsTerminalAgreementState(itemType uint8) bool {
	return itemType == GetAgreementStateIndex("JobOfferCancelled") ||
		itemType == GetAgreementStateIndex("JobTimedOut") ||
		itemType == GetAgreementStateIndex("ResultsAccepted") ||
		itemType == GetAgreementStateIndex("MediationAccepted") ||
		itemType == GetAgreementStateIndex("MediationRejected")
}

// GetPaymentReason corresponds to getPaymentReason in TypeScript
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobcreator/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ waitloop:
return nil, fmt.Errorf("job was cancelled")
}

// Check if our job timed out
if finalJobOffer.State == data.GetAgreementStateIndex("JobTimedOut") {
span.SetStatus(codes.Error, "job timed out")
span.RecordError(err)
return nil, fmt.Errorf("job timed out")
}

span.AddEvent("get_result.start")
result, err := jobCreatorService.GetResult(finalJobOffer.DealID)
if err != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/options/solver.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package options

import (
"fmt"

"github.com/lilypad-tech/lilypad/pkg/solver"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/spf13/cobra"
)

func NewSolverOptions() solver.SolverOptions {
options := solver.SolverOptions{
Server: GetDefaultServerOptions(),
Store: GetDefaultStoreOptions(),
Web3: GetDefaultWeb3Options(),
Services: GetDefaultServicesOptions(),
Telemetry: GetDefaultTelemetryOptions(),
Metrics: GetDefaultMetricsOptions(),
Server: GetDefaultServerOptions(),
Store: GetDefaultStoreOptions(),
Web3: GetDefaultWeb3Options(),
Services: GetDefaultServicesOptions(),
Telemetry: GetDefaultTelemetryOptions(),
Metrics: GetDefaultMetricsOptions(),
JobTimeoutSeconds: GetDefaultServeOptionInt("JOB_TIMEOUT_SECONDS", 600), // 10 minutes
}
options.Web3.Service = system.SolverService
return options
Expand All @@ -26,6 +29,10 @@ func AddSolverCliFlags(cmd *cobra.Command, options *solver.SolverOptions) {
AddServicesCliFlags(cmd, &options.Services)
AddTelemetryCliFlags(cmd, &options.Telemetry)
AddMetricsCliFlags(cmd, &options.Metrics)
cmd.PersistentFlags().IntVar(
&options.JobTimeoutSeconds, "job-timeout-seconds", options.JobTimeoutSeconds,
`The global timeout for jobs in seconds (JOB_TIMEOUT_SECONDS)`,
)
}

func CheckSolverOptions(options solver.SolverOptions) error {
Expand All @@ -49,6 +56,9 @@ func CheckSolverOptions(options solver.SolverOptions) error {
if err != nil {
return err
}
if options.JobTimeoutSeconds <= 0 {
return fmt.Errorf("JOB_TIMEOUT_SECONDS must be greater than zero")
}
return nil
}

Expand Down
80 changes: 77 additions & 3 deletions pkg/solver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,15 @@ func (controller *SolverController) solve(ctx context.Context) error {
ctx, span := controller.tracer.Start(ctx, "solve")
defer span.End()

// find out which deals we can make from matching the offers
// Remove expired job offers
err := controller.cancelExpiredJobs(ctx)
if err != nil {
span.SetStatus(codes.Error, "remove expired offers failed")
span.RecordError(err)
return err
}

// Match job offers with resource offers to make deals
deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.log, controller.tracer, controller.meter)
if err != nil {
span.SetStatus(codes.Error, "get matching deals failed")
Expand All @@ -294,7 +302,7 @@ func (controller *SolverController) solve(ctx context.Context) error {
Value: attribute.StringSliceValue(data.GetDealIDs(deals)),
})

// loop over each of the deals add add them to the store and emit events
// Add deals to the store, update offer states, and notify network
span.AddEvent("add_deals.start")
for _, deal := range deals {
_, err := controller.addDeal(ctx, deal)
Expand All @@ -304,14 +312,21 @@ func (controller *SolverController) solve(ctx context.Context) error {
}
span.AddEvent("add_deals.done")

// Report deal state metrics
span.AddEvent("report_deal_metrics.start")
storedDeals, err := controller.store.GetDealsAll()
if err != nil {
span.SetStatus(codes.Error, "get all deals failed")
span.RecordError(err)
return err
}
err = reportDealMetrics(ctx, controller.meter, storedDeals)
jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{Cancelled: system.BoolPointer(true)})
if err != nil {
span.SetStatus(codes.Error, "get cancelled job offers failed")
span.RecordError(err)
return err
}
err = reportJobMetrics(ctx, controller.meter, storedDeals, jobOffers)
if err != nil {
span.SetStatus(codes.Error, "report deal metrics failed")
span.RecordError(err)
Expand All @@ -321,6 +336,65 @@ func (controller *SolverController) solve(ctx context.Context) error {
return nil
}

// cancelExpiredJobs marks every active job offer that has outlived the
// configured global job timeout as "JobTimedOut".
//
// It loads the active job offers from the store and compares each offer's
// CreatedAt (treated as Unix milliseconds, since it is compared against
// time.Now().UnixMilli()) with options.JobTimeoutSeconds. Offers without a
// deal are updated via updateJobOfferState; offers with a deal are updated
// via updateDealState (per the original author's note this is expected to
// cover the associated job offer and resource offer as well — confirm
// against updateDealState).
//
// Error handling is best-effort: a failure to load the job offers is
// returned, but a failure to update an individual offer or deal is only
// logged and recorded on the span so the remaining offers are still
// processed. Offers and deals are listed in the span attributes whether or
// not their state update succeeded.
func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error {
	ctx, span := controller.tracer.Start(ctx, "cancel_expired_jobs")
	defer span.End()

	// Get active job offers
	span.AddEvent("db.get_job_offers.start")
	jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{
		Active: true,
	})
	if err != nil {
		controller.log.Error("get job offers failed", err)
		span.SetStatus(codes.Error, "get job offers failed")
		span.RecordError(err)
		return err
	}
	span.AddEvent("db.get_job_offers.done")

	// Widen to int64 BEFORE multiplying: the product of an int and 1000 can
	// overflow on platforms where int is 32 bits wide.
	timeoutMillis := int64(controller.options.JobTimeoutSeconds) * 1000
	// Use a single clock reading so every offer in this pass is judged
	// against the same cutoff.
	now := time.Now().UnixMilli()
	timedOutState := data.GetAgreementStateIndex("JobTimedOut")

	// Check active job offers, and time out expired offers
	// and associated deals
	span.AddEvent("expire_jobs.start")
	expiredOffers := []string{}
	expiredDeals := []string{}
	for _, jobOffer := range jobOffers {
		if now-int64(jobOffer.JobOffer.CreatedAt) <= timeoutMillis {
			// Still within the timeout window.
			continue
		}

		if jobOffer.DealID == "" {
			// No deal was made yet: only the job offer needs updating.
			_, err := controller.updateJobOfferState(jobOffer.ID, jobOffer.DealID, timedOutState)
			if err != nil {
				controller.log.Error("update expired job offer state failed", err)
				span.SetStatus(codes.Error, "update expired job offer state failed")
				span.RecordError(err)
			}
		} else {
			// A deal exists: time out the deal itself.
			_, err := controller.updateDealState(jobOffer.DealID, timedOutState)
			if err != nil {
				controller.log.Error("update expired deal state failed", err)
				span.SetStatus(codes.Error, "update expired deal state failed")
				span.RecordError(err)
			}
			expiredDeals = append(expiredDeals, jobOffer.DealID)
		}
		expiredOffers = append(expiredOffers, jobOffer.ID)
	}
	span.SetAttributes(attribute.KeyValue{
		Key:   "expired_job_offers",
		Value: attribute.StringSliceValue(expiredOffers),
	})
	span.SetAttributes(attribute.KeyValue{
		Key:   "expired_deals",
		Value: attribute.StringSliceValue(expiredDeals),
	})
	span.AddEvent("expire_jobs.end")

	return nil
}

/*
*
*
Expand Down
1 change: 1 addition & 0 deletions pkg/solver/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func GetMatchingDeals(
span.AddEvent("db.get_job_offers.start")
jobOffers, err := db.GetJobOffers(store.GetJobOffersQuery{
NotMatched: true,
Cancelled: system.BoolPointer(false),
OrderOldestFirst: true,
})
if err != nil {
Expand Down
Loading