diff --git a/go/runtime/capture.go b/go/runtime/capture.go index 94616fdc48..5b23653b57 100644 --- a/go/runtime/capture.go +++ b/go/runtime/capture.go @@ -84,8 +84,13 @@ func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err } // Wait for all watches to perform their first update. for _, watch := range c.watches { - if err := <-watch.UpdateCh(); err != nil { - return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err) + select { + case err := <-watch.UpdateCh(): + if err != nil { + return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err) + } + case <-c.term.ctx.Done(): + return pf.Checkpoint{}, c.term.ctx.Err() } } diff --git a/go/runtime/derive.go b/go/runtime/derive.go index f8c1ebe028..04983841be 100644 --- a/go/runtime/derive.go +++ b/go/runtime/derive.go @@ -90,8 +90,13 @@ func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err flow.CollectionWatchRequest(d.term.taskSpec), nil, ) - if err := <-d.watch.UpdateCh(); err != nil { - return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err) + select { + case err := <-d.watch.UpdateCh(): + if err != nil { + return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err) + } + case <-d.term.ctx.Done(): + return pf.Checkpoint{}, d.term.ctx.Err() } var requestExt = &pr.DeriveRequestExt{ diff --git a/go/runtime/testing.go b/go/runtime/testing.go index 567bbd8048..87472b9033 100644 --- a/go/runtime/testing.go +++ b/go/runtime/testing.go @@ -47,8 +47,13 @@ func NewFlowTesting(ctx context.Context, inner *FlowConsumer, ajc *client.Append // Start watch over all journals. // This is reasonable only because we're running within a temporary data-plane. var watch = client.NewWatchedList(ctx, ajc, pb.ListRequest{}, nil) - if err := <-watch.UpdateCh(); err != nil { - return nil, fmt.Errorf("staring journal watch: %w", err) + select { + case err := <-watch.UpdateCh(): + if err != nil { + return nil, fmt.Errorf("initializing journal watch: %w", err) + } + case <-ctx.Done(): + return nil, ctx.Err() } svc, err := bindings.NewTaskService(