Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: otel instrumentation #134

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
"github.com/robfig/cron"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/exp/slices"
"golang.org/x/exp/slog"
)
Expand All @@ -50,6 +52,11 @@ const (
AND run_after <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 1`
PendingJobCountQuery = `SELECT COUNT(*)
FROM neoq_jobs
WHERE queue = $1
AND status NOT IN ('processed')
AND run_after <= NOW()`
FutureJobQuery = `SELECT id,fingerprint,queue,status,deadline,payload,retries,max_retries,run_after,ran_at,created_at,error
FROM neoq_jobs
WHERE queue = $1
Expand Down Expand Up @@ -96,6 +103,7 @@ type PgBackend struct {
logger logging.Logger // backend-wide logger
mu *sync.RWMutex // protects concurrent access to fields on PgBackend
pool *pgxpool.Pool // connection pool for backend, used to process and enqueue jobs
metricPack neoq.MetricPack // a collection opentelemetry counters and gauges for telemetry reporting
}

// Backend initializes a new postgres-backed neoq backend
Expand Down Expand Up @@ -139,6 +147,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
mu: &sync.RWMutex{},
listenCancelCh: make(chan context.CancelFunc, 1),
listenConnDown: make(chan bool),
metricPack: neoq.MetricPack{},
}

// Set all options
Expand Down Expand Up @@ -187,6 +196,14 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
}
}

// initialize otel meters for observability
if p.config.OpentelemetryMeterProvider != nil {
err = p.metricPack.Initialize(p.config.OpentelemetryMeterProvider)
if err != nil {
p.logger.Error("unable to initialize open telemetry metrics", slog.Any("error", err))
}
}

// monitor handlers for changes and LISTEN when new queues are added
go p.listenerManager(ctx)

Expand Down Expand Up @@ -458,6 +475,12 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
)
}

if p.metricPack.DepthCounter != nil {
p.metricPack.DepthCounter.Add(ctx, 1, metric.WithAttributes(
attribute.Key("queue").String(job.Queue),
))
}

return jobID, nil
}

Expand Down Expand Up @@ -836,6 +859,20 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
return
}

pendingCount, err := p.countPendingJobs(ctx, conn, queue)
if err != nil {
p.logger.Error("failed to count pending jobs",
slog.String("queue", queue),
slog.Any("error", err),
)
}
p.logger.Debug("pending jobs", slog.Int64("count", pendingCount), slog.String("queue", queue))
if p.metricPack.DepthCounter != nil {
p.metricPack.DepthCounter.Add(ctx, pendingCount, metric.WithAttributes(
attribute.Key("queue").String(queue),
))
}

