Skip to content

Commit

Permalink
dbjobqueue: Remove root job and unused dependencies
Browse files Browse the repository at this point in the history
This adds SQL to delete jobs and dependencies, and implements the
database version of the RemoveJobs function.

Related: RHEL-60120
  • Loading branch information
bcl committed Sep 25, 2024
1 parent 9089001 commit 4b67782
Showing 1 changed file with 90 additions and 4 deletions.
94 changes: 90 additions & 4 deletions pkg/jobqueue/dbjobqueue/dbjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const (
SET started_at = NULL, token = NULL, retries = retries + 1
WHERE id = $1 AND started_at IS NOT NULL AND finished_at IS NULL`

sqlDelete = `
DELETE FROM jobs
WHERE id = $1`

sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)`
sqlQueryDependencies = `
SELECT dependency_id
Expand All @@ -70,6 +74,9 @@ const (
SELECT job_id
FROM job_dependencies
WHERE dependency_id = $1`
sqlDeleteDependencies = `
DELETE FROM job_dependencies
WHERE job_id = $1 AND dependency_id = $2`

sqlQueryJob = `
SELECT type, args, channel, started_at, finished_at, retries, canceled
Expand Down Expand Up @@ -873,6 +880,39 @@ func (q *DBJobQueue) jobDependents(ctx context.Context, conn connection, id uuid
return dependents, nil
}

// deleteJob removes the job from the database
// the CASCADE constraint will also delete any entries from the job_dependencies table
func (q *DBJobQueue) deleteJob(jobID uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()

_, err = conn.Exec(context.Background(), sqlDelete, jobID)
if err != nil {
q.logger.Error(err, "Error deleting job")
return err
}
return nil
}

// deleteJobDependencies removes job dependencies
func (q *DBJobQueue) deleteJobDependencies(jobID, dependencyID uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()

_, err = conn.Exec(context.Background(), sqlDeleteDependencies, jobID, dependencyID)
if err != nil {
q.logger.Error(err, "Error deleting dependencies")
return err
}
return nil
}

// AllJobIDs returns a list of all job UUIDs that the worker knows about
func (q *DBJobQueue) AllJobIDs() (jobs []uuid.UUID, err error) {
conn, err := q.pool.Acquire(context.Background())
Expand Down Expand Up @@ -933,9 +973,55 @@ func (q *DBJobQueue) AllRootJobIDs() (rootJobs []uuid.UUID, err error) {
return
}

// RemoveJob deletes a job from the database
func (q *DBJobQueue) RemoveJob(uuid.UUID) error {
// TODO write this
// RemoveJob removes a job and all of its dependencies from the database
// If a dependency has multiple dependents it will only remove the parent job from
// the dependents list for that job instead of removing it.
//
// This assumes that the jobs have been created correctly, and that they have
// no dependency loops. Shared Dependents are ok, but a job cannot have a dependency
// on any of its parents (this should never happen).
func (q *DBJobQueue) RemoveJob(id uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()

return nil
// Start it off with an empty parent
return q.removeJob(conn, uuid.UUID{}, id)
}

// removeJob will remove jobs as far down the list as possible
// missing dependencies are ignored, it deletes as much as it can.
// A missing parent (the first call) will be returned as an error
func (q *DBJobQueue) removeJob(conn connection, parent, id uuid.UUID) error {
// Delete parent:id dependencies if they exist
if len(parent.String()) > 0 {
err := q.deleteJobDependencies(parent, id)
if err != nil {
return err
}
}

// Get the list of dependents for this id
dependents, err := q.jobDependents(context.Background(), conn, id)
if err != nil {
return err
}

// If this is > 0 then we are done, cannot delete further
if len(dependents) > 0 {
return nil
}

// Nothing depends on this job, recursively remove the dependencies
deps, err := q.jobDependencies(context.Background(), conn, id)
if err != nil {
return err
}
for _, d := range deps {
_ = q.removeJob(conn, id, d)
}

return q.deleteJob(id)
}

0 comments on commit 4b67782

Please sign in to comment.