From fe0734552ea96d65222d66e755bce1cb482662aa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Florian=20Sch=C3=BCller?=
Date: Thu, 3 Oct 2024 19:07:50 +0200
Subject: [PATCH] internal/worker/server: return an error on depsolve timeout

HMS-2989

Fixes the special case where no worker is available: we generate an
internal timeout and cancel the depsolve, including all follow-up jobs,
but no error was propagated.
---
 internal/cloudapi/v2/server.go             | 98 +++++++++++++++-------
 internal/jobqueue/fsjobqueue/fsjobqueue.go | 34 ++++++++
 internal/worker/clienterrors/errors.go     |  1 +
 internal/worker/server.go                  | 21 +++++
 pkg/jobqueue/dbjobqueue/dbjobqueue.go      | 32 +++++++
 pkg/jobqueue/jobqueue.go                   |  5 ++
 6 files changed, 160 insertions(+), 31 deletions(-)

diff --git a/internal/cloudapi/v2/server.go b/internal/cloudapi/v2/server.go
index 741bea64f7..3f0058148c 100644
--- a/internal/cloudapi/v2/server.go
+++ b/internal/cloudapi/v2/server.go
@@ -3,6 +3,7 @@ package v2
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net/http"
 	"strings"
@@ -258,7 +259,7 @@ func (s *Server) enqueueCompose(irs []imageRequest, channel string) (uuid.UUID,
 
 		s.goroutinesGroup.Add(1)
 		go func() {
-			serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed)
+			serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, id, ir.manifestSeed)
 			defer s.goroutinesGroup.Done()
 		}()
 
@@ -414,7 +415,7 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
 		// copy the image request while passing it into the goroutine to prevent data races
 		s.goroutinesGroup.Add(1)
 		go func(ir imageRequest) {
-			serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed)
+			serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, buildID, ir.manifestSeed)
 			defer s.goroutinesGroup.Done()
 		}(ir)
 	}
@@ -435,23 +436,81 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
 	return id, nil
 }
 
-func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, seed int64) {
-	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
+func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, osbuildJobID uuid.UUID, seed int64) {
+	// prepared to become a config variable
+	const depsolveTimeout = 5
+	ctx, cancel := context.WithTimeout(ctx, time.Minute*depsolveTimeout)
 	defer cancel()
 
-	// wait until job is in a pending state
-	var token uuid.UUID
+	jobResult := &worker.ManifestJobByIDResult{
+		Manifest: nil,
+		ManifestInfo: worker.ManifestInfo{
+			OSBuildComposerVersion: common.BuildVersion(),
+		},
+	}
+
 	var dynArgs []json.RawMessage
 	var err error
+	token := uuid.Nil
 	logWithId := logrus.WithField("jobId", manifestJobID)
+
+	defer func() {
+		// token == uuid.Nil indicates that no worker even started processing
+		if token == uuid.Nil {
+			if jobResult.JobError != nil {
+				// set all jobs to "failed"
+				jobs := map[string]uuid.UUID{
+					"depsolve":         depsolveJobID,
+					"containerResolve": containerResolveJobID,
+					"ostreeResolve":    ostreeResolveJobID,
+					"manifest":         manifestJobID,
+					"osbuild":          osbuildJobID,
+				}
+
+				for jobName, jobID := range jobs {
+					if jobID != uuid.Nil {
+						err := workers.SetFailed(jobID, jobResult.JobError)
+						if err != nil {
+							logWithId.Errorf("Error failing %s job: %v", jobName, err)
+						}
+					}
+				}
+
+			} else {
+				logWithId.Errorf("Internal error, no worker started depsolve but we didn't get a reason.")
+			}
+		} else {
+			result, err := json.Marshal(jobResult)
+			if err != nil {
+				logWithId.Errorf("Error marshalling manifest job results: %v", err)
+			}
+			err = workers.FinishJob(token, result)
+			if err != nil {
+				logWithId.Errorf("Error finishing manifest job: %v", err)
+			}
+			if jobResult.JobError != nil {
+				logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
+			}
+		}
+	}()
+
+
+	// wait until job is in a pending state
 	for {
 		_, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID)
-		if err == jobqueue.ErrNotPending {
+		if errors.Is(err, jobqueue.ErrNotPending) {
 			logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish")
 			time.Sleep(time.Millisecond * 50)
 			select {
 			case <-ctx.Done():
-				logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
+				logWithId.Warning(fmt.Sprintf("Manifest job dependencies took longer than %d minutes to finish,"+
+					" or the server is shutting down, returning to avoid dangling routines", depsolveTimeout))
+
+				jobResult.JobError = clienterrors.New(clienterrors.ErrorDepsolveTimeout,
+					"Timeout while waiting for package dependency resolution",
+					"There may be a temporary issue with compute resources. "+
+						"We’re looking into it, please try again later.",
+				)
 				break
 			default:
 				continue
@@ -464,13 +523,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
 		break
 	}
 
-	jobResult := &worker.ManifestJobByIDResult{
-		Manifest: nil,
-		ManifestInfo: worker.ManifestInfo{
-			OSBuildComposerVersion: common.BuildVersion(),
-		},
-	}
-
 	// add osbuild/images dependency info to job result
 	osbuildImagesDep, err := common.GetDepModuleInfoByPath(common.OSBuildImagesModulePath)
 	if err != nil {
@@ -482,22 +534,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w
 		jobResult.ManifestInfo.OSBuildComposerDeps = append(jobResult.ManifestInfo.OSBuildComposerDeps, osbuildImagesDepModule)
 	}
 
-	defer func() {
-		if jobResult.JobError != nil {
-			logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err)
-		}
-
-		result, err := json.Marshal(jobResult)
-		if err != nil {
-			logWithId.Errorf("Error marshalling manifest job results: %v", err)
-		}
-
-		err = workers.FinishJob(token, result)
-		if err != nil {
-			logWithId.Errorf("Error finishing manifest job: %v", err)
-		}
-	}()
-
 	if len(dynArgs) == 0 {
 		reason := "No dynamic arguments"
 		jobResult.JobError = clienterrors.New(clienterrors.ErrorNoDynamicArgs, reason, nil)
diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go
index 9380319038..15a106aa61 100644
--- a/internal/jobqueue/fsjobqueue/fsjobqueue.go
+++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go
@@ -403,6 +403,40 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
 	return nil
 }
 
+func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	j, err := q.readJob(id)
+	if err != nil {
+		return err
+	}
+
+	if !j.FinishedAt.IsZero() {
+		return jobqueue.ErrFinished
+	}
+
+	if !j.StartedAt.IsZero() {
+		return jobqueue.ErrRunning
+	}
+
+	j.Result, err = json.Marshal(result)
+	if err != nil {
+		return err
+	}
+
+	j.StartedAt = time.Now()
+	j.FinishedAt = time.Now()
+	j.Token = uuid.New()
+
+	err = q.db.Write(id.String(), j)
+	if err != nil {
+		return fmt.Errorf("error writing job %s: %v", id, err)
+	}
+
+	return nil
+}
+
 func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
 	j, err := q.readJob(id)
 	if err != nil {
diff --git a/internal/worker/clienterrors/errors.go b/internal/worker/clienterrors/errors.go
index 7d25610b06..417c0ee954 100644
--- a/internal/worker/clienterrors/errors.go
+++ b/internal/worker/clienterrors/errors.go
@@ -44,6 +44,7 @@
 	ErrorJobPanicked            ClientErrorCode = 37
 	ErrorGeneratingSignedURL    ClientErrorCode = 38
 	ErrorInvalidRepositoryURL   ClientErrorCode = 39
+	ErrorDepsolveTimeout        ClientErrorCode = 40
 )
 
 type ClientErrorCode int
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 665e835d5e..c64e3ce392 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -557,6 +557,27 @@ func (s *Server) Cancel(id uuid.UUID) error {
 	return s.jobs.CancelJob(id)
 }
 
+// SetFailed sets the given job id to "failed" with the given error
+func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error {
+	/* create a separate metrics?
+	jobInfo, err := s.jobInfo(id, nil)
+	if err != nil {
+		logrus.Errorf("error getting job status: %v", err)
+	} else {
+		prometheus.CancelJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel)
+	}*/
+	FailedJobErrorResult := JobResult{
+		JobError: error,
+	}
+
+	res, err := json.Marshal(FailedJobErrorResult)
+	if err != nil {
+		logrus.Errorf("error marshalling the error: %v", err)
+		return nil
+	}
+	return s.jobs.FailJob(id, res)
+}
+
 // Provides access to artifacts of a job. Returns an io.Reader for the artifact
 // and the artifact's size.
 func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
index 81135e66bb..058407bea1 100644
--- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go
+++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
@@ -94,6 +94,12 @@ const (
 		WHERE id = $1 AND finished_at IS NULL
 		RETURNING type, started_at`
 
+	sqlFailJob = `
+		UPDATE jobs
+		SET token = $2, started_at = now(), finished_at = now(), result = $3
+		WHERE id = $1 AND finished_at IS NULL AND started_at IS NULL AND token IS NULL
+		RETURNING id, type`
+
 	sqlInsertHeartbeat = `
 		INSERT INTO heartbeats(token, id, heartbeat)
 		VALUES ($1, $2, now())`
@@ -591,6 +597,32 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
 	return nil
 }
 
+func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error {
+	conn, err := q.pool.Acquire(context.Background())
+	if err != nil {
+		return fmt.Errorf("error connecting to database: %w", err)
+	}
+	defer conn.Release()
+
+	var jobType string
+	var resultId uuid.UUID
+	dummyToken := uuid.New()
+	err = conn.QueryRow(context.Background(), sqlFailJob, id, dummyToken, result).Scan(&resultId, &jobType)
+	if errors.Is(err, pgx.ErrNoRows) {
+		return jobqueue.ErrNotRunning
+	}
+	if err != nil {
+		return fmt.Errorf("error failing job %s: %w", id, err)
+	}
+	if id != resultId {
+		return fmt.Errorf("that should never happen, I wanted to set %s to failed but got %s back from DB", id, resultId)
+	}
+
+	q.logger.Info("Job set to failed", "job_type", jobType, "job_id", id.String())
+
+	return nil
+}
+
 func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
 	conn, err := q.pool.Acquire(context.Background())
 	if err != nil {
diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go
index cb88ee39a4..f43063e70d 100644
--- a/pkg/jobqueue/jobqueue.go
+++ b/pkg/jobqueue/jobqueue.go
@@ -59,6 +59,9 @@
 	// Cancel a job. Does nothing if the job has already finished.
 	CancelJob(id uuid.UUID) error
 
+	// Fail a job that didn't even start (e.g. no worker available)
+	FailJob(id uuid.UUID, result interface{}) error
+
 	// If the job has finished, returns the result as raw JSON.
 	//
 	// Returns the current status of the job, in the form of three times:
@@ -114,6 +117,8 @@
 	ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
 	ErrActiveJobs     = errors.New("worker has active jobs associated with it")
 	ErrWorkerNotExist = errors.New("worker does not exist")
+	ErrRunning        = errors.New("job is running, but wasn't expected to be")
+	ErrFinished       = errors.New("job is finished, but wasn't expected to be")
 )
 
 type Worker struct {
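
Usage sketch, not part of the patch: FailJob only succeeds while a job has neither started nor finished. The fsjobqueue implementation reports jobqueue.ErrRunning or jobqueue.ErrFinished otherwise, while the dbjobqueue implementation reports jobqueue.ErrNotRunning when the guarded UPDATE matches no row. A caller that mirrors the SetFailed loop in serializeManifest could therefore look roughly like the hypothetical helper below; only JobQueue, FailJob and the Err* sentinels come from the patched jobqueue package, failPending and its arguments are illustrative.

package example

import (
	"errors"
	"log"

	"github.com/google/uuid"

	"github.com/osbuild/osbuild-composer/pkg/jobqueue"
)

// failPending marks every non-nil job ID as failed with the given result and
// tolerates jobs that a worker picked up or finished in the meantime.
func failPending(q jobqueue.JobQueue, result interface{}, jobs map[string]uuid.UUID) {
	for name, id := range jobs {
		if id == uuid.Nil {
			continue
		}
		err := q.FailJob(id, result)
		switch {
		case errors.Is(err, jobqueue.ErrRunning),
			errors.Is(err, jobqueue.ErrFinished),
			errors.Is(err, jobqueue.ErrNotRunning):
			// The job made progress after all; leave its result to the worker.
			log.Printf("not failing %s job %s: %v", name, id, err)
		case err != nil:
			log.Printf("error failing %s job %s: %v", name, id, err)
		}
	}
}

Treating those sentinels as benign keeps the cleanup path race-free: if a worker dequeues a job between the timeout and the failure write, FailJob simply refuses and the worker's own result wins.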