From c09d476bdf8d56d70aba9f52debc72531df05a3d Mon Sep 17 00:00:00 2001
From: Elliot Courant
Date: Wed, 27 Sep 2023 16:45:48 -0500
Subject: [PATCH 1/4] chore(test): Adding a test for multiple consumers with the pg backend.

This test provides a minimal proof that jobs can be consumed by multiple
workers, and that each job is consumed exactly once. If execCount does not
match the expected count, the test fails because either too many jobs were
executed (e.g. one job executing twice) or a job was dropped when it should
not have been.
---
 backends/postgres/postgres_backend_test.go | 77 +++++++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index 45c69b5..5412617 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -114,6 +116,81 @@ func TestBasicJobProcessing(t *testing.T) {
 	})
 }
 
+func TestMultipleProcessors(t *testing.T) {
+	const queue = "testing"
+
+	connString := os.Getenv("TEST_DATABASE_URL")
+	if connString == "" {
+		t.Skip("Skipping: TEST_DATABASE_URL not set")
+		return
+	}
+
+	t.Cleanup(func() {
+		flushDB()
+	})
+
+	var execCount uint32
+	var wg sync.WaitGroup
+	count := 8
+	neos := make([]neoq.Neoq, 0, count)
+	// Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different
+	// workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers.
+	for i := 0; i < count; i++ {
+		ctx := context.Background()
+		nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Cleanup(func() {
+			nq.Shutdown(ctx)
+		})
+
+		h := handler.New(queue, func(_ context.Context) (err error) {
+			// Make sure that by wasting some time working on a thing we don't consume two jobs back to back.
+			// This should give the other neoq clients enough time to grab a job as well.
+			time.Sleep(500 * time.Millisecond)
+			atomic.AddUint32(&execCount, 1)
+			wg.Done()
+			return
+		})
+		// Make sure that each neoq worker only works on one thing at a time.
+		h.Concurrency = 1
+
+		err = nq.Start(ctx, h)
+		if err != nil {
+			t.Error(err)
+		}
+
+		neos = append(neos, nq)
+	}
+
+	// From one of the neoq clients, enqueue several jobs. At least one per processor registered above.
+	nq := neos[0]
+	for i := 0; i < count; i++ {
+		wg.Add(1)
+		ctx := context.Background()
+		deadline := time.Now().UTC().Add(10 * time.Second)
+		jid, e := nq.Enqueue(ctx, &jobs.Job{
+			Queue: queue,
+			Payload: map[string]interface{}{
+				"message": fmt.Sprintf("hello world: %d", i),
+			},
+			Deadline: &deadline,
+		})
+		if e != nil || jid == jobs.DuplicateJobID {
+			t.Error(e)
+		}
+	}
+
+	// Wait for all jobs to complete.
+	wg.Wait()
+
+	// Make sure that we executed the expected number of jobs.
+	if execCount != uint32(count) {
+		t.Fatalf("mismatch number of executions. Expected: %d Found: %d", count, execCount)
+	}
+}
+
 // TestDuplicateJobRejection tests that the backend rejects jobs that are duplicates
 func TestDuplicateJobRejection(t *testing.T) {
 	const queue = "testing"

From 2755e68b8a9dcd69196c1763e1c33188102bd433 Mon Sep 17 00:00:00 2001
From: Elliot Courant
Date: Wed, 27 Sep 2023 19:00:00 -0500
Subject: [PATCH 2/4] fix(ci): Fixed GitHub Actions tests on PRs

---
 .github/workflows/test.yml | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index d8fdd5b..ae1f325 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,5 +1,11 @@
 name: test
-on: [push]
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
 
 concurrency:
   group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'

From f59d0f742308168b2998625334e84b746b31df70 Mon Sep 17 00:00:00 2001
From: Adriano Caloiaro
Date: Fri, 29 Sep 2023 11:15:34 -0600
Subject: [PATCH 3/4] feat: Announce jobs using PG trigger

Previously, new, non-future jobs were announced by executing `NOTIFY` in Go
code. Triggers are much better suited for this, and reduce neoq complexity by
allowing PG to perform the notification work for most jobs.

"Future" jobs continue to use the `announceJob` method on a timer.
---
 ...30928141356_create-new-job-trigger.down.sql |  1 +
 ...0230928141356_create-new-job-trigger.up.sql | 12 ++++++++++++
 backends/postgres/postgres_backend.go          | 18 +++++-------------
 3 files changed, 18 insertions(+), 13 deletions(-)
 create mode 100644 backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql
 create mode 100644 backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql

diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql
new file mode 100644
index 0000000..c28d4b1
--- /dev/null
+++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql
@@ -0,0 +1 @@
+DROP TRIGGER IF EXISTS announce_job ON neoq_jobs CASCADE;
\ No newline at end of file
diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql
new file mode 100644
index 0000000..865d442
--- /dev/null
+++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql
@@ -0,0 +1,12 @@
+CREATE OR REPLACE FUNCTION announce_job() RETURNS trigger AS $$
+DECLARE
+BEGIN
+    PERFORM pg_notify(CAST(NEW.queue AS text), CAST(NEW.id AS text));
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER announce_job
+    AFTER INSERT ON neoq_jobs FOR EACH ROW
+    WHEN (NEW.run_after <= timezone('utc', NEW.created_at))
+    EXECUTE PROCEDURE announce_job();
\ No newline at end of file
diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index 1d76b71..cb1fc22 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -271,15 +271,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
 	// Rollback is safe to call even if the tx is already closed, so if
 	// the tx commits successfully, this is a no-op
 	defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed
-
-	// Make sure RunAfter is set to a non-zero value if not provided by the caller
-	// if already set, schedule the future job
-	now := time.Now().UTC()
-	if job.RunAfter.IsZero() {
-		p.logger.Debug("RunAfter not set, job will run immediately after being enqueued")
-		job.RunAfter = now
-	}
-
 	jobID, err = p.enqueueJob(ctx, tx, job)
 	if err != nil {
 		var pgErr *pgconn.PgError
@@ -300,10 +291,8 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
 	}
 	p.logger.Debug("job added to queue:", "job_id", jobID)
 
-	// notify listeners that a new job has arrived if it's not a future job
-	if job.RunAfter.Equal(now) {
-		p.announceJob(ctx, job.Queue, jobID)
-	} else {
+	// add future jobs to the future job list
+	if job.RunAfter.After(time.Now().UTC()) {
 		p.mu.Lock()
 		p.futureJobs[jobID] = job.RunAfter
 		p.mu.Unlock()
@@ -607,6 +596,9 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {
 
 // announceJob announces jobs to queue listeners.
 //
+// When jobs are inserted into the neoq_jobs table, a trigger announces the new job's arrival. This function is to be
+// used for announcing jobs that have not been recently inserted into the neoq_jobs table.
+//
 // Announced jobs are executed by the first worker to respond to the announcement.
 func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
 	conn, err := p.pool.Acquire(ctx)

From 820370e06dbe4533cc0bad216fe2c0c4666dda87 Mon Sep 17 00:00:00 2001
From: Adriano Caloiaro
Date: Fri, 29 Sep 2023 11:14:16 -0600
Subject: [PATCH 4/4] feat: Default to disable synchronous postgres commit

---
 backends/postgres/postgres_backend.go | 22 ++++++++++++++++++++++
 neoq.go                               |  3 ++-
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index cb1fc22..7889ea5 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -149,6 +149,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
 			// there is no limit to the amount of time a worker's transactions may be idle
 			query = setIdleInTxSessionTimeout
 		}
+
+		if !p.config.SynchronousCommit {
+			query = fmt.Sprintf("%s; SET synchronous_commit = 'off';", query)
+		}
 		_, err = conn.Exec(ctx, query)
 		return
 	}
@@ -184,6 +188,24 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption {
 	}
 }
 
+// WithSynchronousCommit enables or disables the postgres parameter `synchronous_commit`.
+//
+// By default, neoq runs with synchronous_commit disabled.
+//
+// Postgres incurs significant transactional overhead from synchronously committing small transactions. Because
+// neoq jobs must be enqueued individually, and payloads are generally quite small, synchronous_commit introduces
+// significant overhead, but increases data durability.
+//
+// See https://www.postgresql.org/docs/current/wal-async-commit.html for details on the implications that this has for
+// neoq jobs.
+//
+// Enabling synchronous commit results in an order of magnitude slowdown in enqueueing and processing jobs.
+func WithSynchronousCommit(enabled bool) neoq.ConfigOption {
+	return func(c *neoq.Config) {
+		c.SynchronousCommit = enabled
+	}
+}
+
 // txFromContext gets the transaction from a context, if the transaction is already set
 func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
 	var ok bool
diff --git a/neoq.go b/neoq.go
index b79397c..3749506 100644
--- a/neoq.go
+++ b/neoq.go
@@ -29,13 +29,14 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified")
 
 // per-handler basis.
type Config struct { BackendInitializer BackendInitializer - BackendAuthPassword string // password with which to authenticate to the backend's data provider + BackendAuthPassword string // password with which to authenticate to the backend BackendConcurrency int // total number of backend processes available to process jobs ConnectionString string // a string containing connection details for the backend JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown + SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance) LogLevel logging.LogLevel // the log level of the default logger }
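
Usage note (not part of the patch series): a minimal sketch of how a caller might opt back into synchronous
commits with the option added in PATCH 4/4. The import paths and connection string are assumptions for
illustration; neoq.New, postgres.Backend, and postgres.WithConnectionString mirror the test added in
PATCH 1/4, and postgres.WithSynchronousCommit is the option introduced above.

package main

import (
	"context"
	"log"

	"github.com/acaloiaro/neoq"                   // assumed module path
	"github.com/acaloiaro/neoq/backends/postgres" // assumed module path
)

func main() {
	ctx := context.Background()

	// Placeholder connection string; substitute a real database URL.
	connString := "postgres://postgres:postgres@localhost:5432/neoq"

	// Passing true re-enables synchronous_commit for maximum durability.
	// Omitting the option keeps the new default (synchronous_commit = 'off').
	nq, err := neoq.New(ctx,
		neoq.WithBackend(postgres.Backend),
		postgres.WithConnectionString(connString),
		postgres.WithSynchronousCommit(true),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nq.Shutdown(ctx)
}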