go func(ctx context.Context) {
defer conn.Release()

Expand Down Expand Up @@ -868,12 +905,31 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
// nolint: cyclop
func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
var job *jobs.Job
var jobErr error
var tx pgx.Tx
conn, err := p.acquire(ctx)
if err != nil {
return
}
defer conn.Release()

defer func() {
conn.Release()
if job == nil {
return
}

telemetryAttrs := metric.WithAttributes(
attribute.Key("queue").String(job.Queue),
)
// the job ended with an error, incrementing the failing counter
if jobErr != nil && p.metricPack.FailureCounter != nil {
p.metricPack.DepthCounter.Add(ctx, 1, telemetryAttrs)
}
// the job ended in success, increment success metrics
if jobErr == nil && p.metricPack.SuccessCounter != nil {
p.metricPack.SuccessCounter.Add(ctx, 1, telemetryAttrs)
}
}()

tx, err = conn.Begin(ctx)
if err != nil {
Expand Down Expand Up @@ -912,7 +968,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
job.Retries++
}

var jobErr error
h, ok := p.handlers[job.Queue]
if !ok {
p.logger.Error("received a job for which no handler is configured",
Expand Down Expand Up @@ -1035,6 +1090,11 @@ func (p *PgBackend) getPendingJobID(ctx context.Context, conn *pgxpool.Conn, que
return
}

func (p *PgBackend) countPendingJobs(ctx context.Context, conn *pgxpool.Conn, queue string) (count int64, err error) {
err = conn.QueryRow(ctx, PendingJobCountQuery, queue).Scan(&count)
return
}

// acquire acquires connections from the connection pool with a timeout
//
// the purpose of this function is to skirt pgxpool's default blocking behavior with connection acquisition preemption
Expand Down
2 changes: 1 addition & 1 deletion backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ func TestHandlerRecoveryCallback(t *testing.T) {
nq, err := neoq.New(ctx,
neoq.WithBackend(postgres.Backend),
postgres.WithConnectionString(connString),
neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) {
neoq.WithRecoveryCallback(func(ctx context.Context, _ error, _ string) (err error) {
recoveryFuncCalled <- true
return
}))
Expand Down
2 changes: 1 addition & 1 deletion backends/redis/redis_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestHandlerRecoveryCallback(t *testing.T) {
neoq.WithLogLevel(logging.LogLevelDebug),
WithAddr(connString),
WithPassword(password),
neoq.WithRecoveryCallback(func(ctx context.Context, _ error) (err error) {
neoq.WithRecoveryCallback(func(ctx context.Context, _ error, _ string) (err error) {
recoveryFuncCalled <- true
return
}),
Expand Down
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ require (
github.com/jsuar/go-cron-descriptor v0.1.0
github.com/pkg/errors v0.9.1
github.com/robfig/cron v1.2.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
)

require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-redis/redis/v8 v8.11.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand All @@ -29,12 +34,14 @@ require (
github.com/lib/pq v1.10.2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
31 changes: 23 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-redis/redis/v8 v8.11.2 h1:WqlSpAwz8mxDSMCvbyz1Mkiqe0LE5OY4j3lgkvu1Ts0=
github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down Expand Up @@ -58,12 +63,12 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/guregu/null v4.0.0+incompatible h1:4zw0ckM7ECd6FNNddc3Fu4aty9nTlpkkzH7dPn4/4Gw=
github.com/guregu/null v4.0.0+incompatible/go.mod h1:ePGpQaN9cw0tj45IR5E5ehMvsFlLlQZAkkOXZurJ3NM=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -134,10 +139,20 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08=
go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
Expand Down Expand Up @@ -202,8 +217,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
29 changes: 25 additions & 4 deletions gomod2nix.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ schema = 3
[mod."github.com/dgryski/go-rendezvous"]
version = "v0.0.0-20200823014737-9f7001d12a5f"
hash = "sha256-n/7xo5CQqo4yLaWMSzSN1Muk/oqK6O5dgDOFWapeDUI="
[mod."github.com/go-logr/logr"]
version = "v1.4.2"
hash = "sha256-/W6qGilFlZNTb9Uq48xGZ4IbsVeSwJiAMLw4wiNYHLI="
[mod."github.com/go-logr/stdr"]
version = "v1.2.2"
hash = "sha256-rRweAP7XIb4egtT1f2gkz4sYOu7LDHmcJ5iNsJUd0sE="
[mod."github.com/go-redis/redis/v8"]
version = "v8.11.2"
hash = "sha256-3ltdzFcxgUtm6YlNX9C3vHOU/WzPiu687JXWmY4+h/w="
Expand All @@ -17,8 +23,8 @@ schema = 3
version = "v1.5.2"
hash = "sha256-IVwooaIo46iq7euSSVWTBAdKd+2DUaJ67MtBao1DpBI="
[mod."github.com/google/uuid"]
version = "v1.3.0"
hash = "sha256-QoR55eBtA94T2tBszyxfDtO7/pjZZSGb5vm7U0Xhs0Y="
version = "v1.6.0"
hash = "sha256-VWl9sqUzdOuhW0KzQlv0gwwUQClYkmZwSydHG2sALYw="
[mod."github.com/guregu/null"]
version = "v4.0.0+incompatible"
hash = "sha256-VhFHFjDVv5mnyLGefthH2ViaPsPab4NOSSW7XqzBpfI="
Expand Down Expand Up @@ -67,6 +73,21 @@ schema = 3
[mod."github.com/spf13/cast"]
version = "v1.3.1"
hash = "sha256-wT6IRl4ZFvzeyLlv7l/GJt39shnRRYHzxyDTh9AhaFE="
[mod."go.opentelemetry.io/otel"]
version = "v1.28.0"
hash = "sha256-bilBBr2cuADs9bQ7swnGLTuC7h0DooU6BQtrQqMqIjs="
[mod."go.opentelemetry.io/otel/metric"]
version = "v1.28.0"
hash = "sha256-k3p1lYcvrODwIkZo/j2jvCoDFUelz4yVJEEVdUKUmGU="
[mod."go.opentelemetry.io/otel/sdk"]
version = "v1.28.0"
hash = "sha256-X48fV4A9vgxfjBpmUIQumod2nyI+tUc5lnhFkeKBRsc="
[mod."go.opentelemetry.io/otel/sdk/metric"]
version = "v1.28.0"
hash = "sha256-u9V9HVohaEdJe1f9FiPKZIpUu90hbYK16LYLk4oQPe0="
[mod."go.opentelemetry.io/otel/trace"]
version = "v1.28.0"
hash = "sha256-8uxmlm0/5VGoWegxwy0q8NgeY+pyicSoV08RkvD9Q98="
[mod."go.uber.org/atomic"]
version = "v1.10.0"
hash = "sha256-E6UEDc1eh/cLUFd+J86cDesQ0B8wEv/DdaAVKb+x2t8="
Expand All @@ -86,8 +107,8 @@ schema = 3
version = "v0.2.0"
hash = "sha256-hKk9zsy2aXY7R0qGFZhGOVvk5qD17f6KHEuK4rGpTsg="
[mod."golang.org/x/sys"]
version = "v0.15.0"
hash = "sha256-n7TlABF6179RzGq3gctPDKDPRtDfnwPdjNCMm8ps2KY="
version = "v0.21.0"
hash = "sha256-gapzPWuEqY36V6W2YhIDYR49sEvjJRd7bSuf9K1f4JY="
[mod."golang.org/x/text"]
version = "v0.14.0"
hash = "sha256-yh3B0tom1RfzQBf1RNmfdNWF1PtiqxV41jW1GVS6JAg="
Expand Down
8 changes: 4 additions & 4 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ var (
type Func func(ctx context.Context) error

// RecoveryCallback is a function to be called when fatal errors/panics occur in Handlers
type RecoveryCallback func(ctx context.Context, err error) (erro error)
type RecoveryCallback func(ctx context.Context, err error, stackTrace string) (erro error)

// DefaultRecoveryCallback is the function that gets called by default when handlers panic
func DefaultRecoveryCallback(_ context.Context, _ error) (err error) {
slog.Error("recovering from a panic in the job handler", slog.Any("stack", string(debug.Stack())))
func DefaultRecoveryCallback(_ context.Context, recoveredError error, stack string) (err error) {
slog.Error("recovering from a panic in the job handler", slog.Any("error", recoveredError), slog.Any("stack", stack))
return nil
}

Expand Down Expand Up @@ -154,7 +154,7 @@ func Exec(ctx context.Context, handler Handler) (err error) {
err = errorFromPanic(x)
errCh <- err
if handler.RecoverCallback != nil {
err = handler.RecoverCallback(ctx, err)
err = handler.RecoverCallback(ctx, err, string(debug.Stack()))
if err != nil {
slog.Error("handler recovery callback also failed while recovering from panic", slog.Any("error", err))
}
Expand Down
Loading
Loading