diff --git a/cmd/lilypad/run.go b/cmd/lilypad/run.go index f0d77c74..1c29716e 100644 --- a/cmd/lilypad/run.go +++ b/cmd/lilypad/run.go @@ -117,6 +117,9 @@ func runJob(cmd *cobra.Command, options jobcreator.JobCreatorOptions, network st case "JobOfferCancelled": desc = "Job cancelled..." emoji = "😭" + case "JobTimedOut": + desc = "Job timed out..." + emoji = "⏱️" default: desc = st emoji = "🌟" diff --git a/pkg/data/enums.go b/pkg/data/enums.go index 0f94df3d..5d6d888e 100644 --- a/pkg/data/enums.go +++ b/pkg/data/enums.go @@ -27,6 +27,7 @@ var AgreementState = []string{ "TimeoutJudgeResults", "TimeoutMediateResults", "JobOfferCancelled", + "JobTimedOut", } // PaymentReason corresponds to PaymentReason in TypeScript @@ -84,7 +85,11 @@ func IsActiveAgreementState(itemType uint8) bool { } func IsTerminalAgreementState(itemType uint8) bool { - return itemType == GetAgreementStateIndex("JobOfferCancelled") || itemType == GetAgreementStateIndex("ResultsAccepted") || itemType == GetAgreementStateIndex("MediationAccepted") || itemType == GetAgreementStateIndex("MediationRejected") + return itemType == GetAgreementStateIndex("JobOfferCancelled") || + itemType == GetAgreementStateIndex("JobTimedOut") || + itemType == GetAgreementStateIndex("ResultsAccepted") || + itemType == GetAgreementStateIndex("MediationAccepted") || + itemType == GetAgreementStateIndex("MediationRejected") } // GetPaymentReason corresponds to getPaymentReason in TypeScript diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index 39e70ae9..30044d7e 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -118,6 +118,13 @@ waitloop: return nil, fmt.Errorf("job was cancelled") } + // Check if our job timed out + if finalJobOffer.State == data.GetAgreementStateIndex("JobTimedOut") { + span.SetStatus(codes.Error, "job timed out") + span.RecordError(err) + return nil, fmt.Errorf("job timed out") + } + span.AddEvent("get_result.start") result, err := jobCreatorService.GetResult(finalJobOffer.DealID) if err != nil { diff --git a/pkg/options/solver.go b/pkg/options/solver.go index 8d91cec0..ac1699e1 100644 --- a/pkg/options/solver.go +++ b/pkg/options/solver.go @@ -1,6 +1,8 @@ package options import ( + "fmt" + "github.com/lilypad-tech/lilypad/pkg/solver" "github.com/lilypad-tech/lilypad/pkg/system" "github.com/spf13/cobra" @@ -8,12 +10,13 @@ import ( func NewSolverOptions() solver.SolverOptions { options := solver.SolverOptions{ - Server: GetDefaultServerOptions(), - Store: GetDefaultStoreOptions(), - Web3: GetDefaultWeb3Options(), - Services: GetDefaultServicesOptions(), - Telemetry: GetDefaultTelemetryOptions(), - Metrics: GetDefaultMetricsOptions(), + Server: GetDefaultServerOptions(), + Store: GetDefaultStoreOptions(), + Web3: GetDefaultWeb3Options(), + Services: GetDefaultServicesOptions(), + Telemetry: GetDefaultTelemetryOptions(), + Metrics: GetDefaultMetricsOptions(), + JobTimeoutSeconds: GetDefaultServeOptionInt("JOB_TIMEOUT_SECONDS", 600), // 10 minutes } options.Web3.Service = system.SolverService return options @@ -26,6 +29,10 @@ func AddSolverCliFlags(cmd *cobra.Command, options *solver.SolverOptions) { AddServicesCliFlags(cmd, &options.Services) AddTelemetryCliFlags(cmd, &options.Telemetry) AddMetricsCliFlags(cmd, &options.Metrics) + cmd.PersistentFlags().IntVar( + &options.JobTimeoutSeconds, "job-timeout-seconds", options.JobTimeoutSeconds, + `The global timeout for jobs in seconds (JOB_TIMEOUT_SECONDS)`, + ) } func CheckSolverOptions(options solver.SolverOptions) error { @@ -49,6 +56,9 @@ func CheckSolverOptions(options solver.SolverOptions) error { if err != nil { return err } + if options.JobTimeoutSeconds <= 0 { + return fmt.Errorf("JOB_TIMEOUT_SECONDS must be greater than zero") + } return nil } diff --git a/pkg/solver/controller.go b/pkg/solver/controller.go index d45bcd7d..ff53957c 100644 --- a/pkg/solver/controller.go +++ b/pkg/solver/controller.go @@ -282,7 +282,15 @@ func (controller *SolverController) solve(ctx context.Context) error { ctx, span := controller.tracer.Start(ctx, "solve") defer span.End() - // find out which deals we can make from matching the offers + // Remove expired job offers + err := controller.cancelExpiredJobs(ctx) + if err != nil { + span.SetStatus(codes.Error, "remove expired offers failed") + span.RecordError(err) + return err + } + + // Match job offers with resource offers to make deals deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.log, controller.tracer, controller.meter) if err != nil { span.SetStatus(codes.Error, "get matching deals failed") @@ -294,7 +302,7 @@ func (controller *SolverController) solve(ctx context.Context) error { Value: attribute.StringSliceValue(data.GetDealIDs(deals)), }) - // loop over each of the deals add add them to the store and emit events + // Add deals to the store, update offer states, and notify network span.AddEvent("add_deals.start") for _, deal := range deals { _, err := controller.addDeal(ctx, deal) @@ -304,6 +312,7 @@ func (controller *SolverController) solve(ctx context.Context) error { } span.AddEvent("add_deals.done") + // Report deal state metrics span.AddEvent("report_deal_metrics.start") storedDeals, err := controller.store.GetDealsAll() if err != nil { @@ -311,7 +320,13 @@ func (controller *SolverController) solve(ctx context.Context) error { span.RecordError(err) return err } - err = reportDealMetrics(ctx, controller.meter, storedDeals) + jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{Cancelled: system.BoolPointer(true)}) + if err != nil { + span.SetStatus(codes.Error, "get cancelled job offers failed") + span.RecordError(err) + return err + } + err = reportJobMetrics(ctx, controller.meter, storedDeals, jobOffers) if err != nil { span.SetStatus(codes.Error, "report deal metrics failed") span.RecordError(err) @@ -321,6 +336,65 @@ func (controller *SolverController) solve(ctx context.Context) error { return nil } +func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error { + ctx, span := controller.tracer.Start(ctx, "cancel_expired_jobs") + defer span.End() + + // Get active job offers + span.AddEvent("db.get_job_offers.start") + jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{ + Active: true, + }) + if err != nil { + controller.log.Error("get job offers failed", err) + span.SetStatus(codes.Error, "get job offers failed") + span.RecordError(err) + return err + } + span.AddEvent("db.get_job_offers.done") + + // Check active job offers, and cancel expired offers + // and associated resource offers and deals + span.AddEvent("expire_jobs.start") + expiredOffers := []string{} + expiredDeals := []string{} + for _, jobOffer := range jobOffers { + now := time.Now().UnixMilli() + if now-int64(jobOffer.JobOffer.CreatedAt) > int64(controller.options.JobTimeoutSeconds*1000) { + if jobOffer.DealID == "" { + // Cancel expired job offers + _, err := controller.updateJobOfferState(jobOffer.ID, jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut")) + if err != nil { + controller.log.Error("update expired job offer state failed", err) + span.SetStatus(codes.Error, "update expired job offer state failed") + span.RecordError(err) + } + } else { + // Cancel expired job offers, resource offers, and deals + _, err := controller.updateDealState(jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut")) + if err != nil { + controller.log.Error("update expired deal state failed", err) + span.SetStatus(codes.Error, "update expired deal state failed") + span.RecordError(err) + } + expiredDeals = append(expiredDeals, jobOffer.DealID) + } + expiredOffers = append(expiredOffers, jobOffer.ID) + } + } + span.SetAttributes(attribute.KeyValue{ + Key: "expired_job_offers", + Value: attribute.StringSliceValue(expiredOffers), + }) + span.SetAttributes(attribute.KeyValue{ + Key: "expired_deals", + Value: attribute.StringSliceValue(expiredDeals), + }) + span.AddEvent("expire_jobs.end") + + return nil +} + /* * * diff --git a/pkg/solver/matcher/matcher.go b/pkg/solver/matcher/matcher.go index 7c92a823..27191592 100644 --- a/pkg/solver/matcher/matcher.go +++ b/pkg/solver/matcher/matcher.go @@ -52,6 +52,7 @@ func GetMatchingDeals( span.AddEvent("db.get_job_offers.start") jobOffers, err := db.GetJobOffers(store.GetJobOffersQuery{ NotMatched: true, + Cancelled: system.BoolPointer(false), OrderOldestFirst: true, }) if err != nil { diff --git a/pkg/solver/metric.go b/pkg/solver/metric.go index 8ca93e31..aa00d00b 100644 --- a/pkg/solver/metric.go +++ b/pkg/solver/metric.go @@ -21,9 +21,10 @@ type metrics struct { timeoutJudgeResults metric.Int64Gauge timeoutMediateResults metric.Int64Gauge jobOfferCancelled metric.Int64Gauge + jobTimedOut metric.Int64Gauge } -type dealStats struct { +type jobStats struct { stateCounts stateCounts } @@ -40,90 +41,98 @@ type stateCounts struct { timeoutJudgeResults int64 timeoutMediateResults int64 jobOfferCancelled int64 + jobTimedOut int64 } func newMetrics(meter metric.Meter) (*metrics, error) { // Deal states dealNegotiating, err := meter.Int64Gauge( "solver.deal.state.deal_negotiating", - metric.WithDescription("Number of deals in a DealNegotiating state."), + metric.WithDescription("Number of jobs in a DealNegotiating state."), ) if err != nil { return nil, err } dealAgreed, err := meter.Int64Gauge( "solver.deal.state.deal_agreed", - metric.WithDescription("Number of deals in a DealAgreed state."), + metric.WithDescription("Number of jobs in a DealAgreed state."), ) if err != nil { return nil, err } resultsSubmitted, err := meter.Int64Gauge( "solver.deal.state.results_submitted", - metric.WithDescription("Number of deals in a ResultsSubmitted state."), + metric.WithDescription("Number of jobs in a ResultsSubmitted state."), ) if err != nil { return nil, err } resultsAccepted, err := meter.Int64Gauge( "solver.deal.state.results_accepted", - metric.WithDescription("Number of deals in a ResultsAccepted state."), + metric.WithDescription("Number of jobs in a ResultsAccepted state."), ) if err != nil { return nil, err } resultsRejected, err := meter.Int64Gauge( "solver.deal.state.results_rejected", - metric.WithDescription("Number of deals in a ResultsRejected state."), + metric.WithDescription("Number of jobs in a ResultsRejected state."), ) if err != nil { return nil, err } resultsChecked, err := meter.Int64Gauge( "solver.deal.state.results_checked", - metric.WithDescription("Number of deals in a ResultsChecked state."), + metric.WithDescription("Number of jobs in a ResultsChecked state."), ) if err != nil { return nil, err } mediationAccepted, err := meter.Int64Gauge( "solver.deal.state.mediation_accepted", - metric.WithDescription("Number of deals in a MediationAccepted state."), + metric.WithDescription("Number of jobs in a MediationAccepted state."), ) if err != nil { return nil, err } mediationRejected, err := meter.Int64Gauge( "solver.deal.state.mediation_rejected", - metric.WithDescription("Number of deals in a MediationRejected state."), + metric.WithDescription("Number of jobs in a MediationRejected state."), ) if err != nil { return nil, err } timeoutSubmitResults, err := meter.Int64Gauge( "solver.deal.state.timeout_submit_results", - metric.WithDescription("Number of deals in a TimeoutSubmitResults state."), + metric.WithDescription("Number of jobs in a TimeoutSubmitResults state."), ) if err != nil { return nil, err } timeoutJudgeResults, err := meter.Int64Gauge( "solver.deal.state.timeout_judge_results", - metric.WithDescription("Number of deals in a TimeoutJudgeResults state."), + metric.WithDescription("Number of jobs in a TimeoutJudgeResults state."), ) if err != nil { return nil, err } timeoutMediateResults, err := meter.Int64Gauge( "solver.deal.state.timeout_mediate_results", - metric.WithDescription("Number of deals in a TimeoutMediateResults state."), + metric.WithDescription("Number of jobs in a TimeoutMediateResults state."), ) if err != nil { return nil, err } jobOfferCancelled, err := meter.Int64Gauge( - "solver.deal.state.job_offer_cancelled", - metric.WithDescription("Number of deals in a JobOfferCancelled state."), + "solver.job_offer.state.job_offer_cancelled", + metric.WithDescription("Number of jobs in a JobOfferCancelled state."), + ) + if err != nil { + return nil, err + } + jobTimedOut, err := meter.Int64Gauge( + "solver.job_offer.state.job_timed_out", + metric.WithDescription("Number of jobs in a JobTimedOut state."), ) if err != nil { return nil, err @@ -142,41 +151,52 @@ func newMetrics(meter metric.Meter) (*metrics, error) { timeoutJudgeResults, timeoutMediateResults, jobOfferCancelled, + jobTimedOut, }, nil } -func reportDealMetrics(ctx context.Context, meter metric.Meter, deals []data.DealContainer) error { - var dealStats dealStats +func reportJobMetrics(ctx context.Context, meter metric.Meter, deals []data.DealContainer, jobOffers []data.JobOfferContainer) error { + var jobStats jobStats // Compute deal state counts for _, deal := range deals { switch data.GetAgreementStateString(deal.State) { case "DealNegotiating": - dealStats.stateCounts.dealNegotiating += 1 + jobStats.stateCounts.dealNegotiating += 1 case "DealAgreed": - dealStats.stateCounts.dealAgreed += 1 + jobStats.stateCounts.dealAgreed += 1 case "ResultsSubmitted": - dealStats.stateCounts.resultsSubmitted += 1 + jobStats.stateCounts.resultsSubmitted += 1 case "ResultsAccepted": - dealStats.stateCounts.resultsAccepted += 1 + jobStats.stateCounts.resultsAccepted += 1 case "ResultsRejected": - dealStats.stateCounts.resultsRejected += 1 + jobStats.stateCounts.resultsRejected += 1 case "ResultsChecked": - dealStats.stateCounts.resultsChecked += 1 + jobStats.stateCounts.resultsChecked += 1 case "MediationAccepted": - dealStats.stateCounts.mediationAccepted += 1 + jobStats.stateCounts.mediationAccepted += 1 case "MediationRejected": - dealStats.stateCounts.mediationRejected += 1 + jobStats.stateCounts.mediationRejected += 1 case "TimeoutSubmitResults": - dealStats.stateCounts.timeoutSubmitResults += 1 + jobStats.stateCounts.timeoutSubmitResults += 1 case "TimeoutJudgeResults": - dealStats.stateCounts.timeoutJudgeResults += 1 + jobStats.stateCounts.timeoutJudgeResults += 1 case "TimeoutMediateResults": - dealStats.stateCounts.timeoutMediateResults += 1 + jobStats.stateCounts.timeoutMediateResults += 1 + default: + log.Trace().Msgf("untracked deal state ID: %d", deal.State) + } + } + + // Cancelled states may only exist in the job offer + for _, offer := range jobOffers { + switch data.GetAgreementStateString(offer.State) { case "JobOfferCancelled": - dealStats.stateCounts.jobOfferCancelled += 1 + jobStats.stateCounts.jobOfferCancelled += 1 + case "JobTimedOut": + jobStats.stateCounts.jobTimedOut += 1 default: - log.Warn().Msgf("unknown deal state ID: %d", deal.State) + log.Trace().Msgf("job metrics skipped offer state ID: %d", offer.State) } } @@ -186,18 +206,19 @@ func reportDealMetrics(ctx context.Context, meter metric.Meter, deals []data.Dea log.Warn().Msgf("failed to create solver metrics: %s", err) return err } - metrics.dealNegotiating.Record(ctx, dealStats.stateCounts.dealNegotiating) - metrics.dealAgreed.Record(ctx, dealStats.stateCounts.dealAgreed) - metrics.resultsSubmitted.Record(ctx, dealStats.stateCounts.resultsSubmitted) - metrics.resultsAccepted.Record(ctx, dealStats.stateCounts.resultsAccepted) - metrics.resultsRejected.Record(ctx, dealStats.stateCounts.resultsRejected) - metrics.resultsChecked.Record(ctx, dealStats.stateCounts.resultsChecked) - metrics.mediationAccepted.Record(ctx, dealStats.stateCounts.mediationAccepted) - metrics.mediationRejected.Record(ctx, dealStats.stateCounts.mediationRejected) - metrics.timeoutSubmitResults.Record(ctx, dealStats.stateCounts.timeoutSubmitResults) - metrics.timeoutJudgeResults.Record(ctx, dealStats.stateCounts.timeoutJudgeResults) - metrics.timeoutMediateResults.Record(ctx, dealStats.stateCounts.timeoutMediateResults) - metrics.jobOfferCancelled.Record(ctx, dealStats.stateCounts.jobOfferCancelled) + metrics.dealNegotiating.Record(ctx, jobStats.stateCounts.dealNegotiating) + metrics.dealAgreed.Record(ctx, jobStats.stateCounts.dealAgreed) + metrics.resultsSubmitted.Record(ctx, jobStats.stateCounts.resultsSubmitted) + metrics.resultsAccepted.Record(ctx, jobStats.stateCounts.resultsAccepted) + metrics.resultsRejected.Record(ctx, jobStats.stateCounts.resultsRejected) + metrics.resultsChecked.Record(ctx, jobStats.stateCounts.resultsChecked) + metrics.mediationAccepted.Record(ctx, jobStats.stateCounts.mediationAccepted) + metrics.mediationRejected.Record(ctx, jobStats.stateCounts.mediationRejected) + metrics.timeoutSubmitResults.Record(ctx, jobStats.stateCounts.timeoutSubmitResults) + metrics.timeoutJudgeResults.Record(ctx, jobStats.stateCounts.timeoutJudgeResults) + metrics.timeoutMediateResults.Record(ctx, jobStats.stateCounts.timeoutMediateResults) + metrics.jobOfferCancelled.Record(ctx, jobStats.stateCounts.jobOfferCancelled) + metrics.jobTimedOut.Record(ctx, jobStats.stateCounts.jobTimedOut) return nil } diff --git a/pkg/solver/server.go b/pkg/solver/server.go index 67adeb21..4ceeedce 100644 --- a/pkg/solver/server.go +++ b/pkg/solver/server.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "slices" + "strconv" "strings" "time" @@ -242,9 +243,17 @@ func (solverServer *solverServer) getJobOffers(res corehttp.ResponseWriter, req if notMatched := req.URL.Query().Get("not_matched"); notMatched == "true" { query.NotMatched = true } - if includeCancelled := req.URL.Query().Get("include_cancelled"); includeCancelled == "true" { - query.IncludeCancelled = true + if active := req.URL.Query().Get("active"); active == "true" { + query.Active = true + } + if cancelled := req.URL.Query().Get("cancelled"); cancelled != "" { + if val, err := strconv.ParseBool(cancelled); err == nil { + query.Cancelled = &val + } else { + return nil, fmt.Errorf("invalid cancelled filter value: %s", cancelled) + } } + return solverServer.store.GetJobOffers(query) } @@ -402,6 +411,10 @@ func (solverServer *solverServer) addResult(results data.Result, res corehttp.Re if deal == nil { return nil, fmt.Errorf("deal not found") } + if deal.State == data.GetAgreementStateIndex("JobTimedOut") { + log.Trace().Msgf("attempted results post for timed out job with deal ID: %s", deal.ID) + return nil, fmt.Errorf("job with deal ID %s timed out", deal.ID) + } signerAddress, err := http.CheckSignature(req) if err != nil { log.Error().Err(err).Msgf("error checking signature") @@ -413,7 +426,7 @@ func (solverServer *solverServer) addResult(results data.Result, res corehttp.Re } err = data.CheckResult(results) if err != nil { - log.Error().Err(err).Msgf("Error checking resource offer") + log.Error().Err(err).Msgf("Error checking result for deal ID: %s", results.DealID) return nil, err } results.DealID = id @@ -586,6 +599,10 @@ func (solverServer *solverServer) uploadFiles(res corehttp.ResponseWriter, req * log.Error().Msgf("deal not found") return err } + if deal.State == data.GetAgreementStateIndex("JobTimedOut") { + log.Trace().Msgf("attempted file upload for timed out job with deal ID: %s", deal.ID) + return fmt.Errorf("job with deal ID %s timed out", deal.ID) + } signerAddress, err := http.CheckSignature(req) if err != nil { log.Error().Err(err).Msgf("error checking signature") diff --git a/pkg/solver/solver.go b/pkg/solver/solver.go index e519795b..72f58266 100644 --- a/pkg/solver/solver.go +++ b/pkg/solver/solver.go @@ -15,12 +15,13 @@ import ( ) type SolverOptions struct { - Server http.ServerOptions - Store store.StoreOptions - Web3 web3.Web3Options - Services data.ServiceConfig - Telemetry system.TelemetryOptions - Metrics system.MetricsOptions + Server http.ServerOptions + Store store.StoreOptions + Web3 web3.Web3Options + Services data.ServiceConfig + Telemetry system.TelemetryOptions + Metrics system.MetricsOptions + JobTimeoutSeconds int } type Solver struct { diff --git a/pkg/solver/store/db/db.go b/pkg/solver/store/db/db.go index f1d08581..c47b0de0 100644 --- a/pkg/solver/store/db/db.go +++ b/pkg/solver/store/db/db.go @@ -159,8 +159,25 @@ func (store *SolverStoreDatabase) GetJobOffers(query store.GetJobOffersQuery) ([ if query.NotMatched { q = q.Where("deal_id = ''") } - if !query.IncludeCancelled { - q = q.Where("state != ?", data.GetAgreementStateIndex("JobOfferCancelled")) + if query.Active { + q = q.Where("state IN (?)", []uint8{ + data.GetAgreementStateIndex("DealNegotiating"), + data.GetAgreementStateIndex("DealAgreed"), + data.GetAgreementStateIndex("ResultsSubmitted"), + }) + } + if query.Cancelled != nil { + if *query.Cancelled { + q = q.Where("state IN (?)", []uint8{ + data.GetAgreementStateIndex("JobOfferCancelled"), + data.GetAgreementStateIndex("JobTimedOut"), + }) + } else { + q = q.Where("state NOT IN (?)", []uint8{ + data.GetAgreementStateIndex("JobOfferCancelled"), + data.GetAgreementStateIndex("JobTimedOut"), + }) + } } if query.OrderOldestFirst { q = q.Order("created_at ASC") diff --git a/pkg/solver/store/memory/store.go b/pkg/solver/store/memory/store.go index 1e7ac0b9..122fccbb 100644 --- a/pkg/solver/store/memory/store.go +++ b/pkg/solver/store/memory/store.go @@ -105,9 +105,23 @@ func (s *SolverStoreMemory) GetJobOffers(query store.GetJobOffersQuery) ([]data. matching = false } } - if !query.IncludeCancelled && jobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") { + if query.Active && + jobOffer.State != data.GetAgreementStateIndex("DealNegotiating") && + jobOffer.State != data.GetAgreementStateIndex("DealAgreed") && + jobOffer.State != data.GetAgreementStateIndex("ResultsSubmitted") { matching = false } + if query.Cancelled != nil { + isCancelled := jobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") || + jobOffer.State == data.GetAgreementStateIndex("JobTimedOut") + + wantCancelled := *query.Cancelled + if wantCancelled && !isCancelled { + matching = false + } else if !wantCancelled && isCancelled { + matching = false + } + } if matching { jobOffers = append(jobOffers, *jobOffer) } diff --git a/pkg/solver/store/store.go b/pkg/solver/store/store.go index 7be644f6..64186a97 100644 --- a/pkg/solver/store/store.go +++ b/pkg/solver/store/store.go @@ -20,8 +20,13 @@ type GetJobOffersQuery struct { // we use the DealID property of the jobOfferContainer to tell if it's been matched NotMatched bool `json:"not_matched"` - // this will include cancelled job offers in the results - IncludeCancelled bool `json:"include_cancelled"` + // Active job offers are umatched or in an in-progress deal. + // This includes the DealNegotiating, DealAgreed, or ResultsSubmitted states. + Active bool `json:"in_progress"` + + // Cancelled job offers are in a JobOfferCancelled or JobTimedOut state. + // All job offers are included when Cancelled is nil. + Cancelled *bool `json:"cancelled"` // Sort job offers oldest first OrderOldestFirst bool `json:"order_oldest_first"` diff --git a/pkg/solver/store/store_test.go b/pkg/solver/store/store_test.go index 4b90bddd..f6bcd27d 100644 --- a/pkg/solver/store/store_test.go +++ b/pkg/solver/store/store_test.go @@ -14,6 +14,7 @@ import ( "github.com/lilypad-tech/lilypad/pkg/solver" "github.com/lilypad-tech/lilypad/pkg/solver/store" solverstore "github.com/lilypad-tech/lilypad/pkg/solver/store" + "github.com/lilypad-tech/lilypad/pkg/system" "golang.org/x/exp/rand" ) @@ -151,9 +152,87 @@ func TestJobOfferQuery(t *testing.T) { DealID: "", State: data.GetAgreementStateIndex("JobOfferCancelled"), }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, + }, + query: store.GetJobOffersQuery{ + Cancelled: system.BoolPointer(false), + }, + expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, + }, + { + name: "filter active offers", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmV8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ku", + State: data.GetAgreementStateIndex("DealAgreed"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("ResultsSubmitted"), + }, + { + ID: "QmW9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kw", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("ResultsAccepted"), + }, + }, + query: store.GetJobOffersQuery{ + Active: true, + }, + expected: []string{ + "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + }, + }, + { + name: "combined filters with active", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0xabcdef0123456789abcdef0123456789abcdef01", + DealID: "", + State: data.GetAgreementStateIndex("DealNegotiating"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmW9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kw", + State: data.GetAgreementStateIndex("DealAgreed"), + }, + { + ID: "QmV9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kv", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "QmU8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kt", + State: data.GetAgreementStateIndex("ResultsAccepted"), + }, }, query: store.GetJobOffersQuery{ - IncludeCancelled: false, + JobCreator: "0x1234567890123456789012345678901234567890", + NotMatched: true, + Active: true, }, expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, }, @@ -172,13 +251,50 @@ func TestJobOfferQuery(t *testing.T) { DealID: "", State: data.GetAgreementStateIndex("JobOfferCancelled"), }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, }, query: store.GetJobOffersQuery{ - IncludeCancelled: true, + Cancelled: nil, }, expected: []string{ "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + }, + }, + { + name: "filter cancelled offers only", + offers: []data.JobOfferContainer{ + { + ID: "QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetDefaultAgreementState(), + }, + { + ID: "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobOfferCancelled"), + }, + { + ID: "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", + JobCreator: "0x1234567890123456789012345678901234567890", + DealID: "", + State: data.GetAgreementStateIndex("JobTimedOut"), + }, + }, + query: store.GetJobOffersQuery{ + Cancelled: system.BoolPointer(true), + }, + expected: []string{ + "QmX9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Ky", + "QmZ9JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kz", }, }, { @@ -210,9 +326,9 @@ func TestJobOfferQuery(t *testing.T) { }, }, query: store.GetJobOffersQuery{ - JobCreator: "0x1234567890123456789012345678901234567890", - NotMatched: true, - IncludeCancelled: false, + JobCreator: "0x1234567890123456789012345678901234567890", + NotMatched: true, + Cancelled: system.BoolPointer(false), }, expected: []string{"QmY8JwJh3bYDUuAnwfpxwStjUY1nQwyhJJ4SPpdV3bZ9Kx"}, }, diff --git a/pkg/solver/testing.go b/pkg/solver/testing.go index 52690689..3ab1cf5b 100644 --- a/pkg/solver/testing.go +++ b/pkg/solver/testing.go @@ -59,9 +59,7 @@ func SetupTestStores(t *testing.T) []TestStoreConfig { func clearStoreDatabase(t *testing.T, s store.SolverStore) { // Delete job offers - jobOffers, err := s.GetJobOffers(store.GetJobOffersQuery{ - IncludeCancelled: true, - }) + jobOffers, err := s.GetJobOffers(store.GetJobOffersQuery{}) if err != nil { t.Fatalf("Failed to get existing job offers: %v", err) } @@ -124,4 +122,17 @@ func clearStoreDatabase(t *testing.T, s store.SolverStore) { t.Fatalf("Failed to remove existing match decision: %v", err) } } + + // Delete allowed resource providers + providers, err := s.GetAllowedResourceProviders() + if err != nil { + t.Fatalf("Failed to get existing allowed resource providers: %v", err) + } + + for _, provider := range providers { + err := s.RemoveAllowedResourceProvider(provider) + if err != nil { + t.Fatalf("Failed to remove existing allowed resource provider: %v", err) + } + } } diff --git a/pkg/system/lang.go b/pkg/system/lang.go new file mode 100644 index 00000000..20df5116 --- /dev/null +++ b/pkg/system/lang.go @@ -0,0 +1,5 @@ +package system + +func BoolPointer(b bool) *bool { + return &b +}