Skip to content

Commit

Permalink
fix: #98 segvault when moving jobs to dead queue
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Oct 13, 2023
1 parent fb1fec1 commit 84caae3
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
6 changes: 3 additions & 3 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,15 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
}

// moveToDeadQueue moves jobs from the pending queue to the dead queue
func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, jobErr error) (err error) {
func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, jobErr string) (err error) {
_, err = tx.Exec(ctx, "DELETE FROM neoq_jobs WHERE id = $1", j.ID)
if err != nil {
return
}

_, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error, deadline)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error(), j.Deadline)
j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr, j.Deadline)

return
}
Expand Down Expand Up @@ -491,7 +491,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
}

if job.MaxRetries != nil && job.Retries >= *job.MaxRetries {
err = p.moveToDeadQueue(ctx, tx, job, jobErr)
err = p.moveToDeadQueue(ctx, tx, job, errMsg)
return
}

Expand Down
84 changes: 81 additions & 3 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ import (
"github.com/acaloiaro/neoq/jobs"
"github.com/acaloiaro/neoq/logging"
"github.com/acaloiaro/neoq/testutils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

const (
ConcurrentWorkers = 8
)

var (
errPeriodicTimeout = errors.New("timed out waiting for periodic job")
)
var errPeriodicTimeout = errors.New("timed out waiting for periodic job")

// prepareAndCleanupDB should be run at the beginning of each test. It will check to see if the TEST_DATABASE_URL is
// present and has a valid connection string. If it does it will connect to the DB and clean up any jobs that might be
Expand Down Expand Up @@ -497,3 +496,82 @@ results_loop:
t.Error(err)
}
}

// Test_MoveJobsToDeadQueue tests that when a job's MaxRetries is reached, that the job is moved ot the dead queue successfully
func Test_MoveJobsToDeadQueue(t *testing.T) {
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
maxRetries := 0
done := make(chan bool)
defer close(done)

timeoutTimer := time.After(5 * time.Second)

ctx := context.Background()
nq, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString(connString),
postgres.WithTransactionTimeout(60000))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
panic("no good")
})

err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
MaxRetries: &maxRetries,
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
}

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}
if err != nil {
t.Error(err)
}

// ensure job has fields set correctly
maxWait := time.Now().Add(30 * time.Second)
var status string
for {
if time.Now().After(maxWait) {
break
}

err = conn.
QueryRow(context.Background(), "SELECT status FROM neoq_dead_jobs WHERE id = $1", jid).
Scan(&status)

if err == nil {
break
}

if err != nil && errors.Is(err, pgx.ErrNoRows) {
time.Sleep(50 * time.Millisecond)
continue
} else if err != nil {
t.Error(err)
}
}

if status != internal.JobStatusFailed {
t.Error("should be dead")
}
}

0 comments on commit 84caae3

Please sign in to comment.