diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..ede3569f1ac 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -892,11 +892,20 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { } } - // Delete the job - if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID); serr != nil { - logger.Errorw("Failed to delete the job", "err", serr) + if j.Type == job.Workflow { + // Delete workflow + if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID+1000000000); serr != nil { + logger.Errorw("Failed to delete the job", "err", serr) - return errors.Wrap(serr, "DeleteJob failed") + return errors.Wrap(serr, "DeleteJob failed") + } + } else { + // Delete the job + if serr = s.jobSpawner.DeleteJob(ctx, tx.ds, existingJobID); serr != nil { + logger.Errorw("Failed to delete the job", "err", serr) + + return errors.Wrap(serr, "DeleteJob failed") + } } } diff --git a/core/services/job/orm.go b/core/services/job/orm.go index a86da5f7111..6741e9dc142 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -710,11 +710,38 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error { // DeleteJob removes a job func (o *orm) DeleteJob(ctx context.Context, id int32) error { + if id >= 1000000000 { + id -= 1000000000 + o.lggr.Debugw("Deleting Workflow job", "jobID", id) + ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), 10*time.Minute) + defer cancel() + query := ` + WITH deleted_jobs AS ( + DELETE FROM jobs WHERE id = $1 RETURNING + id, + workflow_spec_id + ), + DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)` + res, err := o.ds.ExecContext(ctx, query, id) + if err != nil { + return errors.Wrap(err, "DeleteJob failed to delete workflow job") + } + rowsAffected, err := res.RowsAffected() + if err != nil { + return errors.Wrap(err, "DeleteJob failed getting RowsAffected") + } + if rowsAffected == 0 { + return sql.ErrNoRows + } + o.lggr.Debugw("Deleted job", "jobID", id) + return nil + } o.lggr.Debugw("Deleting job", "jobID", id) // Added a 1-minute timeout to this query since this can take a long time as data increases. // This was added specifically due to an issue with a database that had a million of pipeline_runs and pipeline_task_runs // and this query was taking ~40secs. - ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute) + // TODO: KS-489 - Remove this timeout once we have a better solution for this. + ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), 10*time.Minute) defer cancel() query := ` WITH deleted_jobs AS ( diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 16889cbe10b..9a7dd47738c 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -275,6 +275,10 @@ func (js *spawner) CreateJob(ctx context.Context, ds sqlutil.DataSource, jb *Job // Should not get called before Start() func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID int32) error { + originalJobID := jobID + if jobID >= 1000000000 { + jobID -= 1000000000 + } if ds == nil { ds = js.orm.DataSource() } @@ -315,7 +319,7 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i lggr.Debugw("Callback: BeforeDeleteJob done") err := sqlutil.Transact(ctx, js.orm.WithDataSource, ds, nil, func(tx ORM) error { - err := tx.DeleteJob(ctx, jobID) + err := tx.DeleteJob(ctx, originalJobID) if err != nil { js.lggr.Errorw("Error deleting job", "jobID", jobID, "err", err) return err