Skip to content

Commit

Permalink
document behavior, fanout node return nacked message error
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Jun 24, 2022
1 parent d6c9641 commit f3273f3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
8 changes: 7 additions & 1 deletion pkg/pipeline/stream/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,14 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) {
writeTime := time.Now()
err = n.Destination.Write(msg.Ctx, msg.Record)
if err != nil {
// An error in Write is a fatal error, we probably won't be able to
// process any further messages because there is a problem in the
// communication with the plugin. We need to let the acker node know
// that it shouldn't wait to receive an ack for the message, we need
// to nack the message to not leave it open and then return the
// error to stop the pipeline.
n.AckerNode.Forget(msg)
_ = msg.Nack(err) // TODO think this through if it makes sense to return the error
_ = msg.Nack(err)
return cerrors.Errorf("error writing to destination: %w", err)
}
n.ConnectorTimer.Update(time.Since(writeTime))
Expand Down
5 changes: 2 additions & 3 deletions pkg/pipeline/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {

// TODO make sure acks are called in the right order or this will block
// forever. Right now we rely on connectors sending acks back in the
// correct order and this should generally be true, but we can't be
// completely sure and a badly written connector shouldn't provoke a
// deadlock.
// correct order and this should generally be true, but a badly written
// connector could provoke a deadlock, we could prevent that.
err = n.handleAck(msg, err)
if err != nil {
return err
Expand Down
23 changes: 22 additions & 1 deletion pkg/pipeline/stream/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func (n *FanoutNode) Run(ctx context.Context) error {

select {
case <-ctx.Done():
_ = msg.Nack(ctx.Err()) // TODO handle this, don't approve PR unless this is handled
// we can ignore the error, it will show up in the
// original msg
_ = newMsg.Nack(ctx.Err())
return
case n.out[i] <- newMsg:
}
Expand All @@ -116,6 +118,25 @@ func (n *FanoutNode) Run(ctx context.Context) error {
// also there is no need to listen to ctx.Done because that's what
// the go routines are doing already
wg.Wait()

// check if the context is still alive
if ctx.Err() != nil {
// context was closed - if the message was nacked there's a high
// chance it was nacked in this node by one of the goroutines
if msg.Status() == MessageStatusNacked {
// check if the message nack returned an error (Nack is
// idempotent and will return the same error as in the first
// call), return it if it returns an error
if err := msg.Nack(nil); err != nil {
return err
}
}
// the message is not nacked, it must have been sent to all
// downstream nodes just before the context got cancelled, we
// don't care about the message anymore, so we just return the
// context error
return ctx.Err()
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/pipeline/stream/source_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (n *SourceAckerNode) Run(ctx context.Context) error {

// enqueue message in semaphore
ticket := n.sem.Enqueue()
// TODO make sure that if an ack/nack fails we stop forwarding acks
n.registerAckHandler(msg, ticket)
n.registerNackHandler(msg, ticket)

Expand Down

0 comments on commit f3273f3

Please sign in to comment.