Skip to content

Commit

Permalink
[prism] Catch panics in primary execution goroutines. (#32210)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Aug 15, 2024
1 parent 2d14076 commit 92087f2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
}()
// Watermark evaluation goroutine.
go func() {
defer func() {
// In case of panics in bundle generation, fail and cancel the job.
if e := recover(); e != nil {
upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v", e))
}
}()
defer close(runStageCh)

// If we have a test stream, clear out existing refreshes, so the test stream can
Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ type stage struct {
OutputsToCoders map[string]engine.PColInfo
}

func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s)
}
}()
slog.Debug("Execute: starting bundle", "bundle", rb)

var b *worker.B
Expand Down

0 comments on commit 92087f2

Please sign in to comment.