diff --git a/pkg/data/utils.go b/pkg/data/utils.go index f55a1e9d..e347f945 100644 --- a/pkg/data/utils.go +++ b/pkg/data/utils.go @@ -33,16 +33,48 @@ func GetJobOfferID(offer JobOffer) (string, error) { return CalculateCID(offer) } +func GetJobOfferContainerIDs(jobOffers []JobOfferContainer) []string { + var ids []string + for _, offer := range jobOffers { + ids = append(ids, offer.ID) + } + return ids +} + func GetResourceOfferID(offer ResourceOffer) (string, error) { offer.ID = "" return CalculateCID(offer) } +func GetResourceOfferIDs(resourceOffers []ResourceOffer) []string { + var ids []string + for _, offer := range resourceOffers { + ids = append(ids, offer.ID) + } + return ids +} + +func GetResourceOfferContainerIDs(resourceOffers []ResourceOfferContainer) []string { + var ids []string + for _, offer := range resourceOffers { + ids = append(ids, offer.ID) + } + return ids +} + func GetDealID(deal Deal) (string, error) { deal.ID = "" return CalculateCID(deal) } +func GetDealIDs(deals []Deal) []string { + var ids []string + for _, deal := range deals { + ids = append(ids, deal.ID) + } + return ids +} + func GetModuleID(module ModuleConfig) (string, error) { return CalculateCID(module) } diff --git a/pkg/options/resource-provider.go b/pkg/options/resource-provider.go index a3109edb..06e7a1cc 100644 --- a/pkg/options/resource-provider.go +++ b/pkg/options/resource-provider.go @@ -46,7 +46,7 @@ func GetDefaultResourceProviderOfferOptions() resourceprovider.ResourceProviderO // this can be populated by a config file Specs: []data.MachineSpec{}, // if an RP wants to only run certain modules they list them here - // XXX SECURITY: enforce that they are specified with specific git hashes! + // XXX SECURITY: enforce that they are specified by CID Modules: GetDefaultServeOptionStringArray("OFFER_MODULES", []string{}), // this is the default pricing mode for an RP Mode: GetDefaultPricingMode(data.FixedPrice), diff --git a/pkg/solver/controller.go b/pkg/solver/controller.go index baf53d27..2365ecb3 100644 --- a/pkg/solver/controller.go +++ b/pkg/solver/controller.go @@ -14,6 +14,8 @@ import ( "github.com/lilypad-tech/lilypad/pkg/web3/bindings/mediation" "github.com/lilypad-tech/lilypad/pkg/web3/bindings/storage" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -107,7 +109,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu ctx, CONTROL_LOOP_INTERVAL, func() error { - err := controller.solve() + err := controller.solve(ctx) if err != nil { errorChan <- err } @@ -272,20 +274,32 @@ func (controller *SolverController) registerAsSolver() error { * */ -func (controller *SolverController) solve() error { +func (controller *SolverController) solve(ctx context.Context) error { + ctx, span := controller.tracer.Start(ctx, "solve") + defer span.End() + // find out which deals we can make from matching the offers - deals, err := matcher.GetMatchingDeals(controller.store, controller.updateJobOfferState) + deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.tracer) if err != nil { + span.SetStatus(codes.Error, "get matching deals failed") + span.RecordError(err) return err } + span.SetAttributes(attribute.KeyValue{ + Key: "deal_ids", + Value: attribute.StringSliceValue(data.GetDealIDs(deals)), + }) // loop over each of the deals add add them to the store and emit events + span.AddEvent("add_deals.start") for _, deal := range deals { - _, err := controller.addDeal(deal) + _, err := controller.addDeal(ctx, deal) if err != nil { return err } } + span.AddEvent("add_deals.done") + return nil } @@ -391,31 +405,57 @@ func (controller *SolverController) removeResourceOfferByResourceProvider(ID str return nil } -func (controller *SolverController) addDeal(deal data.Deal) (*data.DealContainer, error) { +func (controller *SolverController) addDeal(ctx context.Context, deal data.Deal) (*data.DealContainer, error) { + ctx, span := controller.tracer.Start(ctx, "add_deal") + defer span.End() + + span.AddEvent("data.get_deal_id.start") id, err := data.GetDealID(deal) if err != nil { + span.SetStatus(codes.Error, "get deal ID failed") + span.RecordError(err) return nil, err } deal.ID = id + span.SetAttributes(attribute.String("deal.id", deal.ID)) + span.AddEvent("data.get_deal_id.done") controller.log.Info("add deal", deal) + span.AddEvent("store.add_deal.start") ret, err := controller.store.AddDeal(data.GetDealContainer(deal)) if err != nil { + span.SetStatus(codes.Error, "add deal to store failed") + span.RecordError(err) return nil, err } + span.AddEvent("store.add_deal.done") + + span.AddEvent("write_event.start") controller.writeEvent(SolverEvent{ EventType: DealAdded, Deal: ret, }) + span.AddEvent("write_event.done") + + span.AddEvent("update_job_offer_state.start") _, err = controller.updateJobOfferState(ret.JobOffer, ret.ID, ret.State) if err != nil { + span.SetStatus(codes.Error, "updated job offer state failed") + span.RecordError(err) return nil, err } + span.AddEvent("update_job_offer_state.done") + + span.AddEvent("update_resource_offer_state.start") _, err = controller.updateResourceOfferState(ret.ResourceOffer, ret.ID, ret.State) if err != nil { + span.SetStatus(codes.Error, "updated resource offer state failed") + span.RecordError(err) return nil, err } + span.AddEvent("update_resource_offer_state.done") + return ret, nil } diff --git a/pkg/solver/matcher/match.go b/pkg/solver/matcher/match.go index 6e5d9f09..702ead6f 100644 --- a/pkg/solver/matcher/match.go +++ b/pkg/solver/matcher/match.go @@ -1,15 +1,18 @@ package matcher import ( + "fmt" "strings" "github.com/lilypad-tech/lilypad/pkg/data" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel/attribute" ) type matchResult interface { matched() bool message() string + attributes() []attribute.KeyValue } type offersMatched struct { @@ -19,6 +22,13 @@ type offersMatched struct { func (_ offersMatched) matched() bool { return true } func (_ offersMatched) message() string { return "offers matched" } +func (result offersMatched) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + } +} type cpuMismatch struct { resourceOffer data.ResourceOffer @@ -27,6 +37,15 @@ type cpuMismatch struct { func (_ cpuMismatch) matched() bool { return false } func (_ cpuMismatch) message() string { return "did not match CPU" } +func (result cpuMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.Int("match_result.job_offer.spec.cpu", result.jobOffer.Spec.CPU), + attribute.Int("match_result.resource_offer.spec.cpu", result.resourceOffer.Spec.CPU), + } +} type gpuMismatch struct { resourceOffer data.ResourceOffer @@ -35,6 +54,15 @@ type gpuMismatch struct { func (_ gpuMismatch) matched() bool { return false } func (_ gpuMismatch) message() string { return "did not match GPU" } +func (result gpuMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.Int("match_result.job_offer.spec.gpu", result.jobOffer.Spec.GPU), + attribute.Int("match_result.resource_offer.spec.gpu", result.resourceOffer.Spec.GPU), + } +} type ramMismatch struct { resourceOffer data.ResourceOffer @@ -43,6 +71,15 @@ type ramMismatch struct { func (_ ramMismatch) matched() bool { return false } func (_ ramMismatch) message() string { return "did not match RAM" } +func (result ramMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.Int("match_result.job_offer.spec.ram", result.jobOffer.Spec.RAM), + attribute.Int("match_result.resource_offer.spec.ram", result.resourceOffer.Spec.RAM), + } +} type moduleIDError struct { resourceOffer data.ResourceOffer @@ -51,35 +88,84 @@ type moduleIDError struct { } func (_ moduleIDError) matched() bool { return false } -func (_ moduleIDError) message() string { return "error getting module ID" } +func (_ moduleIDError) message() string { return "error computing module ID" } +func (result moduleIDError) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.String("match_result.err", result.err.Error()), + attribute.String("match_result.job_offer.module.repo", result.jobOffer.Module.Repo), + attribute.String("match_result.job_offer.module.hash", result.jobOffer.Module.Hash), + } +} type moduleMismatch struct { resourceOffer data.ResourceOffer jobOffer data.JobOffer + moduleID string } func (_ moduleMismatch) matched() bool { return false } -func (_ moduleMismatch) message() string { return "did not match modules" } +func (_ moduleMismatch) message() string { return "resource provider does not provide module" } +func (result moduleMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.String("match_result.module_id", result.moduleID), + attribute.StringSlice("match_result.resource_offer.modules", result.resourceOffer.Modules), + attribute.String("match_result.job_offer.module.repo", result.jobOffer.Module.Repo), + attribute.String("match_result.job_offer.module.hash", result.jobOffer.Module.Hash), + } +} type marketPriceUnavailable struct { resourceOffer data.ResourceOffer - jobOffer data.JobOffer } func (_ marketPriceUnavailable) matched() bool { return false } func (_ marketPriceUnavailable) message() string { - return "do not support market priced resource offers" + return "no support for market priced resource offers" +} +func (result marketPriceUnavailable) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.String("match_result.resource_offer.mode", string(result.resourceOffer.Mode)), + } } type priceMismatch struct { resourceOffer data.ResourceOffer jobOffer data.JobOffer + moduleID string } func (_ priceMismatch) matched() bool { return false } func (_ priceMismatch) message() string { return "fixed price job offer cannot afford resource offer" } +func (result priceMismatch) attributes() []attribute.KeyValue { + // If the module instruction price is not specified, this lookup will use the zero-value of 0 + moduleInstructionPrice := result.resourceOffer.ModulePricing[result.moduleID].InstructionPrice + + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.Int("match_result.job_offer.pricing.instruction_price", int(result.jobOffer.Pricing.InstructionPrice)), + attribute.Int("match_result.resource_offer.module_pricing.instruction_price", int(moduleInstructionPrice)), + attribute.Int("match_result.resource_offer.default_pricing.instruction_price", int(result.resourceOffer.DefaultPricing.InstructionPrice)), + attribute.String("match_result.job_offer.mode", string(result.jobOffer.Mode)), + attribute.String("match_result.resource_offer.mode", string(result.resourceOffer.Mode)), + attribute.String("match_result.module_id", result.moduleID), + attribute.StringSlice("match_result.resource_offer.modules", result.resourceOffer.Modules), + attribute.String("match_result.job_offer.module.repo", result.jobOffer.Module.Repo), + attribute.String("match_result.job_offer.module.hash", result.jobOffer.Module.Hash), + } +} type mediatorMismatch struct { resourceOffer data.ResourceOffer @@ -88,6 +174,15 @@ type mediatorMismatch struct { func (_ mediatorMismatch) matched() bool { return false } func (_ mediatorMismatch) message() string { return "no matching mutual mediators" } +func (result mediatorMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.StringSlice("match_result.job_offer.services.mediator", result.jobOffer.Services.Mediator), + attribute.StringSlice("match_result.resource_offer.services.mediator", result.resourceOffer.Services.Mediator), + } +} type solverMismatch struct { resourceOffer data.ResourceOffer @@ -96,6 +191,15 @@ type solverMismatch struct { func (_ solverMismatch) matched() bool { return false } func (_ solverMismatch) message() string { return "no matching solver" } +func (result solverMismatch) attributes() []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("match_result", fmt.Sprintf("%T", result)), + attribute.Bool("match_result.matched", result.matched()), + attribute.String("match_result.message", result.message()), + attribute.String("match_result.job_offer.services.solver", result.jobOffer.Services.Solver), + attribute.String("match_result.resource_offer.services.solver", result.resourceOffer.Services.Solver), + } +} // the most basic of matchers // basically just check if the resource offer >= job offer cpu, gpu & ram @@ -123,16 +227,17 @@ func matchOffers( } } + moduleID, err := data.GetModuleID(jobOffer.Module) + if err != nil { + return &moduleIDError{ + jobOffer: jobOffer, + resourceOffer: resourceOffer, + err: err, + } + } + // if the resource provider has specified modules then check them if len(resourceOffer.Modules) > 0 { - moduleID, err := data.GetModuleID(jobOffer.Module) - if err != nil { - return &moduleIDError{ - jobOffer: jobOffer, - resourceOffer: resourceOffer, - err: err, - } - } // if the resourceOffer.Modules array does not contain the moduleID then we don't match hasModule := false for _, module := range resourceOffer.Modules { @@ -146,6 +251,7 @@ func matchOffers( return &moduleMismatch{ jobOffer: jobOffer, resourceOffer: resourceOffer, + moduleID: moduleID, } } } @@ -153,7 +259,6 @@ func matchOffers( // we don't currently support market priced resource offers if resourceOffer.Mode == data.MarketPrice { return &marketPriceUnavailable{ - jobOffer: jobOffer, resourceOffer: resourceOffer, } } @@ -164,6 +269,7 @@ func matchOffers( return &priceMismatch{ jobOffer: jobOffer, resourceOffer: resourceOffer, + moduleID: moduleID, } } } @@ -232,7 +338,7 @@ func logMatch(result matchResult) { case marketPriceUnavailable: log.Trace(). Str("resource offer", r.resourceOffer.ID). - Str("job offer", r.jobOffer.ID). + Str("pricing mode", string(r.resourceOffer.Mode)). Msg(r.message()) case priceMismatch: log.Trace(). diff --git a/pkg/solver/matcher/matcher.go b/pkg/solver/matcher/matcher.go index 992539ea..053edca4 100644 --- a/pkg/solver/matcher/matcher.go +++ b/pkg/solver/matcher/matcher.go @@ -1,12 +1,17 @@ package matcher import ( + "context" + "errors" "sort" "github.com/lilypad-tech/lilypad/pkg/data" "github.com/lilypad-tech/lilypad/pkg/solver/store" "github.com/lilypad-tech/lilypad/pkg/system" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type ListOfResourceOffers []data.ResourceOffer @@ -18,31 +23,54 @@ func (a ListOfResourceOffers) Less(i, j int) bool { func (a ListOfResourceOffers) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func GetMatchingDeals( + ctx context.Context, db store.SolverStore, updateJobOfferState func(string, string, uint8) (*data.JobOfferContainer, error), + tracer trace.Tracer, ) ([]data.Deal, error) { + ctx, span := tracer.Start(ctx, "get_matching_deals") + defer span.End() + deals := []data.Deal{} + // Get resource offers + span.AddEvent("db.get_resource_offers.start") resourceOffers, err := db.GetResourceOffers(store.GetResourceOffersQuery{ NotMatched: true, }) if err != nil { + span.SetStatus(codes.Error, "get resource offers failed") + span.RecordError(err) return nil, err } + span.SetAttributes(attribute.KeyValue{ + Key: "resource_offers", + Value: attribute.StringSliceValue(data.GetResourceOfferContainerIDs(resourceOffers)), + }) + span.AddEvent("db.get_resource_offers.done") + // Get job offers + span.AddEvent("db.get_job_offers.start") jobOffers, err := db.GetJobOffers(store.GetJobOffersQuery{ NotMatched: true, }) if err != nil { + span.SetStatus(codes.Error, "get job offers failed") + span.RecordError(err) return nil, err } + span.SetAttributes(attribute.KeyValue{ + Key: "job_offers", + Value: attribute.StringSliceValue(data.GetJobOfferContainerIDs(jobOffers)), + }) + span.AddEvent("db.get_resource_offers.done") // loop over job offers for _, jobOffer := range jobOffers { // Check for targeted jobs if jobOffer.JobOffer.Target.Address != "" { - deal, err := getTargetedDeal(db, jobOffer, updateJobOfferState) + deal, err := getTargetedDeal(ctx, db, jobOffer, updateJobOfferState, tracer) if err != nil { return nil, err } @@ -56,27 +84,52 @@ func GetMatchingDeals( // loop over resource offers matchingResourceOffers := []data.ResourceOffer{} for _, resourceOffer := range resourceOffers { + _, matchSpan := tracer.Start(ctx, "match", + trace.WithAttributes(attribute.String("job_offer.id", jobOffer.ID), + attribute.String("resource_offer.id", resourceOffer.ID)), + ) + + matchSpan.AddEvent("db.get_match_decision.start") decision, err := db.GetMatchDecision(resourceOffer.ID, jobOffer.ID) if err != nil { + matchSpan.SetStatus(codes.Error, "unable to retrieve match decision") + matchSpan.RecordError(err) return nil, err } + matchSpan.AddEvent("db.get_match_decision.done") // if this exists it means we've already tried to match the two elements and should not try again if decision != nil { + matchSpan.AddEvent("decision_already_checked", + trace.WithAttributes(attribute.Bool("decision.result", decision.Result))) + matchSpan.End() continue } + matchSpan.AddEvent("match_offers.start") result := matchOffers(resourceOffer.ResourceOffer, jobOffer.JobOffer) logMatch(result) + matchSpan.AddEvent("match_offers.done", trace.WithAttributes(result.attributes()...)) if result.matched() { matchingResourceOffers = append(matchingResourceOffers, resourceOffer.ResourceOffer) + matchSpan.AddEvent("append_match", + trace.WithAttributes(attribute.KeyValue{ + Key: "matching_resource_offers", + Value: attribute.StringSliceValue(data.GetResourceOfferIDs(matchingResourceOffers)), + })) } else { + matchSpan.AddEvent("add_match_decision.start") _, err := db.AddMatchDecision(resourceOffer.ID, jobOffer.ID, "", false) if err != nil { + matchSpan.SetStatus(codes.Error, "unable to record mismatch decision") + matchSpan.RecordError(err) return nil, err } + matchSpan.AddEvent("add_match_decision.done") } + + matchSpan.End() } // yay - we've got some matching resource offers @@ -84,12 +137,20 @@ func GetMatchingDeals( if len(matchingResourceOffers) > 0 { // now let's order the matching resource offers by price sort.Sort(ListOfResourceOffers(matchingResourceOffers)) - cheapestResourceOffer := matchingResourceOffers[0] + + span.AddEvent("get_deal.start", trace.WithAttributes(attribute.String("cheapest_resource_offer", cheapestResourceOffer.ID), + attribute.KeyValue{ + Key: "matching_resource_offers", + Value: attribute.StringSliceValue(data.GetResourceOfferIDs(matchingResourceOffers)), + })) deal, err := data.GetDeal(jobOffer.JobOffer, cheapestResourceOffer) if err != nil { + span.SetStatus(codes.Error, "unable to get deal") + span.RecordError(err) return nil, err } + span.AddEvent("get_deal.done", trace.WithAttributes(attribute.String("deal.id", deal.ID))) // add the match decision for this job offer for _, matchingResourceOffer := range matchingResourceOffers { @@ -99,13 +160,22 @@ func GetMatchingDeals( addDealID = deal.ID } + span.AddEvent("add_match_decision.start") _, err := db.AddMatchDecision(matchingResourceOffer.ID, jobOffer.ID, addDealID, true) if err != nil { + span.SetStatus(codes.Error, "unable to add match decision") + span.RecordError(err) return nil, err } + span.AddEvent("add_match_decision.done") } deals = append(deals, deal) + span.AddEvent("append_deal", + trace.WithAttributes(attribute.KeyValue{ + Key: "deals", + Value: attribute.StringSliceValue(data.GetDealIDs(deals)), + })) } } @@ -121,10 +191,17 @@ func GetMatchingDeals( // See if our jobOffer targets a specific address. If so, we will create a deal automatically // with the matcing resourceOffer. func getTargetedDeal( + ctx context.Context, db store.SolverStore, jobOffer data.JobOfferContainer, updateJobOfferState func(string, string, uint8) (*data.JobOfferContainer, error), + tracer trace.Tracer, ) (*data.Deal, error) { + ctx, span := tracer.Start(ctx, "get_targeted_deal", + trace.WithAttributes(attribute.String("job_offer.target.address", jobOffer.JobOffer.Target.Address))) + defer span.End() + + span.AddEvent("db.get_resource_offer_by_address.start") resourceOffer, err := db.GetResourceOfferByAddress(jobOffer.JobOffer.Target.Address) if err != nil { return nil, err @@ -136,10 +213,22 @@ func getTargetedDeal( Str("job offer", jobOffer.ID). Str("target address", jobOffer.JobOffer.Target.Address). Msgf("No resource provider found for address") + span.SetStatus(codes.Error, "no resource provider found for address") + span.RecordError(errors.New("no resource provider found for address")) + updateJobOfferState(jobOffer.ID, "", data.GetAgreementStateIndex("JobOfferCancelled")) return nil, nil } + span.AddEvent("db.get_resource_offer_by_address.found", trace.WithAttributes(attribute.String("resource_offer.id", resourceOffer.ID))) + span.AddEvent("get_deal.start") deal, err := data.GetDeal(jobOffer.JobOffer, resourceOffer.ResourceOffer) - return &deal, err + if err != nil { + span.SetStatus(codes.Error, "get deal failed") + span.RecordError(err) + return nil, err + } + span.AddEvent("get_deal.done", trace.WithAttributes(attribute.String("deal.id", deal.ID))) + + return &deal, nil }