Skip to content

Commit

Permalink
batcher: allow skipping ack on flush errors
Browse files Browse the repository at this point in the history
- Skip acks if ErrorHandler returns ErrDontAck
  • Loading branch information
dfuentes committed Nov 5, 2024
1 parent b1176c0 commit d658cd4
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
17 changes: 16 additions & 1 deletion x/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
"github.com/segmentio/ksuid"
)

// ErrDontAck should be returned by ErrorHandlers when they wish to
// signal to the batcher to skip acking a message as delivered, but
// continue to process. For example, if an error is retryable and
// will be retried upstream at the source if an ack is not received
// before some timeout.
var ErrDontAck = errors.New("Destination encountered a retryable error")

// Flusher is the core interface that the user of this package must implement
// to get the batching functionality.
// It takes a slice of messages and returns an error if the flush fails. It's
Expand Down Expand Up @@ -353,8 +360,16 @@ func (d *Destination[T]) doflush(ctx context.Context, msgs []kawa.Message[T], ac
slog.Debug("flush err", "error", err)
err := d.errorHandler.HandleError(ctx, err, msgs)
if err != nil {
// If error handler returns ErrDontAck, this means we want the
// batcher to continue running, but to skip acknowledging the delivery
// of the affected messages
if errors.Is(err, ErrDontAck) {
return
}

// Otherwise, if error handler returns an error, then we exit by exposing
// the error upstream
d.flusherr <- err
// if error handler returns an error, then we exit
return
}
}
Expand Down
37 changes: 37 additions & 0 deletions x/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,41 @@ func TestBatcherErrors(t *testing.T) {
err = <-errc
assert.ErrorIs(t, err, flushErr)
})

t.Run("Don't ack messages if flush handler returns ErrRetryable", func(t *testing.T) {
var retryHandler = func(ctx context.Context, err error, msgs []kawa.Message[string]) error {
return ErrDontAck
}
bat := NewDestination[string](
FlushFunc[string](ff),
ErrorFunc[string](retryHandler),
FlushLength(1),
FlushParallelism(1),
)
errc := make(chan error)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)

go func(c context.Context, ec chan error) {
ec <- bat.Run(c)
}(ctx, errc)

messages := []kawa.Message[string]{
{Value: "one"},
{Value: "two"},
{Value: "three"},
{Value: "ten"},
}

ackCount := 0
err := bat.Send(ctx, func() { ackCount += 1 }, messages...)
assert.NoError(t, err)
time.Sleep(50 * time.Millisecond)
cancel()

err = <-errc
assert.ErrorIs(t, err, nil)

assert.Equal(t, 0, ackCount)
})
}

0 comments on commit d658cd4

Please sign in to comment.