Skip to content

Commit

Permalink
Make handler context vars backend agnostic (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro committed Mar 17, 2023
1 parent 784ea57 commit 07a336a
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 73 deletions.
11 changes: 7 additions & 4 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package memory

import (
"context"
"errors"
"fmt"
"os"
"sync"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/guregu/null"
"github.com/iancoleman/strcase" // TODO factor out
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor" // TODO factor out
"github.com/pkg/errors"
"github.com/robfig/cron"
"golang.org/x/exp/slog"
)
Expand Down Expand Up @@ -226,6 +226,10 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
}

if err != nil {
if errors.Is(err, context.Canceled) {
return
}

m.logger.Error("job failed", err, "job_id", job.ID)
runAfter := internal.CalculateBackoff(job.Retries)
job.RunAfter = runAfter
Expand Down Expand Up @@ -277,16 +281,15 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context, queue string) {
}

func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) {
ctxv := handler.CtxVars{Job: job}
hctx := handler.WithContext(ctx, ctxv)
ctx = handler.WithJobContext(ctx, job)

// check if the job is being retried and increment retry count accordingly
if job.Status != internal.JobStatusNew {
job.Retries++
}

// execute the queue handler of this job
err = handler.Exec(hctx, h)
err = handler.Exec(ctx, h)
if err != nil {
job.Error = null.StringFrom(err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestBasicJobProcessing(t *testing.T) {
for {
select {
case <-timeoutTimer:
err = errors.New("timed out waiting for job(s)")
err = jobs.ErrJobTimeout
case <-done:
doneCnt++
}
Expand Down
30 changes: 17 additions & 13 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ const (
setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0`
)

type contextKey struct{}

var (
txCtxVarKey contextKey
ErrCnxString = errors.New("invalid connecton string: see documentation for valid connection strings")
ErrDuplicateJobID = errors.New("duplicate job id")
ErrNoQueue = errors.New("no queue specified")
Expand Down Expand Up @@ -168,16 +171,14 @@ func WithTransactionTimeout(txTimeout int) config.Option {

// txFromContext gets the transaction from a context, if the transaction is already set
func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
if v, ok := ctx.Value(handler.CtxVarsKey).(handler.CtxVars); ok {
var tx pgx.Tx
var ok bool
if tx, ok = v.Tx.(pgx.Tx); !ok {
return nil, ErrNoTransactionInContext
}
return tx, nil
var ok bool
if t, ok = ctx.Value(txCtxVarKey).(pgx.Tx); ok {
return
}

return nil, ErrNoTransactionInContext
err = ErrNoTransactionInContext

return
}

// initializeDB initializes the tables, types, and indices necessary to operate Neoq
Expand Down Expand Up @@ -538,7 +539,11 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.ID)
}

if err == nil && time.Until(runAfter) > 0 {
if err != nil {
return
}

if time.Until(runAfter) > 0 {
p.mu.Lock()
p.futureJobs[job.ID] = runAfter
p.mu.Unlock()
Expand Down Expand Up @@ -733,6 +738,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed.
// 2. handleJob secondly calls the handler on the job, and finally updates the job's status
func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handler) (err error) {
var job *jobs.Job
var tx pgx.Tx
conn, err := p.pool.Acquire(ctx)
if err != nil {
Expand All @@ -746,15 +752,13 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID int64, h handler.Handle
}
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed

ctxv := handler.CtxVars{Tx: tx}
var job *jobs.Job
job, err = p.getPendingJob(ctx, tx, jobID)
if err != nil {
return
}

ctxv.Job = job
ctx = handler.WithContext(ctx, ctxv)
ctx = handler.WithJobContext(ctx, job)
ctx = context.WithValue(ctx, txCtxVarKey, tx)

// check if the job is being retried and increment retry count accordingly
if job.Status != internal.JobStatusNew {
Expand Down
54 changes: 19 additions & 35 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package postgres_test

import (
"context"
"fmt"
"errors"
"os"
"testing"
"time"
Expand All @@ -12,8 +12,6 @@ import (
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
"github.com/pkg/errors"
"golang.org/x/exp/slog"
)

var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
Expand All @@ -22,9 +20,9 @@ var errPeriodicTimeout = errors.New("timed out waiting for periodic job")
// most basic configuration.
func TestBasicJobProcessing(t *testing.T) {
queue := "testing"
numJobs := 10
doneCnt := 0
done := make(chan bool)
defer close(done)

var timeoutTimer = time.After(5 * time.Second)

var connString = os.Getenv("TEST_DATABASE_URL")
Expand All @@ -50,35 +48,20 @@ func TestBasicJobProcessing(t *testing.T) {
t.Error(err)
}

go func() {
for i := 0; i < numJobs; i++ {
jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": fmt.Sprintf("hello world: %d", i),
},
})
if e != nil || jid == jobs.DuplicateJobID {
slog.Error("job was not enqueued. either it was duplicate or this error caused it:", e)
}
}
}()

for {
select {
case <-timeoutTimer:
err = errors.New("timed out waiting for job(s)")
case <-done:
doneCnt++
}

if doneCnt >= numJobs {
break
}

if err != nil {
break
}
jid, e := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
})
if e != nil || jid == jobs.DuplicateJobID {
t.Error(e)
}

select {
case <-timeoutTimer:
err = jobs.ErrJobTimeout
case <-done:
}

if err != nil {
Expand All @@ -87,6 +70,8 @@ func TestBasicJobProcessing(t *testing.T) {
}

func TestCron(t *testing.T) {
done := make(chan bool, 1)
defer close(done)
const cron = "* * * * * *"
var connString = os.Getenv("TEST_DATABASE_URL")
if connString == "" {
Expand All @@ -101,7 +86,6 @@ func TestCron(t *testing.T) {
}
defer nq.Shutdown(ctx)

var done = make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
done <- true
return
Expand Down
55 changes: 55 additions & 0 deletions examples/add_postgres_job/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"context"
"log"
"time"

"github.com/acaloiaro/neoq"
"github.com/acaloiaro/neoq/backends/postgres"
"github.com/acaloiaro/neoq/config"
"github.com/acaloiaro/neoq/handler"
"github.com/acaloiaro/neoq/jobs"
)

func main() {
var done = make(chan bool, 1)
const queue = "foobar"
ctx := context.Background()
nq, err := neoq.New(ctx,
config.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
neoq.WithBackend(postgres.Backend),
)
if err != nil {
log.Fatalf("error initializing postgres backend: %v", err)
}
defer nq.Shutdown(ctx)

h := handler.New(func(ctx context.Context) (err error) {
var j *jobs.Job
time.Sleep(1 * time.Second)
j, err = handler.JobFromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
done <- true
return
})

err = nq.Start(ctx, queue, h)
if err != nil {
log.Println("error listening to queue", err)
}

// Add a job that will execute 1 hour from now
jobID, err := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello, world",
},
})
if err != nil {
log.Fatalf("error adding job: %v", err)
}

log.Println("added job:", jobID)
<-done
}
30 changes: 10 additions & 20 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ const (
type contextKey struct{}

var (
CtxVarsKey contextKey
ErrContextHasNoJob = errors.New("context has no Job")
ErrNoHandlerForQueue = errors.New("no handler for queue")
// TODO this error is here because cyclic imports with neoq
JobCtxVarKey contextKey
ErrContextHasNoJob = errors.New("context has no Job")
ErrNoHandlerForQueue = errors.New("no handler for queue")
ErrNoProcessorForQueue = errors.New("no processor configured for queue")
)

Expand All @@ -35,16 +34,6 @@ type Handler struct {
QueueCapacity int64
}

// CtxVars are variables passed to every Handler context
type CtxVars struct {
Job *jobs.Job
// this is a bit hacky. Tx here contains a pgx.Tx for PgBackend, but because we're in the handlers package, and we don't
// want all neoq users to have pgx as a transitive dependency, we store Tx as any, and coerce it to a pgx.Tx inside
// the postgres backend
// TODO redesign HandlerCtxVars so it doesn't need to include a pgx.Tx
Tx any
}

// Option is function that sets optional configuration for Handlers
type Option func(w *Handler)

Expand Down Expand Up @@ -101,9 +90,9 @@ func New(f Func, opts ...Option) (h Handler) {
return
}

// WithContext creates a new context with the job and transaction set
func WithContext(ctx context.Context, v CtxVars) context.Context {
return context.WithValue(ctx, CtxVarsKey, v)
// WithJobContext creates a new context with the Job set
func WithJobContext(ctx context.Context, j *jobs.Job) context.Context {
return context.WithValue(ctx, JobCtxVarKey, j)
}

// Exec executes handler functions with a concrete time deadline
Expand Down Expand Up @@ -140,9 +129,10 @@ func Exec(ctx context.Context, handler Handler) (err error) {
}

// JobFromContext fetches the job from a context if the job context variable is already set
func JobFromContext(ctx context.Context) (*jobs.Job, error) {
if v, ok := ctx.Value(CtxVarsKey).(CtxVars); ok {
return v.Job, nil
func JobFromContext(ctx context.Context) (j *jobs.Job, err error) {
var ok bool
if j, ok = ctx.Value(JobCtxVarKey).(*jobs.Job); ok {
return
}

return nil, ErrContextHasNoJob
Expand Down
5 changes: 5 additions & 0 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ package jobs
import (
"crypto/md5" // nolint: gosec
"encoding/json"
"errors"
"fmt"
"io"
"time"

"github.com/guregu/null"
)

var (
ErrJobTimeout = errors.New("timed out waiting for job(s)")
)

const (
DuplicateJobID = -1
UnqueuedJobID = -2
Expand Down

0 comments on commit 07a336a

Please sign in to comment.