From 0105403be94df2809804fe99302ccf2f0859f202 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Tue, 16 Apr 2024 13:39:15 -0600 Subject: [PATCH] feat: add support for recovery callbacks --- backends/memory/memory_backend.go | 3 + backends/postgres/postgres_backend.go | 2 + backends/postgres/postgres_backend_test.go | 58 ++++++++++++++++++ backends/redis/redis_backend.go | 3 + backends/redis/redis_backend_test.go | 67 +++++++++++++++++++++ gomod2nix.toml | 8 +-- handler/handler.go | 69 +++++++++++++++------- neoq.go | 31 ++++++---- neoq_test.go | 54 +++++++++++++++++ 9 files changed, 261 insertions(+), 34 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 0528cb1..3a074db 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -135,6 +135,8 @@ func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) { queueCapacity = defaultMemQueueCapacity } + h.RecoverCallback = m.config.RecoveryCallback + m.handlers.Store(h.Queue, h) m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity)) @@ -173,6 +175,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H m.cancelFuncs = append(m.cancelFuncs, cancel) m.mu.Unlock() h.Queue = queue + h.RecoverCallback = m.config.RecoveryCallback err = m.Start(ctx, h) if err != nil { diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 36774dd..c8bdbc5 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -434,6 +434,7 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { p.logger.Debug("starting job processing", slog.String("queue", h.Queue)) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) + h.RecoverCallback = p.config.RecoveryCallback p.handlers[h.Queue] = h p.mu.Unlock() @@ -475,6 +476,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) h.Queue = queue + h.RecoverCallback = p.config.RecoveryCallback ctx, cancel := context.WithCancel(ctx) p.mu.Lock() diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 9a8ee48..2198f16 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -1074,3 +1074,61 @@ func TestJobWithPastDeadline(t *testing.T) { t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status) } } + +func TestHandlerRecoveryCallback(t *testing.T) { + connString, _ := prepareAndCleanupDB(t) + const queue = "testing" + timeoutTimer := time.After(5 * time.Second) + recoveryFuncCalled := make(chan bool, 1) + defer close(recoveryFuncCalled) + + ctx := context.Background() + nq, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) { + recoveryFuncCalled <- true + return + })) + + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(queue, func(ctx context.Context) (err error) { + panic("abort mission!") + }) + h.WithOptions( + handler.JobTimeout(500*time.Millisecond), + handler.Concurrency(1), + ) + + // process jobs on the test queue + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + jid, err := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + if err != nil || jid == jobs.DuplicateJobID { + t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err) + } + + select { + case <-timeoutTimer: + err = errors.New("timed out waiting for job") // nolint: goerr113 + return + case <-recoveryFuncCalled: + break + } + + if err != nil { + t.Error(err) + } +} diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index 2a45c56..a463673 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -206,6 +206,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string // Start starts processing jobs with the specified queue and handler func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { + h.RecoverCallback = b.config.RecoveryCallback + b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) { taskID := t.ResultWriter().TaskID() var p map[string]any @@ -268,6 +270,7 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) h.Queue = queue + h.RecoverCallback = b.config.RecoveryCallback err = b.Start(ctx, h) if err != nil { diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index aefdb9c..c069ee6 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -441,3 +441,70 @@ result_loop: t.Error(err) } } + +func TestHandlerRecoveryCallback(t *testing.T) { + const queue = "testing" + timeoutTimer := time.After(5 * time.Second) + recoveryFuncCalled := make(chan bool, 1) + defer close(recoveryFuncCalled) + ctx := context.Background() + + connString := os.Getenv("TEST_REDIS_URL") + if connString == "" { + t.Skip("Skipping: TEST_REDIS_URL not set") + return + } + + password := os.Getenv("REDIS_PASSWORD") + nq, err := neoq.New( + ctx, + neoq.WithBackend(Backend), + neoq.WithLogLevel(logging.LogLevelDebug), + WithAddr(connString), + WithPassword(password), + neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) { + recoveryFuncCalled <- true + return + }), + ) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(queue, func(ctx context.Context) (err error) { + panic("abort mission!") + }) + h.WithOptions( + handler.JobTimeout(500*time.Millisecond), + handler.Concurrency(1), + ) + + // process jobs on the test queue + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + jid, err := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": fmt.Sprintf("hello world %d", internal.RandInt(10000000)), + }, + }) + if err != nil || jid == jobs.DuplicateJobID { + t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err) + } + + select { + case <-timeoutTimer: + err = errors.New("timed out waiting for job") // nolint: goerr113 + return + case <-recoveryFuncCalled: + break + } + + if err != nil { + t.Error(err) + } +} diff --git a/gomod2nix.toml b/gomod2nix.toml index f9311e8..b071390 100644 --- a/gomod2nix.toml +++ b/gomod2nix.toml @@ -44,11 +44,11 @@ schema = 3 version = "v0.0.0-20221227161230-091c0ba34f0a" hash = "sha256-rBtUw15WPPDp2eulHXH5e2zCIed1OPFYwlCpgDOnGRM=" [mod."github.com/jackc/pgx/v5"] - version = "v5.3.1" - hash = "sha256-0v6gXZIirv80mlnUx3ycxB2/TLvv3rUnm98Ke1ZjYDQ=" + version = "v5.5.4" + hash = "sha256-T4nYUbDDiyN7v6BRhEkPJ9slatzUMrEyoGAyjfK9syI=" [mod."github.com/jackc/puddle/v2"] - version = "v2.2.0" - hash = "sha256-S9Ldac+a4auQt99hToXZ/WSuUhcEk/A5aDgQAb48B8M=" + version = "v2.2.1" + hash = "sha256-Edf8SLT/8l+xfHm9IjUGxs1MHtic2VgRyfqb6OzGA9k=" [mod."github.com/jsuar/go-cron-descriptor"] version = "v0.1.0" hash = "sha256-zbADYCEzVcOlvemQa+Ly+6mRcCu3qsFxyeTd9jzZj38=" diff --git a/handler/handler.go b/handler/handler.go index 9e76ab2..d15c0b8 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" - "log" "runtime" "runtime/debug" "strings" "time" + + "golang.org/x/exp/slog" ) const ( @@ -24,13 +25,23 @@ var ( // Func is a function that Handlers execute for every Job on a queue type Func func(ctx context.Context) error +// RecoveryCallback is a function to be called when fatal errors/panics occur in Handlers +type RecoveryCallback func(ctx context.Context, err error) (erro error) + +// DefaultRecoveryCallback is the function that gets called by default when handlers panic +func DefaultRecoveryCallback(_ context.Context, _ error) (err error) { + slog.Error("recovering from a panic in the job handler", slog.Any("stack", string(debug.Stack()))) + return nil +} + // Handler handles jobs on a queue type Handler struct { - Handle Func - Concurrency int - JobTimeout time.Duration - QueueCapacity int64 - Queue string + Handle Func + Concurrency int + JobTimeout time.Duration + QueueCapacity int64 + Queue string + RecoverCallback RecoveryCallback // function called when fatal handler errors occur } // Option is function that sets optional configuration for Handlers @@ -75,6 +86,13 @@ func Queue(queue string) Option { } } +// RecoverCallback configures the handler with a recovery function to be called when fatal errors occur in Handlers +func RecoverCallback(f RecoveryCallback) Option { + return func(h *Handler) { + h.RecoverCallback = f + } +} + // New creates new queue handlers for specific queues. This function is to be usued to create new Handlers for // non-periodic jobs (most jobs). Use [NewPeriodic] to initialize handlers for periodic jobs. func New(queue string, f Func, opts ...Option) (h Handler) { @@ -104,6 +122,24 @@ func NewPeriodic(f Func, opts ...Option) (h Handler) { return } +func errorFromPanic(x any) (err error) { + _, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself) + if ok && strings.Contains(file, "runtime/") { + // The panic came from the runtime, most likely due to incorrect + // map/slice usage. The parent frame should have the real trigger. + _, file, line, ok = runtime.Caller(2) //nolint: gomnd + } + + // Include the file and line number info in the error, if runtime.Caller returned ok. + if ok { + err = fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113 + } else { + err = fmt.Errorf("panic: %v", x) // nolint: goerr113 + } + + return +} + // Exec executes handler functions with a concrete timeout func Exec(ctx context.Context, handler Handler) (err error) { timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout) @@ -115,22 +151,15 @@ func Exec(ctx context.Context, handler Handler) (err error) { go func(ctx context.Context) { defer func() { if x := recover(); x != nil { - log.Printf("recovering from a panic in the job handler:\n%s", string(debug.Stack())) - _, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself) - if ok && strings.Contains(file, "runtime/") { - // The panic came from the runtime, most likely due to incorrect - // map/slice usage. The parent frame should have the real trigger. - _, file, line, ok = runtime.Caller(2) //nolint: gomnd - } - - // Include the file and line number info in the error, if runtime.Caller returned ok. - if ok { - errCh <- fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113 - } else { - errCh <- fmt.Errorf("panic: %v", x) // nolint: goerr113 + err = errorFromPanic(x) + errCh <- err + if handler.RecoverCallback != nil { + err = handler.RecoverCallback(ctx, err) + if err != nil { + slog.Error("handler recovery callback also failed while recovering from panic", slog.Any("error", err)) + } } } - done <- true }() diff --git a/neoq.go b/neoq.go index f1022f1..9e65646 100644 --- a/neoq.go +++ b/neoq.go @@ -29,16 +29,17 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified") // per-handler basis. type Config struct { BackendInitializer BackendInitializer - BackendAuthPassword string // password with which to authenticate to the backend - BackendConcurrency int // total number of backend processes available to process jobs - ConnectionString string // a string containing connection details for the backend - JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs - FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs - IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed - ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown - SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance) - LogLevel logging.LogLevel // the log level of the default logger - PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out + BackendAuthPassword string // password with which to authenticate to the backend + BackendConcurrency int // total number of backend processes available to process jobs + ConnectionString string // a string containing connection details for the backend + JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs + FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled + IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed + ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown + SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance) + LogLevel logging.LogLevel // the log level of the default logger + PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out + RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers excuted by the associated Neoq instance } // ConfigOption is a function that sets optional backend configuration @@ -49,6 +50,7 @@ func NewConfig() *Config { return &Config{ FutureJobWindow: DefaultFutureJobWindow, JobCheckInterval: DefaultJobCheckInterval, + RecoveryCallback: handler.DefaultRecoveryCallback, } } @@ -117,6 +119,15 @@ func WithBackend(initializer BackendInitializer) ConfigOption { } } +// WithRecoveryCallback configures neoq with a function to be called when fatal errors occur in job Handlers. +// +// Recovery callbacks are useful for reporting errors to error loggers and collecting error metrics +func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption { + return func(c *Config) { + c.RecoveryCallback = cb + } +} + // WithJobCheckInterval configures the duration of time between checking for future jobs func WithJobCheckInterval(interval time.Duration) ConfigOption { return func(c *Config) { diff --git a/neoq_test.go b/neoq_test.go index ebe6e44..e4a8996 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -244,3 +244,57 @@ results_loop: t.Error(err) } } + +func TestHandlerRecoveryCallback(t *testing.T) { + const queue = "testing" + timeoutTimer := time.After(5 * time.Second) + recoveryFuncCalled := make(chan bool, 1) + + ctx := context.Background() + nq, err := neoq.New(ctx, + neoq.WithBackend(memory.Backend), + neoq.WithRecoveryCallback(func(_ context.Context, _ error) (err error) { + recoveryFuncCalled <- true + return + })) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(queue, func(ctx context.Context) (err error) { + panic("abort mission!") + }) + h.WithOptions( + handler.JobTimeout(500*time.Millisecond), + handler.Concurrency(1), + ) + + // process jobs on the test queue + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + jid, err := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + }) + if err != nil || jid == jobs.DuplicateJobID { + t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err) + } + + select { + case <-timeoutTimer: + err = errors.New("timed out waiting for job") // nolint: goerr113 + return + case <-recoveryFuncCalled: + break + } + + if err != nil { + t.Error(err) + } +}