From 1c9997b0f1b160154bd4d5bc475f0d067c1938ab Mon Sep 17 00:00:00 2001 From: Brian Ginsburg <7957636+bgins@users.noreply.github.com> Date: Tue, 18 Feb 2025 15:38:10 -0800 Subject: [PATCH] feat: Add global job timeout (#512) * test: Clean up allowed resource providers between test runs * feat: Add active job offers filter query * feat: Add job timeout field and options * feat: Add terminal JobTimedOut agreement state * feat: Add cancelled job offers filter query * chore: Rename solver dealStats to jobStats * feat: Add cancelled job metrics * feat: Add cancelExpiredJobs * chore: Update comments * chore: Rename reportDealMetrics to reportJobMetrics * feat: Add BoolPointer lang helper * feat: Combine cancelled filter queries The cancelled filter query now includes the include cancelled, which was our mechanism for requesting all job offers. The cancelled filter query includes all when nil, or cancelled or not when true or false. * feat: Reject results and file uploads for timed out jobs * chore: Improve results checking log message --- cmd/lilypad/run.go | 3 + pkg/data/enums.go | 7 +- pkg/jobcreator/run.go | 7 ++ pkg/options/solver.go | 22 ++++-- pkg/solver/controller.go | 80 +++++++++++++++++++- pkg/solver/matcher/matcher.go | 1 + pkg/solver/metric.go | 103 +++++++++++++++---------- pkg/solver/server.go | 23 +++++- pkg/solver/solver.go | 13 ++-- pkg/solver/store/db/db.go | 21 +++++- pkg/solver/store/memory/store.go | 16 +++- pkg/solver/store/store.go | 9 ++- pkg/solver/store/store_test.go | 126 +++++++++++++++++++++++++++++-- pkg/solver/testing.go | 17 ++++- pkg/system/lang.go | 5 ++ 15 files changed, 380 insertions(+), 73 deletions(-) create mode 100644 pkg/system/lang.go diff --git a/cmd/lilypad/run.go b/cmd/lilypad/run.go index f0d77c74..1c29716e 100644 --- a/cmd/lilypad/run.go +++ b/cmd/lilypad/run.go @@ -117,6 +117,9 @@ func runJob(cmd *cobra.Command, options jobcreator.JobCreatorOptions, network st case "JobOfferCancelled": desc = "Job cancelled..." emoji = "😭" + case "JobTimedOut": + desc = "Job timed out..." + emoji = "⏱️" default: desc = st emoji = "🌟" diff --git a/pkg/data/enums.go b/pkg/data/enums.go index 0f94df3d..5d6d888e 100644 --- a/pkg/data/enums.go +++ b/pkg/data/enums.go @@ -27,6 +27,7 @@ var AgreementState = []string{ "TimeoutJudgeResults", "TimeoutMediateResults", "JobOfferCancelled", + "JobTimedOut", } // PaymentReason corresponds to PaymentReason in TypeScript @@ -84,7 +85,11 @@ func IsActiveAgreementState(itemType uint8) bool { } func IsTerminalAgreementState(itemType uint8) bool { - return itemType == GetAgreementStateIndex("JobOfferCancelled") || itemType == GetAgreementStateIndex("ResultsAccepted") || itemType == GetAgreementStateIndex("MediationAccepted") || itemType == GetAgreementStateIndex("MediationRejected") + return itemType == GetAgreementStateIndex("JobOfferCancelled") || + itemType == GetAgreementStateIndex("JobTimedOut") || + itemType == GetAgreementStateIndex("ResultsAccepted") || + itemType == GetAgreementStateIndex("MediationAccepted") || + itemType == GetAgreementStateIndex("MediationRejected") } // GetPaymentReason corresponds to getPaymentReason in TypeScript diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index 39e70ae9..30044d7e 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -118,6 +118,13 @@ waitloop: return nil, fmt.Errorf("job was cancelled") } + // Check if our job timed out + if finalJobOffer.State == data.GetAgreementStateIndex("JobTimedOut") { + span.SetStatus(codes.Error, "job timed out") + span.RecordError(err) + return nil, fmt.Errorf("job timed out") + } + span.AddEvent("get_result.start") result, err := jobCreatorService.GetResult(finalJobOffer.DealID) if err != nil { diff --git a/pkg/options/solver.go b/pkg/options/solver.go index 8d91cec0..ac1699e1 100644 --- a/pkg/options/solver.go +++ b/pkg/options/solver.go @@ -1,6 +1,8 @@ package options import ( + "fmt" + "github.com/lilypad-tech/lilypad/pkg/solver" "github.com/lilypad-tech/lilypad/pkg/system" "github.com/spf13/cobra" @@ -8,12 +10,13 @@ import ( func NewSolverOptions() solver.SolverOptions { options := solver.SolverOptions{ - Server: GetDefaultServerOptions(), - Store: GetDefaultStoreOptions(), - Web3: GetDefaultWeb3Options(), - Services: GetDefaultServicesOptions(), - Telemetry: GetDefaultTelemetryOptions(), - Metrics: GetDefaultMetricsOptions(), + Server: GetDefaultServerOptions(), + Store: GetDefaultStoreOptions(), + Web3: GetDefaultWeb3Options(), + Services: GetDefaultServicesOptions(), + Telemetry: GetDefaultTelemetryOptions(), + Metrics: GetDefaultMetricsOptions(), + JobTimeoutSeconds: GetDefaultServeOptionInt("JOB_TIMEOUT_SECONDS", 600), // 10 minutes } options.Web3.Service = system.SolverService return options @@ -26,6 +29,10 @@ func AddSolverCliFlags(cmd *cobra.Command, options *solver.SolverOptions) { AddServicesCliFlags(cmd, &options.Services) AddTelemetryCliFlags(cmd, &options.Telemetry) AddMetricsCliFlags(cmd, &options.Metrics) + cmd.PersistentFlags().IntVar( + &options.JobTimeoutSeconds, "job-timeout-seconds", options.JobTimeoutSeconds, + `The global timeout for jobs in seconds (JOB_TIMEOUT_SECONDS)`, + ) } func CheckSolverOptions(options solver.SolverOptions) error { @@ -49,6 +56,9 @@ func CheckSolverOptions(options solver.SolverOptions) error { if err != nil { return err } + if options.JobTimeoutSeconds <= 0 { + return fmt.Errorf("JOB_TIMEOUT_SECONDS must be greater than zero") + } return nil } diff --git a/pkg/solver/controller.go b/pkg/solver/controller.go index d45bcd7d..ff53957c 100644 --- a/pkg/solver/controller.go +++ b/pkg/solver/controller.go @@ -282,7 +282,15 @@ func (controller *SolverController) solve(ctx context.Context) error { ctx, span := controller.tracer.Start(ctx, "solve") defer span.End() - // find out which deals we can make from matching the offers + // Remove expired job offers + err := controller.cancelExpiredJobs(ctx) + if err != nil { + span.SetStatus(codes.Error, "remove expired offers failed") + span.RecordError(err) + return err + } + + // Match job offers with resource offers to make deals deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.log, controller.tracer, controller.meter) if err != nil { span.SetStatus(codes.Error, "get matching deals failed") @@ -294,7 +302,7 @@ func (controller *SolverController) solve(ctx context.Context) error { Value: attribute.StringSliceValue(data.GetDealIDs(deals)), }) - // loop over each of the deals add add them to the store and emit events + // Add deals to the store, update offer states, and notify network span.AddEvent("add_deals.start") for _, deal := range deals { _, err := controller.addDeal(ctx, deal) @@ -304,6 +312,7 @@ func (controller *SolverController) solve(ctx context.Context) error { } span.AddEvent("add_deals.done") + // Report deal state metrics span.AddEvent("report_deal_metrics.start") storedDeals, err := controller.store.GetDealsAll() if err != nil { @@ -311,7 +320,13 @@ func (controller *SolverController) solve(ctx context.Context) error { span.RecordError(err) return err } - err = reportDealMetrics(ctx, controller.meter, storedDeals) + jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{Cancelled: system.BoolPointer(true)}) + if err != nil { + span.SetStatus(codes.Error, "get cancelled job offers failed") + span.RecordError(err) + return err + } + err = reportJobMetrics(ctx, controller.meter, storedDeals, jobOffers) if err != nil { span.SetStatus(codes.Error, "report deal metrics failed") span.RecordError(err) @@ -321,6 +336,65 @@ func (controller *SolverController) solve(ctx context.Context) error { return nil } +func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error { + ctx, span := controller.tracer.Start(ctx, "cancel_expired_jobs") + defer span.End() + + // Get active job offers + span.AddEvent("db.get_job_offers.start") + jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{ + Active: true, + }) + if err != nil { + controller.log.Error("get job offers failed", err) + span.SetStatus(codes.Error, "get job offers failed") + span.RecordError(err) + return err + } + span.AddEvent("db.get_job_offers.done") + + // Check active job offers, and cancel expired offers + // and associated resource offers and deals + span.AddEvent("expire_jobs.start") + expiredOffers := []string{} + expiredDeals := []string{} + for _, jobOffer := range jobOffers { + now := time.Now().UnixMilli() + if now-int64(jobOffer.JobOffer.CreatedAt) > int64(controller.options.JobTimeoutSeconds*1000) { + if jobOffer.DealID == "" { + // Cancel expired job offers + _, err := controller.updateJobOfferState(jobOffer.ID, jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut")) + if err != nil { + controller.log.Error("update expired job offer state failed", err) + span.SetStatus(codes.Error, "update expired job offer state failed") + span.RecordError(err) + } + } else { + // Cancel expired job offers, resource offers, and deals + _, err := controller.updateDealState(jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut")) + if err != nil { + controller.log.Error("update expired deal state failed", err) + span.SetStatus(codes.Error, "update expired deal state failed") + span.RecordError(err) + } + expiredDeals = append(expiredDeals, jobOffer.DealID) + } + expiredOffers = append(expiredOffers, jobOffer.ID) + } + } + span.SetAttributes(attribute.KeyValue{ + Key: "expired_job_offers", + Value: attribute.StringSliceValue(expiredOffers), + }) + span.SetAttributes(attribute.KeyValue{ + Key: "expired_deals", + Value: attribute.StringSliceValue(expiredDeals), + }) + span.AddEvent("expire_jobs.end") + + return nil +} + /* * * diff --git a/pkg/solver/matcher/matcher.go b/pkg/solver/matcher/matcher.go index 7c92a823..27191592 100644 --- a/pkg/solver/matcher/matcher.go +++ b/pkg/solver/matcher/matcher.go @@ -52,6 +52,7 @@ func GetMatchingDeals( span.AddEvent("db.get_job_offers.start") jobOffers, err := db.GetJobOffers(store.GetJobOffersQuery{ NotMatched: true, + Cancelled: system.BoolPointer(false), OrderOldestFirst: true, }) if err != nil { diff --git a/pkg/solver/metric.go b/pkg/solver/metric.go index 8ca93e31..aa00d00b 100644 --- a/pkg/solver/metric.go +++ b/pkg/solver/metric.go @@ -21,9 +21,10 @@ type metrics struct { timeoutJudgeResults metric.Int64Gauge timeoutMediateResults metric.Int64Gauge jobOfferCancelled metric.Int64Gauge + jobTimedOut metric.Int64Gauge } -type dealStats struct { +type jobStats struct { stateCounts stateCounts } @@ -40,90 +41,98 @@ type stateCounts struct { timeoutJudgeResults int64 timeoutMediateResults int64 jobOfferCancelled int64 + jobTimedOut int64 } func newMetrics(meter metric.Meter) (*metrics, error) { // Deal states dealNegotiating, err := meter.Int64Gauge( "solver.deal.state.deal_negotiating", - metric.WithDescription("Number of deals in a DealNegotiating state."), + metric.WithDescription("Number of jobs in a DealNegotiating state."), ) if err != nil { return nil, err } dealAgreed, err := meter.Int64Gauge( "solver.deal.state.deal_agreed", - metric.WithDescription("Number of deals in a DealAgreed state."), + metric.WithDescription("Number of jobs in a DealAgreed state."), ) if err != nil { return nil, err } resultsSubmitted, err := meter.Int64Gauge( "solver.deal.state.results_submitted", - metric.WithDescription("Number of deals in a ResultsSubmitted state."), + metric.WithDescription("Number of jobs in a ResultsSubmitted state."), ) if err != nil { return nil, err } resultsAccepted, err := meter.Int64Gauge( "solver.deal.state.results_accepted", - metric.WithDescription("Number of deals in a ResultsAccepted state."), + metric.WithDescription("Number of jobs in a ResultsAccepted state."), ) if err != nil { return nil, err } resultsRejected, err := meter.Int64Gauge( "solver.deal.state.results_rejected", - metric.WithDescription("Number of deals in a ResultsRejected state."), + metric.WithDescription("Number of jobs in a ResultsRejected state."), ) if err != nil { return nil, err } resultsChecked, err := meter.Int64Gauge( "solver.deal.state.results_checked", - metric.WithDescription("Number of deals in a ResultsChecked state."), + metric.WithDescription("Number of jobs in a ResultsChecked state."), ) if err != nil { return nil, err } mediationAccepted, err := meter.Int64Gauge( "solver.deal.state.mediation_accepted", - metric.WithDescription("Number of deals in a MediationAccepted state."), + metric.WithDescription("Number of jobs in a MediationAccepted state."), ) if err != nil { return nil, err } mediationRejected, err := meter.Int64Gauge( "solver.deal.state.mediation_rejected", - metric.WithDescription("Number of deals in a MediationRejected state."), + metric.WithDescription("Number of jobs in a MediationRejected state."), ) if err != nil { return nil, err } timeoutSubmitResults, err := meter.Int64Gauge( "solver.deal.state.timeout_submit_results", - metric.WithDescription("Number of deals in a TimeoutSubmitResults state."), + metric.WithDescription("Number of jobs in a TimeoutSubmitResults state."), ) if err != nil { return nil, err } timeoutJudgeResults, err := meter.Int64Gauge( "solver.deal.state.timeout_judge_results", - metric.WithDescription("Number of deals in a TimeoutJudgeResults state."), + metric.WithDescription("Number of jobs in a TimeoutJudgeResults state."), ) if err != nil { return nil, err } timeoutMediateResults, err := meter.Int64Gauge( "solver.deal.state.timeout_mediate_results", - metric.WithDescription("Number of deals in a TimeoutMediateResults state."), + metric.WithDescription("Number of jobs in a TimeoutMediateResults state."), ) if err != nil { return nil, err } jobOfferCancelled, err := meter.Int64Gauge( - "solver.deal.state.job_offer_cancelled", - metric.WithDescription("Number of deals in a JobOfferCancelled state."), + "solver.job_offer.state.job_offer_cancelled", + metric.WithDescription("Number of jobs in a JobOfferCancelled state."), + ) + if err != nil { + return nil, err + } + jobTimedOut, err := meter.Int64Gauge( + "solver.job_offer.state.job_timed_out", + metric.WithDescription("Number of jobs in a JobTimedOut state."), ) if err != nil { return nil, err @@ -142,41 +151,52 @@ func newMetrics(meter metric.Meter) (*metrics, error) { timeoutJudgeResults, timeoutMediateResults, jobOfferCancelled, + jobTimedOut, }, nil } -func reportDealMetrics(ctx context.Context, meter metric.Meter, deals []data.DealContainer) error { - var dealStats dealStats +func reportJobMetrics(ctx context.Context, meter metric.Meter, deals []data.DealContainer, jobOffers []data.JobOfferContainer) error { + var jobStats jobStats // Compute deal state counts for _, deal := range deals { switch data.GetAgreementStateString(deal.State) { case "DealNegotiating": - dealStats.stateCounts.dealNegotiating += 1 + jobStats.stateCounts.dealNegotiating += 1 case "DealAgreed": - dealStats.stateCounts.dealAgreed += 1 + jobStats.stateCounts.dealAgreed += 1 case "ResultsSubmitted": - dealStats.stateCounts.resultsSubmitted += 1 + jobStats.stateCounts.resultsSubmitted += 1 case "ResultsAccepted": - dealStats.stateCounts.resultsAccepted += 1 + jobStats.stateCounts.resultsAccepted += 1 case "ResultsRejected": - dealStats.stateCounts.resultsRejected += 1 + jobStats.stateCounts.resultsRejected += 1 case "ResultsChecked": - dealStats.stateCounts.resultsChecked += 1 + jobStats.stateCounts.resultsChecked += 1 case "MediationAccepted": - dealStats.stateCounts.mediationAccepted += 1 + jobStats.stateCounts.mediationAccepted += 1 case "MediationRejected": - dealStats.stateCounts.mediationRejected += 1 + jobStats.stateCounts.mediationRejected += 1 case "TimeoutSubmitResults": - dealStats.stateCounts.timeoutSubmitResults += 1 + jobStats.stateCounts.timeoutSubmitResults += 1 case "TimeoutJudgeResults": - dealStats.stateCounts.timeoutJudgeResults += 1 + jobStats.stateCounts.timeoutJudgeResults += 1 case "TimeoutMediateResults": - dealStats.stateCounts.timeoutMediateResults += 1 + jobStats.stateCounts.timeoutMediateResults += 1 + default: + log.Trace().Msgf("untracked deal state ID: %d", deal.State) + } + } + + // Cancelled states may only exist in the job offer + for _, offer := range jobOffers { + switch data.GetAgreementStateString(offer.State) { case "JobOfferCancelled": - dealStats.stateCounts.jobOfferCancelled += 1 + jobStats.stateCounts.jobOfferCancelled += 1 + case "JobTimedOut": + jobStats.stateCounts.jobTimedOut += 1 default: - log.Warn().Msgf("unknown deal state ID: %d", deal.State) + log.Trace().Msgf("job metrics skipped offer state ID: %d", offer.State) } } @@ -186,18 +206,19 @@ func reportDealMetrics(ctx context.Context, meter metric.Meter, deals []data.Dea log.Warn().Msgf("failed to create solver metrics: %s", err) return err } - metrics.dealNegotiating.Record(ctx, dealStats.stateCounts.dealNegotiating) - metrics.dealAgreed.Record(ctx, dealStats.stateCounts.dealAgreed) - metrics.resultsSubmitted.Record(ctx, dealStats.stateCounts.resultsSubmitted) - metrics.resultsAccepted.Record(ctx, dealStats.stateCounts.resultsAccepted) - metrics.resultsRejected.Record(ctx, dealStats.stateCounts.resultsRejected) - metrics.resultsChecked.Record(ctx, dealStats.stateCounts.resultsChecked) - metrics.mediationAccepted.Record(ctx, dealStats.stateCounts.mediationAccepted) - metrics.mediationRejected.Record(ctx, dealStats.stateCounts.mediationRejected) - metrics.timeoutSubmitResults.Record(ctx, dealStats.stateCounts.timeoutSubmitResults) - metrics.timeoutJudgeResults.Record(ctx, dealStats.stateCounts.timeoutJudgeResults) - metrics.timeoutMediateResults.Record(ctx, dealStats.stateCounts.timeoutMediateResults) - metrics.jobOfferCancelled.Record(ctx, dealStats.stateCounts.jobOfferCancelled) + metrics.dealNegotiating.Record(ctx, jobStats.stateCounts.dealNegotiating) + metrics.dealAgreed.Record(ctx, jobStats.stateCounts.dealAgreed) + metrics.resultsSubmitted.Record(ctx, jobStats.stateCounts.resultsSubmitted) + metrics.resultsAccepted.Record(ctx, jobStats.stateCounts.resultsAccepted) + metrics.resultsRejected.Record(ctx, jobStats.stateCounts.resultsRejected) + metrics.resultsChecked.Record(ctx, jobStats.stateCounts.resultsChecked) + metrics.mediationAccepted.Record(ctx, jobStats.stateCounts.mediationAccepted) + metrics.mediationRejected.Record(ctx, jobStats.stateCounts.mediationRejected) + metrics.timeoutSubmitResults.Record(ctx, jobStats.stateCounts.timeoutSubmitResults) + metrics.timeoutJudgeResults.Record(ctx, jobStats.stateCounts.timeoutJudgeResults) + metrics.timeoutMediateResults.Record(ctx, jobStats.stateCounts.timeoutMediateResults) + metrics.jobOfferCancelled.Record(ctx, jobStats.stateCounts.jobOfferCancelled) + metrics.jobTimedOut.Record(ctx, jobStats.stateCounts.jobTimedOut) return nil } diff --git a/pkg/solver/server.go b/pkg/solver/server.go index 67adeb21..4ceeedce 100644 --- a/pkg/solver/server.go +++ b/pkg/solver/server.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "slices" + "strconv" "strings" "time" @@ -242,9 +243,17 @@ func (solverServer *solverServer) getJobOffers(res corehttp.ResponseWriter, req if notMatched := req.URL.Query().Get("not_matched"); notMatched == "true" { query.NotMatched = true } - if includeCancelled := req.URL.Query().Get("include_cancelled"); includeCancelled == "true" { - query.IncludeCancelled = true + if active := req.URL.Query().Get("active"); active == "true" { + query.Active = true + } + if cancelled := req.URL.Query().Get("cancelled"); cancelled != "" { + if val, err := strconv.ParseBool(cancelled); err == nil { + query.Cancelled = &val + } else { + return nil, fmt.Errorf("invalid cancelled filter value: %s", cancelled) + } } + return solverServer.store.GetJobOffers(query) } @@ -402,6 +411,10 @@ func (solverServer *solverServer) addResult(results data.Result, res corehttp.Re if deal == nil { return nil, fmt.Errorf("deal not found") } + if deal.State == data.GetAgreementStateIndex("JobTimedOut") { + log.Trace().Msgf("attempted results post for timed out job with deal ID: %s", deal.ID) + return nil, fmt.Errorf("job with deal ID %s timed out", deal.ID) + } signerAddress, err := http.CheckSignature(req) if err != nil { log.Error().Err(err).Msgf("error checking signature") @@ -413,7 +426,7 @@ func (solverServer *solverServer) addResult(results data.Result, res corehttp.Re } err = data.CheckResult(results) if err != nil { - log.Error().Err(err).Msgf("Error checking resource offer") + log.Error().Err(err).Msgf("Error checking result for deal ID: %s", results.DealID) return nil, err } results.DealID = id @@ -586,6 +599,10 @@ func (solverServer *solverServer) uploadFiles(res corehttp.ResponseWriter, req * log.Error().Msgf("deal not found") return err } + if deal.State == data.GetAgreementStateIndex("JobTimedOut") { + log.Trace().Msgf("attempted file upload for timed out job with deal ID: %s", deal.ID) + return fmt.Errorf("job with deal ID %s timed out", deal.ID) + } signerAddress, err := http.CheckSignature(req) if err != nil { log.Error().Err(err).Msgf("error checking signature") diff --git a/pkg/solver/solver.go b/pkg/solver/solver.go index e519795b..72f58266 100644 --- a/pkg/solver/solver.go +++ b/pkg/solver/solver.go @@ -15,12 +15,13 @@ import ( ) type SolverOptions struct { - Server http.ServerOptions - Store store.StoreOptions - Web3 web3.Web3Options - Services data.ServiceConfig - Telemetry system.TelemetryOptions - Metrics system.MetricsOptions + Server http.ServerOptions + Store store.StoreOptions + Web3 web3.Web3Options + Services data.ServiceConfig + Telemetry system.TelemetryOptions + Metrics system.MetricsOptions + JobTimeoutSeconds int } type Solver struct { diff --git a/pkg/solver/store/db/db.go b/pkg/solver/store/db/db.go index f1d08581..c47b0de0 100644 --- a/pkg/solver/store/db/db.go +++ b/pkg/solver/store/db/db.go @@ -159,8 +159,25 @@ func (store *SolverStoreDatabase) GetJobOffers(query store.GetJobOffersQuery) ([ if query.NotMatched { q = q.Where("deal_id = ''") } - if !query.IncludeCancelled { - q = q.Where("state != ?", data.GetAgreementStateIndex("JobOfferCancelled")) + if query.Active { + q = q.Where("state IN (?)", []uint8{ + data.GetAgreementStateIndex("DealNegotiating"), + data.GetAgreementStateIndex("DealAgreed"), + data.GetAgreementStateIndex("ResultsSubmitted"), + }) + } + if query.Cancelled != nil { + if *query.Cancelled { + q = q.Where("state IN (?)", []uint8{ + data.GetAgreementStateIndex("JobOfferCancelled"), + data.GetAgreementStateIndex("JobTimedOut"), + }) + } else { + q = q.Where("state NOT IN (?)", []uint8{ + data.GetAgreementStateIndex("JobOfferCancelled"), + data.GetAgreementStateIndex("JobTimedOut"), + }) + } } if query.OrderOldestFirst { q = q.Order("created_at ASC") diff --git a/pkg/solver/store/memory/store.go b/pkg/solver/store/memory/store.go index 1e7ac0b9..122fccbb 100644 --- a/pkg/solver/store/memory/store.go +++ b/pkg/solver/store/memory/store.go @@ -105,9 +105,23 @@ func (s *SolverStoreMemory) GetJobOffers(query store.GetJobOffersQuery) ([]data. matching = false } } - if !query.IncludeCancelled && jobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") { + if query.Active && + jobOffer.State != data.GetAgreementStateIndex("DealNegotiating") && + jobOffer.State != data.GetAgreementStateIndex("DealAgreed") && + jobOffer.State != data.GetAgreementStateIndex("ResultsSubmitted") { matching = false } + if query.Cancelled != nil { + isCancelled := jobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") || + jobOffer.State == data.GetAgreementStateIndex("JobTimedOut") + + wantCancelled := *query.Cancelled + if wantCancelled && !isCancelled { + matching = false + } else if !wantCancelled && isCancelled { + matching = false + } + } if matching { jobOffers = append(jobOffers, *jobOffer) } diff --git a/pkg/solver/store/store.go b/pkg/solver/store/store.go index 7be644f6..64186a97 100644 --- a/pkg/solver/store/store.go +++ b/pkg/solver/store/store.go @@ -20,8 +20,13 @@ type GetJobOffersQuery struct { // we use the DealID property of the jobOfferContainer to tell if it's been matched NotMatched bool `json:"not_matched"` - // this will include cancelled job offers in the results - IncludeCancelled bool `json:"include_cancelled"` + // Active job offers are umatched or in an in-progress deal. + // This includes the DealNegotiating, DealAgreed, or ResultsSubmitted states. + Active bool `json:"in_progress"` + + // Cancelled job offers are in a JobOfferCancelled or JobTimedOut state. + // All job offers are included when Cancelled is nil. + Cancelled *bool `json:"cancelled"` // Sort job offers oldest first OrderOldestFirst bool `json:"order_oldest_first"` diff --git a/pkg/solver/store/store_test.go b/pkg/solver/store/store_test.go index 4b90bddd..f6bcd27d 100644 --- a/pkg/solver/store/store_test.go +++ b/pkg/solver/store/store_test.go @@ -14,6 +14,7 @@ import ( "github.com/lilypad-tech/lilypad/pkg/solver" "github.com/lilypad-tech/lilypad/pkg/solver/store" solverstore "github.com/lilypad-tech/lilypad/pkg/solver/store" + "github.com/lilypad-tech/lilypad/pkg/system" "golang.org/x/exp/rand" ) @@ -151,9 +152,87 @@ func TestJobOfferQuery(t *testing.T) { DealID: "", State: data.GetAgreementStateIndex("JobOfferCancelled"), }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, + }, + query: store.GetJobOffersQuery{ + Cancelled: system.BoolPointer(false), + }, + expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, + }, + { + name: "filter active offers", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmV8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ku", + State: data.GetAgreementStateIndex("DealAgreed"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("ResultsSubmitted"), + }, + { + ID: "QmW9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kw", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("ResultsAccepted"), + }, + }, + query: store.GetJobOffersQuery{ + Active: true, + }, + expected: []string{ + "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + }, + }, + { + name: "combined filters with active", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0xabcdef0123456789abcdef0123456789abcdef01", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmW9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kw", + State: data.GetAgreementStateIndex("DealAgreed"), + }, + { + ID: "QmV9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kv", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmU8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kt", + State: data.GetAgreementStateIndex("ResultsAccepted"), + }, }, query: store.GetJobOffersQuery{ - IncludeCancelled: false, + JobCreator: "0x1234567890123456789012345678901234567890", + NotMatched: true, + Active: true, }, expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, }, @@ -172,13 +251,50 @@ func TestJobOfferQuery(t *testing.T) { DealID: "", State: data.GetAgreementStateIndex("JobOfferCancelled"), }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, }, query: store.GetJobOffersQuery{ - IncludeCancelled: true, + Cancelled: nil, }, expected: []string{ "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + }, + }, + { + name: "filter cancelled offers only", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetDefaultAgreementState(), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobOfferCancelled"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, + }, + query: store.GetJobOffersQuery{ + Cancelled: system.BoolPointer(true), + }, + expected: []string{ + "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", }, }, { @@ -210,9 +326,9 @@ func TestJobOfferQuery(t *testing.T) { }, }, query: store.GetJobOffersQuery{ - JobCreator: "0x1234567890123456789012345678901234567890", - NotMatched: true, - IncludeCancelled: false, + JobCreator: "0x1234567890123456789012345678901234567890", + NotMatched: true, + Cancelled: system.BoolPointer(false), }, expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, }, diff --git a/pkg/solver/testing.go b/pkg/solver/testing.go index 52690689..3ab1cf5b 100644 --- a/pkg/solver/testing.go +++ b/pkg/solver/testing.go @@ -59,9 +59,7 @@ func SetupTestStores(t *testing.T) []TestStoreConfig { func clearStoreDatabase(t *testing.T, s store.SolverStore) { // Delete job offers - jobOffers, err := s.GetJobOffers(store.GetJobOffersQuery{ - IncludeCancelled: true, - }) + jobOffers, err := s.GetJobOffers(store.GetJobOffersQuery{}) if err != nil { t.Fatalf("Failed to get existing job offers: %v", err) } @@ -124,4 +122,17 @@ func clearStoreDatabase(t *testing.T, s store.SolverStore) { t.Fatalf("Failed to remove existing match decision: %v", err) } } + + // Delete allowed resource providers + providers, err := s.GetAllowedResourceProviders() + if err != nil { + t.Fatalf("Failed to get existing allowed resource providers: %v", err) + } + + for _, provider := range providers { + err := s.RemoveAllowedResourceProvider(provider) + if err != nil { + t.Fatalf("Failed to remove existing allowed resource provider: %v", err) + } + } } diff --git a/pkg/system/lang.go b/pkg/system/lang.go new file mode 100644 index 00000000..20df5116 --- /dev/null +++ b/pkg/system/lang.go @@ -0,0 +1,5 @@ +package system + +func BoolPointer(b bool) *bool { + return &b +}