From 1f3e40e9b88fc61d7a19cea3b3fb49381da67272 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Wed, 13 Jul 2022 18:10:17 +0200 Subject: [PATCH] Remove message status dropped (#487) * implement ticket queue * experiment with ordered semaphore * ticketqueue benchmarks * reduce allocations * remove ticketqueue (semaphore implementation is more performant) * optimize semaphore for our use case * fix linter warnings, better benchmarks * better docs * go mod tidy * rename AckerNode to DestinationAckerNode * remove message status change middleware to ensure all message handlers are called * implement SourceAckerNode * add todo note about possible deadlock * source acker node test * remove message status dropped * document behavior, fanout node return nacked message error * don't forward acks after a failed ack/nack * use cerrors * use cerrors.New * use LogOrReplace * improve benchmarks * fix linter error * add comments * simplify implementation * update semaphore * update param name * remove redundant if clause --- pkg/pipeline/stream/destination.go | 9 +- pkg/pipeline/stream/destination_acker.go | 52 +++--- pkg/pipeline/stream/doc.go | 17 +- pkg/pipeline/stream/fanin.go | 3 +- pkg/pipeline/stream/fanout.go | 55 +++--- pkg/pipeline/stream/message.go | 134 ++++---------- pkg/pipeline/stream/message_test.go | 183 ++------------------ pkg/pipeline/stream/messagestatus_string.go | 5 +- pkg/pipeline/stream/metrics.go | 3 +- pkg/pipeline/stream/processor.go | 19 +- pkg/pipeline/stream/source.go | 3 +- pkg/pipeline/stream/source_acker.go | 3 +- 12 files changed, 119 insertions(+), 367 deletions(-) diff --git a/pkg/pipeline/stream/destination.go b/pkg/pipeline/stream/destination.go index 0c75f2179..23f775b3a 100644 --- a/pkg/pipeline/stream/destination.go +++ b/pkg/pipeline/stream/destination.go @@ -94,7 +94,14 @@ func (n *DestinationNode) Run(ctx context.Context) (err error) { writeTime := time.Now() err = n.Destination.Write(msg.Ctx, msg.Record) if err != nil { - n.AckerNode.ForgetAndDrop(msg) + // An error in Write is a fatal error, we probably won't be able to + // process any further messages because there is a problem in the + // communication with the plugin. We need to let the acker node know + // that it shouldn't wait to receive an ack for the message, we need + // to nack the message to not leave it open and then return the + // error to stop the pipeline. + n.AckerNode.Forget(msg) + _ = msg.Nack(err) return cerrors.Errorf("error writing to destination: %w", err) } n.ConnectorTimer.Update(time.Since(writeTime)) diff --git a/pkg/pipeline/stream/destination_acker.go b/pkg/pipeline/stream/destination_acker.go index 2e437d89c..317707ecb 100644 --- a/pkg/pipeline/stream/destination_acker.go +++ b/pkg/pipeline/stream/destination_acker.go @@ -23,6 +23,7 @@ import ( "github.com/conduitio/conduit/pkg/connector" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/multierror" "github.com/conduitio/conduit/pkg/plugin" "github.com/conduitio/conduit/pkg/record" ) @@ -70,13 +71,13 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { n.init() defer func() { - dropAllErr := n.teardown() + teardownErr := n.teardown(err) if err != nil { // we are already returning an error, just log this one - n.logger.Err(ctx, dropAllErr).Msg("acker node stopped without processing all messages") + n.logger.Err(ctx, teardownErr).Msg("acker node stopped without processing all messages") } else { - // return dropAllErr instead - err = dropAllErr + // return teardownErr instead + err = teardownErr } }() @@ -115,9 +116,8 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { // TODO make sure acks are called in the right order or this will block // forever. Right now we rely on connectors sending acks back in the - // correct order and this should generally be true, but we can't be - // completely sure and a badly written connector shouldn't provoke a - // deadlock. + // correct order and this should generally be true, but a badly written + // connector could provoke a deadlock, we could prevent that. err = n.handleAck(msg, err) if err != nil { return err @@ -125,38 +125,39 @@ func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { } } -// teardown will drop all messages still in the cache and return an error in +// teardown will nack all messages still in the cache and return an error in // case there were still unprocessed messages in the cache. -func (n *DestinationAckerNode) teardown() error { - var dropped int +func (n *DestinationAckerNode) teardown(reason error) error { + var nacked int + var err error n.cache.Range(func(pos record.Position, msg *Message) bool { - msg.Drop() - dropped++ + err = multierror.Append(err, msg.Nack(reason)) + nacked++ return true }) - if dropped > 0 { - return cerrors.Errorf("dropped %d messages when stopping acker node", dropped) + if err != nil { + return cerrors.Errorf("nacked %d messages when stopping destination acker node, some nacks failed: %w", nacked, err) + } + if nacked > 0 { + return cerrors.Errorf("nacked %d messages when stopping destination acker node", nacked) } return nil } // handleAck either acks or nacks the message, depending on the supplied error. -// If the nacking or acking fails, the message is dropped and the error is -// returned. +// If the nacking or acking fails the error is returned. func (n *DestinationAckerNode) handleAck(msg *Message, err error) error { switch { case err != nil: n.logger.Trace(msg.Ctx).Err(err).Msg("nacking message") err = msg.Nack(err) if err != nil { - msg.Drop() return cerrors.Errorf("error while nacking message: %w", err) } default: n.logger.Trace(msg.Ctx).Msg("acking message") err = msg.Ack() if err != nil { - msg.Drop() return cerrors.Errorf("error while acking message: %w", err) } } @@ -187,17 +188,10 @@ func (n *DestinationAckerNode) ExpectAck(msg *Message) error { return nil } -// ForgetAndDrop signals the handler that an ack for this message won't be -// received, and it should remove it from its cache. In case an ack for this -// message wasn't yet received it drops the message, otherwise it does nothing. -func (n *DestinationAckerNode) ForgetAndDrop(msg *Message) { - _, ok := n.cache.LoadAndDelete(msg.Record.Position) - if !ok { - // message wasn't found in the cache, looks like the message was already - // acked / nacked - return - } - msg.Drop() +// Forget signals the handler that an ack for this message won't be received, +// and it should remove it from its cache. +func (n *DestinationAckerNode) Forget(msg *Message) { + n.cache.LoadAndDelete(msg.Record.Position) } // Wait can be used to wait for the count of outstanding acks to drop to 0 or diff --git a/pkg/pipeline/stream/doc.go b/pkg/pipeline/stream/doc.go index eba22e8a9..39d9db9f7 100644 --- a/pkg/pipeline/stream/doc.go +++ b/pkg/pipeline/stream/doc.go @@ -28,18 +28,15 @@ A message can have of these statuses: it's passed around between the nodes. Acked Once a node successfully processes the message (e.g. it is sent to the destination or is filtered out by a processor) it is acked. - Nacked If some node fails to process the message it can nack the message - and once it's successfully nacked (e.g. sent to a dead letter queue) - it becomes nacked. - Dropped If a node experiences a non-recoverable error or has to stop running - without sending the message to the next node (e.g. force stop) it - can drop the message, then the message status changes to dropped. + Nacked If some node fails to process the message it nacks the message. In + that case a handler can pick it up to send it to a dead letter + queue. -In other words, once a node receives a message it has 4 options for how to +In other words, once a node receives a message it has 3 options for how to handle it: it can either pass it to the next node (message stays open), ack the -message and keep running, nack the message and keep running or drop the message -and stop running. This means that no message will be left in an open status when -the pipeline stops. +message and keep running if ack is successful, nack the message and keep running +if nack is successful. This means that no message will be left in an open status +when the pipeline stops. Nodes can register functions on the message which will be called when the status of a message changes. For more information see StatusChangeHandler. diff --git a/pkg/pipeline/stream/fanin.go b/pkg/pipeline/stream/fanin.go index 1b6b69144..230be0f29 100644 --- a/pkg/pipeline/stream/fanin.go +++ b/pkg/pipeline/stream/fanin.go @@ -76,8 +76,7 @@ func (n *FaninNode) Run(ctx context.Context) error { select { case <-ctx.Done(): - msg.Drop() - return ctx.Err() + return msg.Nack(ctx.Err()) case n.out <- msg: } } diff --git a/pkg/pipeline/stream/fanout.go b/pkg/pipeline/stream/fanout.go index c6c2aa9ec..e67779e90 100644 --- a/pkg/pipeline/stream/fanout.go +++ b/pkg/pipeline/stream/fanout.go @@ -92,8 +92,6 @@ func (n *FanoutNode) Run(ctx context.Context) error { return msg.Ack() case <-msg.Nacked(): return cerrors.New("message was nacked by another node") - case <-msg.Dropped(): - return ErrMessageDropped } }), ) @@ -104,31 +102,12 @@ func (n *FanoutNode) Run(ctx context.Context) error { return msg.Nack(reason) }), ) - newMsg.RegisterDropHandler( - // wrap drop handler to make sure msg is not overwritten - // by the time drop handler is called - n.wrapDropHandler(msg, func(msg *Message, reason error) { - defer func() { - if err := recover(); err != nil { - if cerrors.Is(err.(error), ErrUnexpectedMessageStatus) { - // the unexpected message status is expected (I know, right?) - // this rare case might happen if one downstream node first - // nacks the message and afterwards another node tries to drop - // the message - // this is a valid use case, the panic is trying to make us - // notice all other invalid use cases - return - } - panic(err) // re-panic - } - }() - msg.Drop() - }), - ) select { case <-ctx.Done(): - msg.Drop() + // we can ignore the error, it will show up in the + // original msg + _ = newMsg.Nack(ctx.Err()) return case n.out[i] <- newMsg: } @@ -139,6 +118,25 @@ func (n *FanoutNode) Run(ctx context.Context) error { // also there is no need to listen to ctx.Done because that's what // the go routines are doing already wg.Wait() + + // check if the context is still alive + if ctx.Err() != nil { + // context was closed - if the message was nacked there's a high + // chance it was nacked in this node by one of the goroutines + if msg.Status() == MessageStatusNacked { + // check if the message nack returned an error (Nack is + // idempotent and will return the same error as in the first + // call), return it if it returns an error + if err := msg.Nack(nil); err != nil { + return err + } + } + // the message is not nacked, it must have been sent to all + // downstream nodes just before the context got cancelled, we + // don't care about the message anymore, so we just return the + // context error + return ctx.Err() + } } } } @@ -161,15 +159,6 @@ func (n *FanoutNode) wrapNackHandler(origMsg *Message, f NackHandler) NackHandle } } -// wrapDropHandler modifies the drop handler, so it's called with the original -// message received by FanoutNode instead of the new message created by -// FanoutNode. -func (n *FanoutNode) wrapDropHandler(origMsg *Message, f DropHandler) DropHandler { - return func(_ *Message, reason error) { - f(origMsg, reason) - } -} - func (n *FanoutNode) Sub(in <-chan *Message) { if n.in != nil { panic("can't connect FanoutNode to more than one in") diff --git a/pkg/pipeline/stream/message.go b/pkg/pipeline/stream/message.go index 69be73212..2c8f0748e 100644 --- a/pkg/pipeline/stream/message.go +++ b/pkg/pipeline/stream/message.go @@ -26,18 +26,16 @@ import ( "github.com/conduitio/conduit/pkg/record" ) -// MessageStatus represents the state of the message (acked, nacked, dropped or open). +// MessageStatus represents the state of the message (acked, nacked or open). type MessageStatus int const ( MessageStatusAcked MessageStatus = iota MessageStatusNacked MessageStatusOpen - MessageStatusDropped ) var ( - ErrMessageDropped = cerrors.New("message is dropped") ErrUnexpectedMessageStatus = cerrors.New("unexpected message status") ) @@ -45,44 +43,40 @@ var ( type Message struct { // Ctx is the context in which the record was fetched. It should be used for // any function calls when processing the message. If the context is done - // the message should be dropped as soon as possible and not processed + // the message should be nacked as soon as possible and not processed // further. Ctx context.Context // Record represents a single record attached to the message. Record record.Record - // acked, nacked and dropped are channels used to capture acks, nacks and - // drops. When a message is acked, nacked or dropped the corresponding - // channel is closed. - acked chan struct{} - nacked chan struct{} - dropped chan struct{} + // acked and nacked and are channels used to capture acks and nacks. When a + // message is acked or nacked the corresponding channel is closed. + acked chan struct{} + nacked chan struct{} - // handler is executed when Ack, Nack or Drop is called. + // handler is executed when Ack or Nack is called. handler StatusChangeHandler // hasNackHandler is true if at least one nack handler was registered. hasNackHandler bool - // ackNackReturnValue is cached the first time Ack, Nack or Drop is executed. + // ackNackReturnValue is cached the first time Ack or Nack is executed. ackNackReturnValue error // initOnce is guarding the initialization logic of a message. initOnce sync.Once - // ackNackDropOnce is guarding the acking/nacking/dropping logic of a message. - ackNackDropOnce sync.Once - // handlerGuard guards fields ackHandlers and nackHandlers. - handlerGuard sync.Mutex + // ackNackOnce is guarding the acking/nacking logic of a message. + ackNackOnce sync.Once } type ( - // StatusChangeHandler is executed when a message status changes. The handlers - // are triggered by a call to either of these functions: Message.Nack, - // Message.Ack, Message.Drop. These functions will block until the handlers + // StatusChangeHandler is executed when a message status changes. The + // handlers are triggered by a call to either of these functions: + // Message.Nack, Message.Ack. These functions will block until the handlers // finish handling the message and will return the error returned by the // handlers. - // The function receives the message and the status change describing the old - // and new message status as well as the reason for the status change in case of - // a nack or drop. + // The function receives the message and the status change describing the + // old and new message status as well as the reason for the status change in + // case of a nack. StatusChangeHandler func(*Message, StatusChange) error // AckHandler is a variation of the StatusChangeHandler that is only called @@ -92,10 +86,6 @@ type ( // NackHandler is a variation of the StatusChangeHandler that is only called // when a message is nacked. For more info see StatusChangeHandler. NackHandler func(*Message, error) error - - // DropHandler is a variation of the StatusChangeHandler that is only called - // when a message is dropped. For more info see StatusChangeHandler. - DropHandler func(*Message, error) ) // StatusChange is passed to StatusChangeHandler when the status of a message @@ -104,7 +94,7 @@ type StatusChange struct { Old MessageStatus New MessageStatus // Reason contains the error that triggered the status change in case of a - // nack or drop. + // nack. Reason error } @@ -113,7 +103,6 @@ func (m *Message) init() { m.initOnce.Do(func() { m.acked = make(chan struct{}) m.nacked = make(chan struct{}) - m.dropped = make(chan struct{}) // initialize empty status handler m.handler = func(msg *Message, change StatusChange) error { return nil } }) @@ -131,8 +120,6 @@ func (m *Message) ID() string { // reverse order of how they were registered. func (m *Message) RegisterStatusHandler(h StatusChangeHandler) { m.init() - m.handlerGuard.Lock() - defer m.handlerGuard.Unlock() if m.Status() != MessageStatusOpen { panic(cerrors.Errorf("BUG: tried to register handler on message %s, it has already been handled", m.ID())) @@ -172,23 +159,7 @@ func (m *Message) RegisterNackHandler(h NackHandler) { m.hasNackHandler = true } -// RegisterDropHandler is used to register a function that will be called when -// the message is dropped. This function can only be called if the message -// status is open, otherwise it panics. -func (m *Message) RegisterDropHandler(h DropHandler) { - m.RegisterStatusHandler(func(msg *Message, change StatusChange) error { - if change.New != MessageStatusDropped { - return nil - } - h(msg, change.Reason) - return nil - }) -} - func (m *Message) notifyStatusHandlers(status MessageStatus, reason error) error { - m.handlerGuard.Lock() - defer m.handlerGuard.Unlock() - return m.handler(m, StatusChange{ Old: m.Status(), New: status, @@ -197,26 +168,19 @@ func (m *Message) notifyStatusHandlers(status MessageStatus, reason error) error } // Ack marks the message as acked, calls the corresponding status change -// handlers and closes the channel returned by Acked. If an ack handler returns -// an error, the message is dropped instead, which means that registered status -// change handlers are again notified about the drop and the channel returned by -// Dropped is closed instead. +// handlers and closes the channel returned by Acked. Errors from ack handlers +// get collected and returned as a single error. If Ack returns an error, the +// caller node should stop processing new messages and return the error. // Calling Ack after the message has already been nacked will panic, while -// subsequent calls to Ack on an acked or dropped message are a noop and return -// the same value. +// subsequent calls to Ack on an acked message are a noop and return the same +// value. func (m *Message) Ack() error { m.init() - m.ackNackDropOnce.Do(func() { + m.ackNackOnce.Do(func() { m.ackNackReturnValue = m.notifyStatusHandlers(MessageStatusAcked, nil) - if m.ackNackReturnValue != nil { - // unsuccessful ack, message is dropped - _ = m.notifyStatusHandlers(MessageStatusDropped, m.ackNackReturnValue) - close(m.dropped) - return - } close(m.acked) }) - if s := m.Status(); s != MessageStatusAcked && s != MessageStatusDropped { + if s := m.Status(); s != MessageStatusAcked { panic(cerrors.Errorf("BUG: message %s ack failed, status is %s: %w", m.ID(), s, ErrUnexpectedMessageStatus)) } return m.ackNackReturnValue @@ -224,55 +188,29 @@ func (m *Message) Ack() error { // Nack marks the message as nacked, calls the registered status change handlers // and closes the channel returned by Nacked. If no nack handlers were -// registered or a nack handler returns an error, the message is dropped -// instead, which means that registered status change handlers are again -// notified about the drop and the channel returned by Dropped is closed -// instead. +// registered Nack will return an error. Errors from nack handlers get collected +// and returned as a single error. If Nack returns an error, the caller node +// should stop processing new messages and return the error. // Calling Nack after the message has already been acked will panic, while -// subsequent calls to Nack on a nacked or dropped message are a noop and return -// the same value. +// subsequent calls to Nack on a nacked message are a noop and return the same +// value. func (m *Message) Nack(reason error) error { m.init() - m.ackNackDropOnce.Do(func() { + m.ackNackOnce.Do(func() { if !m.hasNackHandler { // we enforce at least one nack handler, otherwise nacks will go unnoticed m.ackNackReturnValue = cerrors.Errorf("no nack handler on message %s: %w", m.ID(), reason) } else { m.ackNackReturnValue = m.notifyStatusHandlers(MessageStatusNacked, reason) } - if m.ackNackReturnValue != nil { - // unsuccessful nack, message is dropped - _ = m.notifyStatusHandlers(MessageStatusDropped, m.ackNackReturnValue) - close(m.dropped) - return - } close(m.nacked) }) - if s := m.Status(); s != MessageStatusNacked && s != MessageStatusDropped { + if s := m.Status(); s != MessageStatusNacked { panic(cerrors.Errorf("BUG: message %s nack failed, status is %s: %w", m.ID(), s, ErrUnexpectedMessageStatus)) } return m.ackNackReturnValue } -// Drop marks the message as dropped, calls the registered status change -// handlers and closes the channel returned by Dropped. -// Calling Drop after the message has already been acked or nacked will panic, -// while subsequent calls to Drop on a dropped message are a noop. -func (m *Message) Drop() { - m.init() - m.ackNackDropOnce.Do(func() { - m.ackNackReturnValue = ErrMessageDropped - err := m.notifyStatusHandlers(MessageStatusDropped, m.ackNackReturnValue) - if err != nil { - panic(cerrors.Errorf("BUG: drop handlers should never return an error (message %s): %w", m.ID(), err)) - } - close(m.dropped) - }) - if s := m.Status(); s != MessageStatusDropped { - panic(cerrors.Errorf("BUG: message %s drop failed, status is %s: %w", m.ID(), s, ErrUnexpectedMessageStatus)) - } -} - // Acked returns a channel that's closed when the message has been acked. // Successive calls to Acked return the same value. This function can be used to // wait for a message to be acked without notifying the acker. @@ -289,14 +227,6 @@ func (m *Message) Nacked() <-chan struct{} { return m.nacked } -// Dropped returns a channel that's closed when the message has been dropped. -// Successive calls to Dropped return the same value. This function can be used -// to wait for a message to be dropped without notifying the dropper. -func (m *Message) Dropped() <-chan struct{} { - m.init() - return m.dropped -} - // Clone returns a cloned message with the same content but separate ack and // nack handling. func (m *Message) Clone() *Message { @@ -313,8 +243,6 @@ func (m *Message) Status() MessageStatus { return MessageStatusAcked case <-m.nacked: return MessageStatusNacked - case <-m.dropped: - return MessageStatusDropped default: return MessageStatusOpen } diff --git a/pkg/pipeline/stream/message_test.go b/pkg/pipeline/stream/message_test.go index 12c35b2f5..4f8445614 100644 --- a/pkg/pipeline/stream/message_test.go +++ b/pkg/pipeline/stream/message_test.go @@ -84,9 +84,8 @@ func TestMessage_Ack_WithFailingHandler(t *testing.T) { msg Message wantErr = cerrors.New("oops") - ackedMessageHandlerCallCount int - droppedMessageHandlerCallCount int - statusMessageHandlerCallCount int + ackedMessageHandlerCallCount int + statusMessageHandlerCallCount int ) { @@ -104,18 +103,11 @@ func TestMessage_Ack_WithFailingHandler(t *testing.T) { ackedMessageHandlerCallCount++ return nil }) - // fourth handler should be called twice, once for ack, once for drop + // fourth handler should be called once msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { statusMessageHandlerCallCount++ return nil }) - // drop handler should be called after the ack fails - msg.RegisterDropHandler(func(msg *Message, reason error) { - if ackedMessageHandlerCallCount != 2 { - t.Fatal("expected acked message handlers to already be called") - } - droppedMessageHandlerCallCount++ - }) // nack handler should not be called msg.RegisterNackHandler(func(*Message, error) error { t.Fatalf("did not expect nack handler to be called") @@ -129,26 +121,14 @@ func TestMessage_Ack_WithFailingHandler(t *testing.T) { if err != wantErr { t.Fatalf("ack expected error %v, got: %v", wantErr, err) } - assertMessageIsDropped(t, &msg) + assertMessageIsAcked(t, &msg) if ackedMessageHandlerCallCount != 2 { t.Fatalf("expected acked message handler to be called twice, got %d calls", ackedMessageHandlerCallCount) } - if droppedMessageHandlerCallCount != 1 { - t.Fatalf("expected dropped message handler to be called once, got %d calls", droppedMessageHandlerCallCount) - } - if statusMessageHandlerCallCount != 2 { - t.Fatalf("expected status message handler to be called twice, got %d calls", statusMessageHandlerCallCount) + if statusMessageHandlerCallCount != 1 { + t.Fatalf("expected status message handler to be called once, got %d calls", statusMessageHandlerCallCount) } } - - // nacking the message should return the same error - err := msg.Nack(cerrors.New("reason")) - if err != wantErr { - t.Fatalf("nack expected error %v, got %v", wantErr, err) - } - - // dropping the message shouldn't do anything - msg.Drop() } func TestMessage_Nack_WithoutHandler(t *testing.T) { @@ -161,20 +141,14 @@ func TestMessage_Nack_WithoutHandler(t *testing.T) { if err1 == nil { t.Fatal("nack expected error, got nil") } - assertMessageIsDropped(t, &msg) + assertMessageIsNacked(t, &msg) // nacking again should return the same error err2 := msg.Nack(cerrors.New("reason")) if err1 != err2 { t.Fatalf("nack expected error %v, got %v", err1, err2) } - assertMessageIsDropped(t, &msg) - - // acking the message should return the same error - err3 := msg.Ack() - if err1 != err3 { - t.Fatalf("ack expected error %v, got %v", err1, err3) - } + assertMessageIsNacked(t, &msg) } func TestMessage_Nack_WithHandler(t *testing.T) { @@ -218,9 +192,8 @@ func TestMessage_Nack_WithFailingHandler(t *testing.T) { msg Message wantErr = cerrors.New("oops") - nackedMessageHandlerCallCount int - droppedMessageHandlerCallCount int - statusMessageHandlerCallCount int + nackedMessageHandlerCallCount int + statusMessageHandlerCallCount int ) { @@ -238,18 +211,11 @@ func TestMessage_Nack_WithFailingHandler(t *testing.T) { nackedMessageHandlerCallCount++ return nil }) - // fourth handler should be called twice, once for ack, once for drop + // fourth handler should be called once msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { statusMessageHandlerCallCount++ return nil }) - // drop handler should be called after the nack fails - msg.RegisterDropHandler(func(msg *Message, reason error) { - if nackedMessageHandlerCallCount != 2 { - t.Fatal("expected nacked message handlers to already be called") - } - droppedMessageHandlerCallCount++ - }) // ack handler should not be called msg.RegisterAckHandler(func(*Message) error { t.Fatalf("did not expect ack handler to be called") @@ -263,90 +229,16 @@ func TestMessage_Nack_WithFailingHandler(t *testing.T) { if err != wantErr { t.Fatalf("nack expected error %v, got: %v", wantErr, err) } - assertMessageIsDropped(t, &msg) + assertMessageIsNacked(t, &msg) if nackedMessageHandlerCallCount != 2 { t.Fatalf("expected nacked message handler to be called twice, got %d calls", nackedMessageHandlerCallCount) } - if droppedMessageHandlerCallCount != 1 { - t.Fatalf("expected dropped message handler to be called once, got %d calls", droppedMessageHandlerCallCount) - } - if statusMessageHandlerCallCount != 2 { - t.Fatalf("expected status message handler to be called twice, got %d calls", statusMessageHandlerCallCount) - } - } - - // acking the message should return the same error - err := msg.Ack() - if err != wantErr { - t.Fatalf("ack expected error %v, got %v", wantErr, err) - } - - // dropping the message shouldn't do anything - msg.Drop() -} - -func TestMessage_Drop_WithoutHandler(t *testing.T) { - var msg Message - - assertMessageIsOpen(t, &msg) - - msg.Drop() - assertMessageIsDropped(t, &msg) - - // doing the same thing again shouldn't do anything - msg.Drop() - assertMessageIsDropped(t, &msg) -} - -func TestMessage_Drop_WithHandler(t *testing.T) { - var ( - msg Message - - droppedMessageHandlerCallCount int - statusMessageHandlerCallCount int - ) - - { - msg.RegisterDropHandler(func(msg *Message, reason error) { - droppedMessageHandlerCallCount++ - }) - // second handler should be called once for drop - msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { - statusMessageHandlerCallCount++ - return nil - }) - } - - // doing the same thing twice should have the same result - for i := 0; i < 2; i++ { - msg.Drop() - assertMessageIsDropped(t, &msg) - if droppedMessageHandlerCallCount != 1 { - t.Fatalf("expected dropped message handler to be called once, got %d calls", droppedMessageHandlerCallCount) - } if statusMessageHandlerCallCount != 1 { t.Fatalf("expected status message handler to be called once, got %d calls", statusMessageHandlerCallCount) } } } -func TestMessage_Drop_WithFailingHandler(t *testing.T) { - var msg Message - - // handler return error for drop - msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { - return cerrors.New("oops") - }) - - defer func() { - if recover() == nil { - t.Fatalf("expected msg.Drop to panic") - } - }() - - msg.Drop() -} - func TestMessage_StatusChangeTwice(t *testing.T) { assertAckPanics := func(msg *Message) { defer func() { @@ -364,16 +256,8 @@ func TestMessage_StatusChangeTwice(t *testing.T) { }() _ = msg.Nack(nil) } - assertDropPanics := func(msg *Message) { - defer func() { - if recover() == nil { - t.Fatalf("expected msg.Drop to panic") - } - }() - msg.Drop() - } - // nack or drop after the message is acked should panic + // nack after the message is acked should panic t.Run("acked message", func(t *testing.T) { var msg Message err := msg.Ack() @@ -381,7 +265,6 @@ func TestMessage_StatusChangeTwice(t *testing.T) { t.Fatalf("ack did not expect error, got %v", err) } assertNackPanics(&msg) - assertDropPanics(&msg) }) // registering a handler after the message is nacked should panic @@ -394,23 +277,6 @@ func TestMessage_StatusChangeTwice(t *testing.T) { t.Fatalf("ack did not expect error, got %v", err) } assertAckPanics(&msg) - assertDropPanics(&msg) - }) - - // registering a handler after the message is dropped should panic - t.Run("dropped message", func(t *testing.T) { - var msg Message - msg.Drop() - - err := msg.Ack() - if err != ErrMessageDropped { - t.Fatalf("expected %v, got %v", ErrMessageDropped, err) - } - - err = msg.Nack(nil) - if err != ErrMessageDropped { - t.Fatalf("expected %v, got %v", ErrMessageDropped, err) - } }) } @@ -431,14 +297,6 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { }() msg.RegisterNackHandler(func(*Message, error) error { return nil }) } - assertRegisterDropHandlerPanics := func(msg *Message) { - defer func() { - if recover() == nil { - t.Fatalf("expected msg.RegisterDropHandler to panic") - } - }() - msg.RegisterDropHandler(func(*Message, error) {}) - } // registering a handler after the message is acked should panic t.Run("acked message", func(t *testing.T) { @@ -449,7 +307,6 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { } assertRegisterAckHandlerPanics(&msg) assertRegisterNackHandlerPanics(&msg) - assertRegisterDropHandlerPanics(&msg) }) // registering a handler after the message is nacked should panic @@ -463,16 +320,6 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { } assertRegisterAckHandlerPanics(&msg) assertRegisterNackHandlerPanics(&msg) - assertRegisterDropHandlerPanics(&msg) - }) - - // registering a handler after the message is dropped should panic - t.Run("dropped message", func(t *testing.T) { - var msg Message - msg.Drop() - assertRegisterAckHandlerPanics(&msg) - assertRegisterNackHandlerPanics(&msg) - assertRegisterDropHandlerPanics(&msg) }) } @@ -487,7 +334,3 @@ func assertMessageIsNacked(t *testing.T, msg *Message) { func assertMessageIsOpen(t *testing.T, msg *Message) { assert.Equal(t, MessageStatusOpen, msg.Status()) } - -func assertMessageIsDropped(t *testing.T, msg *Message) { - assert.Equal(t, MessageStatusDropped, msg.Status()) -} diff --git a/pkg/pipeline/stream/messagestatus_string.go b/pkg/pipeline/stream/messagestatus_string.go index 8b339bc5e..d748cd0ba 100644 --- a/pkg/pipeline/stream/messagestatus_string.go +++ b/pkg/pipeline/stream/messagestatus_string.go @@ -11,12 +11,11 @@ func _() { _ = x[MessageStatusAcked-0] _ = x[MessageStatusNacked-1] _ = x[MessageStatusOpen-2] - _ = x[MessageStatusDropped-3] } -const _MessageStatus_name = "AckedNackedOpenDropped" +const _MessageStatus_name = "AckedNackedOpen" -var _MessageStatus_index = [...]uint8{0, 5, 11, 15, 22} +var _MessageStatus_index = [...]uint8{0, 5, 11, 15} func (i MessageStatus) String() string { if i < 0 || i >= MessageStatus(len(_MessageStatus_index)-1) { diff --git a/pkg/pipeline/stream/metrics.go b/pkg/pipeline/stream/metrics.go index b6dd3bb13..0fce43dd7 100644 --- a/pkg/pipeline/stream/metrics.go +++ b/pkg/pipeline/stream/metrics.go @@ -65,8 +65,7 @@ func (n *MetricsNode) Run(ctx context.Context) error { err = n.base.Send(ctx, n.logger, msg) if err != nil { - msg.Drop() - return err + return msg.Nack(err) } } } diff --git a/pkg/pipeline/stream/processor.go b/pkg/pipeline/stream/processor.go index 53e2c457a..74a68a3d6 100644 --- a/pkg/pipeline/stream/processor.go +++ b/pkg/pipeline/stream/processor.go @@ -55,28 +55,27 @@ func (n *ProcessorNode) Run(ctx context.Context) error { n.ProcessorTimer.Update(time.Since(executeTime)) if err != nil { // Check for Skipped records - if err == processor.ErrSkipRecord { + switch err { + case processor.ErrSkipRecord: // NB: Ack skipped messages since they've been correctly handled err := msg.Ack() if err != nil { return cerrors.Errorf("failed to ack skipped message: %w", err) } - continue - } - err = msg.Nack(err) - if err != nil { - msg.Drop() - return cerrors.Errorf("failed to execute processor: %w", err) + default: + err = msg.Nack(err) + if err != nil { + return cerrors.Errorf("error executing processor: %w", err) + } } - // nack was handled successfully, we recovered + // error was handled successfully, we recovered continue } msg.Record = rec err = n.base.Send(ctx, n.logger, msg) if err != nil { - msg.Drop() - return err + return msg.Nack(err) } } } diff --git a/pkg/pipeline/stream/source.go b/pkg/pipeline/stream/source.go index 9c827c5fc..cb5efbdb2 100644 --- a/pkg/pipeline/stream/source.go +++ b/pkg/pipeline/stream/source.go @@ -116,8 +116,7 @@ func (n *SourceNode) Run(ctx context.Context) (err error) { err = n.base.Send(ctx, n.logger, msg) if err != nil { - msg.Drop() - return err + return msg.Nack(err) } } } diff --git a/pkg/pipeline/stream/source_acker.go b/pkg/pipeline/stream/source_acker.go index 682151cf0..c64b587ae 100644 --- a/pkg/pipeline/stream/source_acker.go +++ b/pkg/pipeline/stream/source_acker.go @@ -65,8 +65,7 @@ func (n *SourceAckerNode) Run(ctx context.Context) error { err = n.base.Send(ctx, n.logger, msg) if err != nil { - msg.Drop() - return err + return msg.Nack(err) } } }