Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an infinite loop when jobs are scheduled past their deadline #125

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ var (
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrConnectionStringEmpty = errors.New("connection string cannot be empty")
ErrDuplicateJob = errors.New("duplicate job")
ErrNoTransactionInContext = errors.New("context does not have a Tx set")
ErrExceededConnectionPoolTimeout = errors.New("exceeded timeout acquiring a connection from the pool")
ErrUnsupportedURIScheme = errors.New("only postgres:// and postgresql:// scheme URIs are supported, invalid connection string")
)

// PgBackend is a Postgres-based Neoq backend
Expand Down Expand Up @@ -855,6 +857,17 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID))
err = p.updateJob(ctx, err)
if err != nil {
p.logger.Error("unable to update job status", "error", err, "job_id", job.ID)
return
}

err = tx.Commit(ctx)
if err != nil {
p.logger.Error("unable to update job status", "error", err, "job_id", job.ID)
return
}

return
}

Expand Down Expand Up @@ -1027,7 +1040,7 @@ func GetPQConnectionString(connectionString string) (string, error) {
}

if dbURI.String() == "" {
return "", fmt.Errorf("connection string cannot be empty")
return "", ErrConnectionStringEmpty
}

scheme := dbURI.Scheme
Expand All @@ -1038,7 +1051,7 @@ func GetPQConnectionString(connectionString string) (string, error) {

if scheme != "postgres" && scheme != "postgresql" {
// This isn't a postgresql URI-style string (postgres://hostname/db)
return "", fmt.Errorf("only postgres and postgresql scheme URIs are supported, invalid connection string: %s", connectionString)
return "", ErrUnsupportedURIScheme
}

sslMode := "verify-ca"
Expand Down
39 changes: 34 additions & 5 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestGetPQConnectionString(t *testing.T) {
{
name: "multiple hostnames",
input: "postgres://username:password@hostname1,hostname2,hostname3:5432/database",
want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations",
want: "postgres://username:password@hostname1,hostname2,hostname3:5432/database?sslmode=require&x-migrations-table=neoq_schema_migrations", // nolint: lll
wantErr: false,
},

Expand Down Expand Up @@ -1004,8 +1004,9 @@ func TestGetPQConnectionString(t *testing.T) {
// with an error indicating that its deadline was not met
// https://github.com/acaloiaro/neoq/issues/123
func TestJobWithPastDeadline(t *testing.T) {
connString, _ := prepareAndCleanupDB(t)
connString, conn := prepareAndCleanupDB(t)
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
maxRetries := 5
done := make(chan bool)
defer close(done)
Expand All @@ -1018,7 +1019,6 @@ func TestJobWithPastDeadline(t *testing.T) {
defer nq.Shutdown(ctx)

h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

Expand All @@ -1038,10 +1038,39 @@ func TestJobWithPastDeadline(t *testing.T) {
MaxRetries: &maxRetries,
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
t.Error(e) // nolint: goerr113
}

if e != nil && !errors.Is(e, jobs.ErrJobExceededDeadline) {
t.Error(err)
t.Error(err) // nolint: goerr113
}

var status string
go func() {
// ensure job has failed/has the correct status
for {
err = conn.
QueryRow(context.Background(), "SELECT status FROM neoq_jobs WHERE id = $1", jid).
Scan(&status)
if err != nil {
break
}

if status == internal.JobStatusFailed {
done <- true
break
}

time.Sleep(50 * time.Millisecond)
}
}()

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}
if err != nil {
t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status)
}
}
Loading