From 820370e06dbe4533cc0bad216fe2c0c4666dda87 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Fri, 29 Sep 2023 11:14:16 -0600 Subject: [PATCH] feat: Default to disable synchronous postgres commit --- backends/postgres/postgres_backend.go | 22 ++++++++++++++++++++++ neoq.go | 3 ++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index cb1fc22..7889ea5 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -149,6 +149,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err // there is no limit to the amount of time a worker's transactions may be idle query = setIdleInTxSessionTimeout } + + if !p.config.SynchronousCommit { + query = fmt.Sprintf("%s; SET synchronous_commit = 'off';", query) + } _, err = conn.Exec(ctx, query) return } @@ -184,6 +188,24 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption { } } +// WithSynchronousCommit enables postgres parameter `synchronous_commit`. +// +// By default, neoq runs with synchronous_commit disabled. +// +// Postgres incurrs significant transactional overhead from synchronously committing small transactions. Because +// neoq jobs must be enqueued individually, and payloads are generally quite small, synchronous_commit introduces +// significant overhead, but increases data durability. +// +// See https://www.postgresql.org/docs/current/wal-async-commit.html for details on the implications that this has for +// neoq jobs. +// +// Enabling synchronous commit results in an order of magnitude slowdown in enqueueing and processing jobs. +func WithSynchronousCommit(enabled bool) neoq.ConfigOption { + return func(c *neoq.Config) { + c.SynchronousCommit = enabled + } +} + // txFromContext gets the transaction from a context, if the transaction is already set func txFromContext(ctx context.Context) (t pgx.Tx, err error) { var ok bool diff --git a/neoq.go b/neoq.go index b79397c..3749506 100644 --- a/neoq.go +++ b/neoq.go @@ -29,13 +29,14 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified") // per-handler basis. type Config struct { BackendInitializer BackendInitializer - BackendAuthPassword string // password with which to authenticate to the backend's data provider + BackendAuthPassword string // password with which to authenticate to the backend BackendConcurrency int // total number of backend processes available to process jobs ConnectionString string // a string containing connection details for the backend JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown + SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance) LogLevel logging.LogLevel // the log level of the default logger }