Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development #84

Merged
merged 5 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TRIGGER IF EXISTS announce_job ON neoq_jobs CASCADE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE OR REPLACE FUNCTION announce_job() RETURNS trigger AS $$
DECLARE
BEGIN
PERFORM pg_notify(CAST(NEW.queue AS text), CAST(NEW.id AS text));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER announce_job
AFTER INSERT ON neoq_jobs FOR EACH ROW
WHEN (NEW.run_after <= timezone('utc', NEW.created_at))
EXECUTE PROCEDURE announce_job();
40 changes: 27 additions & 13 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
// there is no limit to the amount of time a worker's transactions may be idle
query = setIdleInTxSessionTimeout
}

if !p.config.SynchronousCommit {
query = fmt.Sprintf("%s; SET synchronous_commit = 'off';", query)
}
_, err = conn.Exec(ctx, query)
return
}
Expand Down Expand Up @@ -184,6 +188,24 @@ func WithTransactionTimeout(txTimeout int) neoq.ConfigOption {
}
}

// WithSynchronousCommit enables postgres parameter `synchronous_commit`.
//
// By default, neoq runs with synchronous_commit disabled.
//
// Postgres incurrs significant transactional overhead from synchronously committing small transactions. Because
// neoq jobs must be enqueued individually, and payloads are generally quite small, synchronous_commit introduces
// significant overhead, but increases data durability.
//
// See https://www.postgresql.org/docs/current/wal-async-commit.html for details on the implications that this has for
// neoq jobs.
//
// Enabling synchronous commit results in an order of magnitude slowdown in enqueueing and processing jobs.
func WithSynchronousCommit(enabled bool) neoq.ConfigOption {
return func(c *neoq.Config) {
c.SynchronousCommit = enabled
}
}

// txFromContext gets the transaction from a context, if the transaction is already set
func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
var ok bool
Expand Down Expand Up @@ -271,15 +293,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

// Make sure RunAfter is set to a non-zero value if not provided by the caller
// if already set, schedule the future job
now := time.Now().UTC()
if job.RunAfter.IsZero() {
p.logger.Debug("RunAfter not set, job will run immediately after being enqueued")
job.RunAfter = now
}

jobID, err = p.enqueueJob(ctx, tx, job)
if err != nil {
var pgErr *pgconn.PgError
Expand All @@ -300,10 +313,8 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}
p.logger.Debug("job added to queue:", "job_id", jobID)

// notify listeners that a new job has arrived if it's not a future job
if job.RunAfter.Equal(now) {
p.announceJob(ctx, job.Queue, jobID)
} else {
// add future jobs to the future job list
if job.RunAfter.After(time.Now().UTC()) {
p.mu.Lock()
p.futureJobs[jobID] = job.RunAfter
p.mu.Unlock()
Expand Down Expand Up @@ -607,6 +618,9 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {

// announceJob announces jobs to queue listeners.
//
// When jobs are inserted into the neoq_jobs table, a trigger announces the new job's arrival. This function is to be
// used for announcing jobs that have not been recently inserted into the neoq_jobs table.
//
// Announced jobs are executed by the first worker to respond to the announcement.
func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
conn, err := p.pool.Acquire(ctx)
Expand Down
3 changes: 2 additions & 1 deletion neoq.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified")
// per-handler basis.
type Config struct {
BackendInitializer BackendInitializer
BackendAuthPassword string // password with which to authenticate to the backend's data provider
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs
IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
}

Expand Down