diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 8d2b465..048e1a5 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -649,7 +649,7 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // notify listeners that a job is ready to run - _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", queue, jobID)) + _, err = tx.Exec(ctx, fmt.Sprintf(`SELECT pg_notify('%s', '%s')`, queue, jobID)) if err != nil { return } @@ -761,13 +761,13 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connnection", "queue", queue, "error", err) + p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err) return } defer p.release(ctx, conn, queue) // set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected - _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) + _, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) p.logger.Error("unable to configure listener connection", "queue", queue, "error", err) @@ -779,7 +779,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) - p.logger.Debug("job notification for queue", "queue", queue, "notification", notification) + p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return @@ -802,7 +802,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re } func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) { - query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %s", p.config.IdleTransactionTimeout, queue) + query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %q", p.config.IdleTransactionTimeout, queue) _, err := conn.Exec(ctx, query) if err != nil { if errors.Is(err, context.Canceled) { diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index bf8b50e..df5c8a2 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -577,13 +577,13 @@ func Test_MoveJobsToDeadQueue(t *testing.T) { } func TestJobEnqueuedSeparately(t *testing.T) { - connString, conn := prepareAndCleanupDB(t) + connString, _ := prepareAndCleanupDB(t) const queue = "SyncThing" maxRetries := 5 done := make(chan bool) defer close(done) - timeoutTimer := time.After(30 * time.Second) + timeoutTimer := time.After(5 * time.Second) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -613,22 +613,17 @@ func TestJobEnqueuedSeparately(t *testing.T) { return }) - go func() { - err = consumer.Start(ctx, h) - if err != nil { - t.Error(err) - } - }() + err = consumer.Start(ctx, h) + if err != nil { + t.Error(err) + } // Wait a bit more before enqueueing - time.Sleep(10 * time.Second) - deadline := time.Now().UTC().Add(5 * time.Second) jid, e := enqueuer.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello world", }, - Deadline: &deadline, MaxRetries: &maxRetries, }) if e != nil || jid == jobs.DuplicateJobID { @@ -643,26 +638,4 @@ func TestJobEnqueuedSeparately(t *testing.T) { if err != nil { t.Error(err) } - - // ensure job has fields set correctly - var jdl time.Time - var jmxrt int - - err = conn. - QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid). - Scan(&jdl, &jmxrt) - if err != nil { - t.Error(err) - } - - jdl = jdl.In(time.UTC) - // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as - // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html - if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) { - t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113 - } - - if jmxrt != maxRetries { - t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113 - } }