feat: add support for recovery callbacks #127

Merged · merged 1 commit · Apr 16, 2024
3 changes: 3 additions & 0 deletions backends/memory/memory_backend.go
@@ -135,6 +135,8 @@ func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity = defaultMemQueueCapacity
}

h.RecoverCallback = m.config.RecoveryCallback

m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

@@ -173,6 +175,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue
h.RecoverCallback = m.config.RecoveryCallback

err = m.Start(ctx, h)
if err != nil {
2 changes: 2 additions & 0 deletions backends/postgres/postgres_backend.go
@@ -434,6 +434,7 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
p.logger.Debug("starting job processing", slog.String("queue", h.Queue))
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
h.RecoverCallback = p.config.RecoveryCallback
p.handlers[h.Queue] = h
p.mu.Unlock()

@@ -475,6 +476,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
h.Queue = queue
h.RecoverCallback = p.config.RecoveryCallback

ctx, cancel := context.WithCancel(ctx)
p.mu.Lock()
58 changes: 58 additions & 0 deletions backends/postgres/postgres_backend_test.go
@@ -1074,3 +1074,61 @@ func TestJobWithPastDeadline(t *testing.T) {
t.Errorf("job should have resulted in a status of 'failed', but its status is %s", status)
}
}

func TestHandlerRecoveryCallback(t *testing.T) {
connString, _ := prepareAndCleanupDB(t)
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
recoveryFuncCalled := make(chan bool, 1)
defer close(recoveryFuncCalled)

ctx := context.Background()
nq, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString(connString),
neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) {
recoveryFuncCalled <- true
return
}))

if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(queue, func(ctx context.Context) (err error) {
panic("abort mission!")
})
h.WithOptions(
handler.JobTimeout(500*time.Millisecond),
handler.Concurrency(1),
)

// process jobs on the test queue
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

jid, err := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": "hello world",
},
})
if err != nil || jid == jobs.DuplicateJobID {
t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err)
}

select {
case <-timeoutTimer:
err = errors.New("timed out waiting for the recovery callback") // nolint: goerr113
case <-recoveryFuncCalled:
}

if err != nil {
t.Error(err)
}
}
3 changes: 3 additions & 0 deletions backends/redis/redis_backend.go
@@ -206,6 +206,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string

// Start starts processing jobs with the specified queue and handler
func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) {
h.RecoverCallback = b.config.RecoveryCallback

b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) {
taskID := t.ResultWriter().TaskID()
var p map[string]any
@@ -268,6 +270,7 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
h.Queue = queue
h.RecoverCallback = b.config.RecoveryCallback

err = b.Start(ctx, h)
if err != nil {
67 changes: 67 additions & 0 deletions backends/redis/redis_backend_test.go
@@ -441,3 +441,70 @@ result_loop:
t.Error(err)
}
}

func TestHandlerRecoveryCallback(t *testing.T) {
const queue = "testing"
timeoutTimer := time.After(5 * time.Second)
recoveryFuncCalled := make(chan bool, 1)
defer close(recoveryFuncCalled)
ctx := context.Background()

connString := os.Getenv("TEST_REDIS_URL")
if connString == "" {
t.Skip("Skipping: TEST_REDIS_URL not set")
return
}

password := os.Getenv("REDIS_PASSWORD")
nq, err := neoq.New(
ctx,
neoq.WithBackend(Backend),
neoq.WithLogLevel(logging.LogLevelDebug),
WithAddr(connString),
WithPassword(password),
neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) {
recoveryFuncCalled <- true
return
}),
)
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(queue, func(ctx context.Context) (err error) {
panic("abort mission!")
})
h.WithOptions(
handler.JobTimeout(500*time.Millisecond),
handler.Concurrency(1),
)

// process jobs on the test queue
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

jid, err := nq.Enqueue(ctx, &jobs.Job{
Queue: queue,
Payload: map[string]interface{}{
"message": fmt.Sprintf("hello world %d", internal.RandInt(10000000)),
},
})
if err != nil || jid == jobs.DuplicateJobID {
t.Fatal("job was not enqueued. either it was duplicate or this error caused it:", err)
}

select {
case <-timeoutTimer:
err = errors.New("timed out waiting for the recovery callback") // nolint: goerr113
case <-recoveryFuncCalled:
}

if err != nil {
t.Error(err)
}
}
8 changes: 4 additions & 4 deletions gomod2nix.toml
@@ -44,11 +44,11 @@ schema = 3
version = "v0.0.0-20221227161230-091c0ba34f0a"
hash = "sha256-rBtUw15WPPDp2eulHXH5e2zCIed1OPFYwlCpgDOnGRM="
[mod."github.com/jackc/pgx/v5"]
version = "v5.3.1"
hash = "sha256-0v6gXZIirv80mlnUx3ycxB2/TLvv3rUnm98Ke1ZjYDQ="
version = "v5.5.4"
hash = "sha256-T4nYUbDDiyN7v6BRhEkPJ9slatzUMrEyoGAyjfK9syI="
[mod."github.com/jackc/puddle/v2"]
version = "v2.2.0"
hash = "sha256-S9Ldac+a4auQt99hToXZ/WSuUhcEk/A5aDgQAb48B8M="
version = "v2.2.1"
hash = "sha256-Edf8SLT/8l+xfHm9IjUGxs1MHtic2VgRyfqb6OzGA9k="
[mod."github.com/jsuar/go-cron-descriptor"]
version = "v0.1.0"
hash = "sha256-zbADYCEzVcOlvemQa+Ly+6mRcCu3qsFxyeTd9jzZj38="
69 changes: 49 additions & 20 deletions handler/handler.go
@@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"log"
"runtime"
"runtime/debug"
"strings"
"time"

"golang.org/x/exp/slog"
)

const (
@@ -24,13 +25,23 @@ var (
// Func is a function that Handlers execute for every Job on a queue
type Func func(ctx context.Context) error

// RecoveryCallback is a function to be called when fatal errors/panics occur in Handlers
type RecoveryCallback func(ctx context.Context, err error) (erro error)

// DefaultRecoveryCallback is the function that gets called by default when handlers panic
func DefaultRecoveryCallback(_ context.Context, _ error) (err error) {
slog.Error("recovering from a panic in the job handler", slog.Any("stack", string(debug.Stack())))
return nil
}

// Handler handles jobs on a queue
type Handler struct {
Handle Func
Concurrency int
JobTimeout time.Duration
QueueCapacity int64
Queue string
Handle Func
Concurrency int
JobTimeout time.Duration
QueueCapacity int64
Queue string
RecoverCallback RecoveryCallback // function called when fatal handler errors occur
}

// Option is function that sets optional configuration for Handlers
@@ -75,6 +86,13 @@ func Queue(queue string) Option {
}
}

// RecoverCallback configures the handler with a recovery function to be called when fatal errors occur in Handlers
func RecoverCallback(f RecoveryCallback) Option {
return func(h *Handler) {
h.RecoverCallback = f
}
}

// New creates new queue handlers for specific queues. This function is to be used to create new Handlers for
// non-periodic jobs (most jobs). Use [NewPeriodic] to initialize handlers for periodic jobs.
func New(queue string, f Func, opts ...Option) (h Handler) {
@@ -104,6 +122,24 @@ func NewPeriodic(f Func, opts ...Option) (h Handler) {
return
}

func errorFromPanic(x any) (err error) {
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
if ok && strings.Contains(file, "runtime/") {
// The panic came from the runtime, most likely due to incorrect
// map/slice usage. The parent frame should have the real trigger.
_, file, line, ok = runtime.Caller(2) //nolint: gomnd
}

// Include the file and line number info in the error, if runtime.Caller returned ok.
if ok {
err = fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113
} else {
err = fmt.Errorf("panic: %v", x) // nolint: goerr113
}

return
}

// Exec executes handler functions with a concrete timeout
func Exec(ctx context.Context, handler Handler) (err error) {
timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout)
@@ -115,22 +151,15 @@ func Exec(ctx context.Context, handler Handler) (err error) {
go func(ctx context.Context) {
defer func() {
if x := recover(); x != nil {
log.Printf("recovering from a panic in the job handler:\n%s", string(debug.Stack()))
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
if ok && strings.Contains(file, "runtime/") {
// The panic came from the runtime, most likely due to incorrect
// map/slice usage. The parent frame should have the real trigger.
_, file, line, ok = runtime.Caller(2) //nolint: gomnd
}

// Include the file and line number info in the error, if runtime.Caller returned ok.
if ok {
errCh <- fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113
} else {
errCh <- fmt.Errorf("panic: %v", x) // nolint: goerr113
err = errorFromPanic(x)
errCh <- err
if handler.RecoverCallback != nil {
err = handler.RecoverCallback(ctx, err)
if err != nil {
slog.Error("handler recovery callback also failed while recovering from panic", slog.Any("error", err))
}
}
}

done <- true
}()

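For readers skimming the diff, the recover-and-report flow that the updated Exec implements is easier to see in isolation. Below is a minimal, self-contained sketch of the same pattern: recover from a panic raised by the handler function, turn the panic value into an error, and hand that error to a recovery callback. The names `run` and `recoveryCallback` are illustrative only and are not part of neoq's API.

```go
package main

import (
	"context"
	"fmt"
)

// recoveryCallback mirrors the shape of handler.RecoveryCallback introduced above.
type recoveryCallback func(ctx context.Context, err error) error

// run executes fn and, if fn panics, converts the panic value into an error and
// passes it to cb, mirroring the recover-and-report flow the updated Exec performs.
func run(ctx context.Context, fn func(context.Context) error, cb recoveryCallback) (err error) {
	defer func() {
		if x := recover(); x != nil {
			err = fmt.Errorf("panic: %v", x)
			if cb != nil {
				if cbErr := cb(ctx, err); cbErr != nil {
					fmt.Println("recovery callback also failed:", cbErr)
				}
			}
		}
	}()
	return fn(ctx)
}

func main() {
	err := run(context.Background(),
		func(ctx context.Context) error { panic("abort mission!") },
		func(ctx context.Context, err error) error {
			fmt.Println("reporting recovered panic:", err)
			return nil
		},
	)
	fmt.Println("job result:", err)
}
```

In the real Exec, the panic-derived error is also sent on errCh so the job is still marked failed, and a non-nil return from the callback is logged via slog.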
31 changes: 21 additions & 10 deletions neoq.go
@@ -29,16 +29,17 @@ var ErrBackendNotSpecified = errors.New("a backend must be specified")
// per-handler basis.
type Config struct {
BackendInitializer BackendInitializer
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that goroutines schedule for future jobs
IdleTransactionTimeout int // the number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
BackendAuthPassword string // password with which to authenticate to the backend
BackendConcurrency int // total number of backend processes available to process jobs
ConnectionString string // a string containing connection details for the backend
JobCheckInterval time.Duration // the interval of time between checking for new future/retry jobs
FutureJobWindow time.Duration // time duration between current time and job.RunAfter that future jobs get scheduled
IdleTransactionTimeout int // number of milliseconds PgBackend transaction may idle before the connection is killed
ShutdownTimeout time.Duration // duration to wait for jobs to finish during shutdown
SynchronousCommit bool // Postgres: Enable synchronous commits (increases durability, decreases performance)
LogLevel logging.LogLevel // the log level of the default logger
PGConnectionTimeout time.Duration // the amount of time to wait for a connection to become available before timing out
RecoveryCallback handler.RecoveryCallback // the recovery handler applied to all Handlers executed by the associated Neoq instance
}

// ConfigOption is a function that sets optional backend configuration
@@ -49,6 +50,7 @@ func NewConfig() *Config {
return &Config{
FutureJobWindow: DefaultFutureJobWindow,
JobCheckInterval: DefaultJobCheckInterval,
RecoveryCallback: handler.DefaultRecoveryCallback,
}
}

@@ -117,6 +119,15 @@ func WithBackend(initializer BackendInitializer) ConfigOption {
}
}

// WithRecoveryCallback configures neoq with a function to be called when fatal errors occur in job Handlers.
//
// Recovery callbacks are useful for reporting errors to error loggers and collecting error metrics
func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption {
return func(c *Config) {
c.RecoveryCallback = cb
}
}

// WithJobCheckInterval configures the duration of time between checking for future jobs
func WithJobCheckInterval(interval time.Duration) ConfigOption {
return func(c *Config) {
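As a usage sketch (not part of this diff), the instance-level callback added by WithRecoveryCallback can be wired up much like the tests above do. This assumes the in-memory backend exports its initializer as memory.Backend and that the module path is github.com/acaloiaro/neoq, so treat those specifics as illustrative.

```go
package main

import (
	"context"
	"log"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/memory"
	"github.com/acaloiaro/neoq/handler"
	"github.com/acaloiaro/neoq/jobs"
)

func main() {
	ctx := context.Background()

	// Instance-wide recovery callback: the backends copy config.RecoveryCallback
	// onto every handler they start, so panics in any handler are reported here.
	nq, err := neoq.New(ctx,
		neoq.WithBackend(memory.Backend),
		neoq.WithRecoveryCallback(func(ctx context.Context, err error) error {
			log.Printf("job handler panicked: %v", err) // report to your error tracker here
			return nil                                  // a non-nil return is logged by Exec
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nq.Shutdown(ctx)

	h := handler.New("greetings", func(ctx context.Context) error {
		panic("abort mission!")
	})
	if err := nq.Start(ctx, h); err != nil {
		log.Fatal(err)
	}

	if _, err := nq.Enqueue(ctx, &jobs.Job{
		Queue:   "greetings",
		Payload: map[string]interface{}{"message": "hello world"},
	}); err != nil {
		log.Fatal(err)
	}
}
```

Because the backends assign config.RecoveryCallback to each handler in Start and StartCron (see the backend diffs above), no per-handler configuration is needed for the callback to take effect.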