Skip to content

Commit

Permalink
Extend job delete DB timeout to 10 minutes
Browse files (browse the repository at this point in the history)
  • Loading branch information
bolekk committed Nov 12, 2024
1 parent 275cf37 commit 4a9775a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
17 changes: 13 additions & 4 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,11 +892,20 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
}
}

// Delete the job
if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID); serr != nil {
logger.Errorw("Failed to delete the job", "err", serr)
if j.Type == job.Workflow {
// Delete workflow
if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID+1000000000); serr != nil {
logger.Errorw("Failed to delete the job", "err", serr)

return errors.Wrap(serr, "DeleteJob failed")
return errors.Wrap(serr, "DeleteJob failed")
}
} else {
// Delete the job
if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID); serr != nil {
logger.Errorw("Failed to delete the job", "err", serr)

return errors.Wrap(serr, "DeleteJob failed")
}
}
}

Expand Down
29 changes: 28 additions & 1 deletion core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,38 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error {

// DeleteJob removes a job
func (o *orm) DeleteJob(ctx context.Context, id int32) error {
if id >= 1000000000 {
id -= 1000000000
o.lggr.Debugw("Deleting Workflow job", "jobID", id)
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), 10*time.Minute)

Check failure on line 716 in core/services/job/orm.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "ctx" shadows declaration at line 712 (govet)
defer cancel()
query := `
WITH deleted_jobs AS (
DELETE FROM jobs WHERE id = $1 RETURNING
id,
workflow_spec_id
),
DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)`
res, err := o.ds.ExecContext(ctx, query, id)
if err != nil {
return errors.Wrap(err, "DeleteJob failed to delete workflow job")
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return errors.Wrap(err, "DeleteJob failed getting RowsAffected")
}
if rowsAffected == 0 {
return sql.ErrNoRows
}
o.lggr.Debugw("Deleted job", "jobID", id)
return nil
}
o.lggr.Debugw("Deleting job", "jobID", id)
// Added a 1-minute timeout to this query since this can take a long time as data increases.
// This was added specifically due to an issue with a database that had a million pipeline_runs and pipeline_task_runs rows,
// and this query was taking ~40secs.
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute)
// TODO: KS-489 - Remove this timeout once we have a better solution for this.
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), 10*time.Minute)
defer cancel()
query := `
WITH deleted_jobs AS (
Expand Down
6 changes: 5 additions & 1 deletion core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (js *spawner) CreateJob(ctx context.Context, ds sqlutil.DataSource, jb *Job

// Should not get called before Start()
func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID int32) error {
originalJobID := jobID
if jobID >= 1000000000 {
jobID -= 1000000000
}
if ds == nil {
ds = js.orm.DataSource()
}
Expand Down Expand Up @@ -315,7 +319,7 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i
lggr.Debugw("Callback: BeforeDeleteJob done")

err := sqlutil.Transact(ctx, js.orm.WithDataSource, ds, nil, func(tx ORM) error {
err := tx.DeleteJob(ctx, jobID)
err := tx.DeleteJob(ctx, originalJobID)
if err != nil {
js.lggr.Errorw("Error deleting job", "jobID", jobID, "err", err)
return err
Expand Down

0 comments on commit 4a9775a

Please sign in to comment.