Skip to content

Commit

Permalink
allow multiple acks to be returned from connector (#1912)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Oct 15, 2024
1 parent f92354e commit 322c078
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions pkg/lifecycle/stream/destination_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"sync"

"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/gammazero/deque"
Expand Down Expand Up @@ -122,6 +123,8 @@ func (n *DestinationAckerNode) worker(
errChan <- err
}

var acks []connector.DestinationAck

defer close(errChan)
for range signalChan {
// signal is received when a new message is in the queue
Expand All @@ -136,22 +139,29 @@ func (n *DestinationAckerNode) worker(
msg := n.queue.PopFront()
n.m.Unlock()

acks, err := n.Destination.Ack(ctx)
if err != nil {
handleError(msg, cerrors.Errorf("error while fetching acks: %w", err))
return
}
for _, ack := range acks {
if !bytes.Equal(msg.Record.Position, ack.Position) {
handleError(msg, cerrors.Errorf("received unexpected ack, expected position %q but got %q", msg.Record.Position, ack.Position))
return
}
err = n.handleAck(msg, ack.Error)
if len(acks) == 0 {
// Ack can return multiple acks, store them and check the position
// for the current message
var err error
acks, err = n.Destination.Ack(ctx)
if err != nil {
errChan <- err
handleError(msg, cerrors.Errorf("error while fetching acks: %w", err))
return
}
}

ack := acks[0]
acks = acks[1:]

if !bytes.Equal(msg.Record.Position, ack.Position) {
handleError(msg, cerrors.Errorf("received unexpected ack, expected position %q but got %q", msg.Record.Position, ack.Position))
return
}
err := n.handleAck(msg, ack.Error)
if err != nil {
errChan <- err
return
}
}
}
}
Expand Down

0 comments on commit 322c078

Please sign in to comment.