Skip to content

Commit

Permalink
go/runtime: monitor for cancellation while awaiting initial watch upd…
Browse files Browse the repository at this point in the history
…ates
  • Loading branch information
jgraettinger committed Sep 5, 2024
1 parent 9264264 commit 7438c9e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
9 changes: 7 additions & 2 deletions go/runtime/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ func (c *Capture) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err
}
// Wait for all watches to perform their first update.
for _, watch := range c.watches {
if err := <-watch.UpdateCh(); err != nil {
return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err)
select {
case err := <-watch.UpdateCh():
if err != nil {
return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err)
}
case <-c.term.ctx.Done():
return pf.Checkpoint{}, c.term.ctx.Err()
}
}

Expand Down
9 changes: 7 additions & 2 deletions go/runtime/derive.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ func (d *Derive) RestoreCheckpoint(shard consumer.Shard) (_ pf.Checkpoint, _err
flow.CollectionWatchRequest(d.term.taskSpec),
nil,
)
if err := <-d.watch.UpdateCh(); err != nil {
return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err)
select {
case err := <-d.watch.UpdateCh():
if err != nil {
return pf.Checkpoint{}, fmt.Errorf("initializing journal watch: %w", err)
}
case <-d.term.ctx.Done():
return pf.Checkpoint{}, d.term.ctx.Err()
}

var requestExt = &pr.DeriveRequestExt{
Expand Down
9 changes: 7 additions & 2 deletions go/runtime/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ func NewFlowTesting(ctx context.Context, inner *FlowConsumer, ajc *client.Append
// Start watch over all journals.
// This is reasonable only because we're running within a temporary data-plane.
var watch = client.NewWatchedList(ctx, ajc, pb.ListRequest{}, nil)
if err := <-watch.UpdateCh(); err != nil {
return nil, fmt.Errorf("staring journal watch: %w", err)
select {
case err := <-watch.UpdateCh():
if err != nil {
return nil, fmt.Errorf("initializing journal watch: %w", err)
}
case <-ctx.Done():
return nil, ctx.Err()
}

svc, err := bindings.NewTaskService(
Expand Down

0 comments on commit 7438c9e

Please sign in to comment.