diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql new file mode 100644 index 0000000..c28d4b1 --- /dev/null +++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.down.sql @@ -0,0 +1 @@ +DROP TRIGGER IF EXISTS announce_job ON neoq_jobs CASCADE; \ No newline at end of file diff --git a/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql new file mode 100644 index 0000000..865d442 --- /dev/null +++ b/backends/postgres/migrations/20230928141356_create-new-job-trigger.up.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE FUNCTION announce_job() RETURNS trigger AS $$ +DECLARE +BEGIN + PERFORM pg_notify(CAST(NEW.queue AS text), CAST(NEW.id AS text)); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER announce_job + AFTER INSERT ON neoq_jobs FOR EACH ROW + WHEN (NEW.run_after <= timezone('utc', NEW.created_at)) + EXECUTE PROCEDURE announce_job(); \ No newline at end of file diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 1d76b71..7889ea5 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -149,6 +149,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err // there is no limit to the amount of time a worker's transactions may be idle query = setIdleInTxSessionTimeout } + + if !p.config.SynchronousCommit { + query = fmt.Sprintf("%s; SET synchronous_commit = 'off';", query) + } _, err = conn.Exec(ctx, query) return } @@ -184,6 +188,24 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption { } } +// WithSynchronousCommit enables postgres parameter `synchronous_commit`. +// +// By default, neoq runs with synchronous_commit disabled. +// +// Postgres incurrs significant transactional overhead from synchronously committing small transactions. Because +// neoq jobs must be enqueued individually, and payloads are generally quite small, synchronous_commit introduces +// significant overhead, but increases data durability. +// +// See https://www.postgresql.org/docs/current/wal-async-commit.html for details on the implications that this has for +// neoq jobs. +// +// Enabling synchronous commit results in an order of magnitude slowdown in enqueueing and processing jobs. +func WithSynchronousCommit(enabled bool) neoq.ConfigOption { + return func(c *neoq.Config) { + c.SynchronousCommit = enabled + } +} + // txFromContext gets the transaction from a context, if the transaction is already set func txFromContext(ctx context.Context) (t pgx.Tx, err error) { var ok bool @@ -271,15 +293,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e // Rollback is safe to call even if the tx is already closed, so if // the tx commits successfully, this is a no-op defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed - - // Make sure RunAfter is set to a non-zero value if not provided by the caller - // if already set, schedule the future job - now := time.Now().UTC() - if job.RunAfter.IsZero() { - p.logger.Debug("RunAfter not set, job will run immediately after being enqueued") - job.RunAfter = now - } - jobID, err = p.enqueueJob(ctx, tx, job) if err != nil { var pgErr *pgconn.PgError @@ -300,10 +313,8 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } p.logger.Debug("job added to queue:", "job_id", jobID) - // notify listeners that a new job has arrived if it's not a future job - if job.RunAfter.Equal(now) { - p.announceJob(ctx, job.Queue, jobID) - } else { + // add future jobs to the future job list + if job.RunAfter.After(time.Now().UTC()) { p.mu.Lock() p.futureJobs[jobID] = job.RunAfter p.mu.Unlock() @@ -607,6 +618,9 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { // announceJob announces jobs to queue listeners. // +// When jobs are inserted into the neoq_jobs table, a trigger announces the new job's arrival. This function is to be +// used for announcing jobs that have not been recently inserted into the neoq_jobs table. +// // Announced jobs are executed by the first worker to respond to the announcement. func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { conn, err := p.pool.Acquire(ctx) diff --git a/neoq.go b/neoq.go index b79397c..3749506 100644 --- a/neoq.go +++ b/neoq.go @@ -29,13 +29,14 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified") // per-handler basis. type Config struct { BackendInitializer BackendInitializer - BackendAuthPassword string // password with which to authenticate to the backend's data provider + BackendAuthPassword string // password with which to authenticate to the backend BackendConcurrency int // total number of backend processes available to process jobs ConnectionString string // a string containing connection details for the backend JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown + SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance) LogLevel logging.LogLevel // the log level of the default logger }