Skip to content

Commit

Permalink
feat: Hold connections and transactions for less time
Browse files Browse the repository at this point in the history
Previously, the pg backend would hold connections/transactions during the
execution of job handlers. So, e.g., if a job ran for 30 seconds, a
transaction and its connection would be held for the entirety of those
30 seconds.

It was originally implemented this way because the initial vision was
to pass every job a `tx` that could be used throughout the job. If the
job failed, its `tx` would be rolled back in neoq so that users would
not have to handle rollbacks and connection handling themselves, were
they to perform database operations in their jobs.
But ultimately, that is a lot of hand-holding at the cost of a lot of
resources, for a use case that is both unlikely and does not work as
soon as the user's application and neoq tables are in different
databases.

This commit reverses that poor decision and opts to improve performance
over giving users an ergonomic transaction handling API.
  • Loading branch information
acaloiaro committed Sep 2, 2023
1 parent 6ea1bb6 commit bc8df98
Showing 1 changed file with 39 additions and 55 deletions.
94 changes: 39 additions & 55 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ const (
setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0`
)

type contextKey struct{}

var (
txCtxVarKey contextKey
shutdownJobID = "-1" // job ID announced when triggering a shutdown
shutdownAnnouncementAllowance = 100 // ms
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
Expand Down Expand Up @@ -184,18 +181,6 @@ func WithTransactionTimeout(txTimeout int) config.Option {
}
}

// txFromContext gets the transaction from a context, if the transaction is already set
func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
var ok bool
if t, ok = ctx.Value(txCtxVarKey).(pgx.Tx); ok {
return
}

err = ErrNoTransactionInContext

return
}

// initializeDB initializes the tables, types, and indices necessary to operate Neoq
//
//nolint:funlen,gocyclo,cyclop
Expand Down Expand Up @@ -453,6 +438,7 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job,
// processed at least one more time.
// nolint: cyclop
func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
var tx pgx.Tx
status := internal.JobStatusProcessed
errMsg := ""

Expand All @@ -467,13 +453,22 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
return fmt.Errorf("error getting job from context: %w", err)
}

var tx pgx.Tx
if tx, err = txFromContext(ctx); err != nil {
return fmt.Errorf("error getting tx from context: %w", err)
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to update job", "error", err)
return
}
defer conn.Release()

tx, err = conn.Begin(ctx)
if err != nil {
return
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

if job.Retries >= job.MaxRetries {
err = p.moveToDeadQueue(ctx, tx, job, jobErr)
err = tx.Commit(ctx)
return
}

Expand All @@ -486,11 +481,17 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4"
_, err = tx.Exec(ctx, qstr, time.Now().UTC(), errMsg, status, job.ID)
}

if err != nil {
return
}

err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "error", err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
}

if time.Until(runAfter) > 0 {
p.mu.Lock()
p.futureJobs[fmt.Sprint(job.ID)] = runAfter
Expand Down Expand Up @@ -642,18 +643,9 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {

func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) {
jobsCh = make(chan string)

conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", err)
return
}

go func(ctx context.Context) {
defer conn.Release()

for {
jobID, err := p.getPendingJobID(ctx, conn, queue)
jobID, err := p.getPendingJobID(ctx, queue)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) || errors.Is(err, context.Canceled) {
break
Expand All @@ -675,23 +667,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// 2. handleJob secondly calls the handler on the job, and finally updates the job's status
func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) {
var job *jobs.Job
var tx pgx.Tx
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
}
defer conn.Release()

tx, err = conn.Begin(ctx)
job, err = p.getPendingJob(ctx, jobID)
if err != nil {
return
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

job, err = p.getPendingJob(ctx, tx, jobID)
if err != nil {
return
}
ctx = withJobContext(ctx, job)

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
Expand All @@ -700,9 +681,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
return
}

ctx = withJobContext(ctx, job)
ctx = context.WithValue(ctx, txCtxVarKey, tx)

// check if the job is being retried and increment retry count accordingly
if job.Status != internal.JobStatusNew {
job.Retries++
Expand All @@ -720,13 +698,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
return err
}

err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "error", err, "job_id", job.ID)
return fmt.Errorf("%s %w", errMsg, err)
}

return nil
}

Expand Down Expand Up @@ -794,8 +765,14 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
conn.Release()
}

func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
row, err := tx.Query(ctx, PendingJobQuery, jobID)
func (p *PgBackend) getPendingJob(ctx context.Context, jobID string) (job *jobs.Job, err error) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
return
}
defer conn.Release()

row, err := conn.Query(ctx, PendingJobQuery, jobID)
if err != nil {
return
}
Expand All @@ -808,7 +785,14 @@ func (p *PgBackend) getPendingJob(ctx context.Context, tx pgx.Tx, jobID string)
return
}

func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, queue string) (jobID string, err error) {
func (p *PgBackend) getPendingJobID(ctx context.Context, queue string) (jobID string, err error) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", err)
return
}
defer conn.Release()

err = conn.QueryRow(ctx, PendingJobIDQuery, queue).Scan(&jobID)
return
}
Expand Down

0 comments on commit bc8df98

Please sign in to comment.