Last position handling #504
Conversation
Force-pushed from f347b85 to 8ad3b02
I'm halfway through the PR, but I'll post the questions I have so far. :)
openMsgTracker.Wait()
IIUC, we're going to get into this deferred function only if there was an error writing a record. Is it possible that this waits indefinitely because, for example, the destination is stuck?
First, the deferred function will be called regardless of the error that's returned (in other words, it's always called when the node stops). Now, is it possible that it waits indefinitely? I think this would be possible, yes, if the connector just starts hanging (i.e. it's still running, looks alive, not really broken, but just hangs). On the other hand, if the connector stops because of an error or panic, then DestinationAckerNode will nack all messages that didn't get acked/nacked by the connector, thus unblocking this line.
If we wanted to fix the possibility of the pipeline getting stuck we would need to provide an option to "force stop" the pipeline (i.e. we wouldn't care what the connector is doing and just kill it). This would work for standalone plugins since we can kill the subprocess, while there's no way for us to kill built-in plugins (not that I know of at least); the best we could do is to stop the nodes and "detach" from the plugin goroutines (leave them running in the background).
> If we wanted to fix the possibility of the pipeline getting stuck
IMHO, that would be a good thing to do, not necessarily in this PR though.
Yeah we should do that, though this is probably a bit lower on the priority list.
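As an aside on the openMsgTracker.Wait() call discussed above, here is a minimal sketch of how such a tracker can work, assuming a WaitGroup-based design and a simplified message type of my own; Conduit's actual OpenMessagesTracker and Message types differ in detail. The point is that Wait only returns once every tracked message has been acked or nacked, which is why nacking the outstanding messages when the connector dies unblocks the deferred wait.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for Conduit's stream message: it carries
// callbacks that fire exactly once, when the message is acked or nacked.
type message struct {
	once        sync.Once
	ackHandler  func()
	nackHandler func(error)
}

func (m *message) Ack()           { m.once.Do(m.ackHandler) }
func (m *message) Nack(err error) { m.once.Do(func() { m.nackHandler(err) }) }

// openMessagesTracker counts messages that were sent to the destination but
// not yet acked or nacked. Wait blocks until that count drops to zero.
type openMessagesTracker struct{ wg sync.WaitGroup }

// Add starts tracking a message by wrapping its ack/nack handlers, so the
// tracker is notified whichever of the two is eventually called.
func (t *openMessagesTracker) Add(m *message) {
	t.wg.Add(1)
	ack, nack := m.ackHandler, m.nackHandler
	m.ackHandler = func() { defer t.wg.Done(); ack() }
	m.nackHandler = func(err error) { defer t.wg.Done(); nack(err) }
}

func (t *openMessagesTracker) Wait() { t.wg.Wait() }

func main() {
	var tracker openMessagesTracker

	msg := &message{
		ackHandler:  func() { fmt.Println("acked") },
		nackHandler: func(err error) { fmt.Println("nacked:", err) },
	}
	tracker.Add(msg)

	// If the connector dies, an acker node nacking the outstanding messages
	// has the same unblocking effect as a regular ack.
	go msg.Ack()

	tracker.Wait() // returns once every tracked message is acked or nacked
	fmt.Println("all open messages handled")
}
```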
// openMsgTracker tracks open messages until they are acked or nacked
var openMsgTracker OpenMessagesTracker
defer func() {
	stopErr := n.Destination.Stop(connectorCtx, lastPosition)
Let me see if I got this right: connectorCtx is special only because we can cancel it, because above this we have:
connectorCtx, cancel := context.WithCancel(context.Background())
defer cancel()
The above defer (with cancel()) will happen after this defer (with stop, wait for msgs, teardown). So IIUC, the usage of connectorCtx here is basically ineffective?
You got everything right, except the conclusion 😄 First thing to note is that Run receives a context that will be canceled if any node in the pipeline stops running because of an error. Second thing: when we call n.Destination.Open we pass it a context; if that context gets canceled, the stream will get closed immediately. Also, any calls to the connector need to happen with an open context (e.g. Stop and Teardown), otherwise it's considered that the call was canceled (I expect gRPC wouldn't even deliver a call with a closed context).
So essentially we create a new context connectorCtx to be sure it will stay open until the node actually stops running, so we can gracefully stop the connector.
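To make the above concrete, here is a compressed, hypothetical sketch of the pattern (the destination interface and runDestination function below are my own, not Conduit's actual API): the run context may already be canceled by the time the deferred cleanup runs, so Stop and Teardown are called with a separately created connectorCtx that is only canceled after the node itself returns.

```go
package main

import (
	"context"
	"fmt"
)

// destination is an illustrative stand-in for Conduit's destination connector
// entity; the real interface differs, but the context handling is the point.
type destination interface {
	Open(ctx context.Context) error
	Stop(ctx context.Context, lastPosition []byte) error
	Teardown(ctx context.Context) error
}

// runDestination sketches the shutdown pattern described above: runCtx may
// already be canceled because another node in the pipeline failed, so the
// connector calls made during cleanup use a separately created context that
// stays open until this function actually returns.
func runDestination(runCtx context.Context, dest destination, lastPosition []byte) error {
	connectorCtx, cancel := context.WithCancel(context.Background())
	defer cancel() // deferred last, so it only fires after Stop/Teardown below

	if err := dest.Open(connectorCtx); err != nil {
		return err
	}
	defer func() {
		// Graceful shutdown: using runCtx here would fail whenever the
		// pipeline is already shutting down because of an error.
		if err := dest.Stop(connectorCtx, lastPosition); err != nil {
			fmt.Println("stop failed:", err)
		}
		if err := dest.Teardown(connectorCtx); err != nil {
			fmt.Println("teardown failed:", err)
		}
	}()

	// ... normally: write records until runCtx is done or the incoming
	// channel is closed; here we just wait for cancellation ...
	<-runCtx.Done()
	return runCtx.Err()
}

type noopDestination struct{}

func (noopDestination) Open(context.Context) error { return nil }
func (noopDestination) Stop(context.Context, []byte) error {
	fmt.Println("stopped gracefully")
	return nil
}
func (noopDestination) Teardown(context.Context) error {
	fmt.Println("torn down")
	return nil
}

func main() {
	runCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the pipeline shutting down because another node failed
	_ = runDestination(runCtx, noopDestination{}, []byte("last-position"))
}
```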
Thanks for the clarification!
> First thing to note is that Run receives a context that will be canceled if any node in the pipeline stops running because of an error.
I believe it would be nice to have this in a comment in the code; it's useful.
conduit/pkg/pipeline/stream/node.go, lines 30 to 60 in 1793c51:
// Run first verifies if the Node is set up correctly and either returns a
// descriptive error or starts processing messages. Processing should stop
// as soon as the supplied context is done. If an error occurs while
// processing messages, the processing should stop and the error should be
// returned. If processing stopped because the context was canceled, the
// function should return ctx.Err().
// Run has different responsibilities, depending on the node type:
// * PubNode has to start producing new messages into the outgoing channel.
//   The context supplied to Run has to be attached to all messages. Each
//   message will be either acked or nacked by downstream nodes, it's the
//   responsibility of PubNode to handle these acks/nacks if applicable.
//   The outgoing channel has to be closed when Run returns, regardless of
//   the return value.
// * SubNode has to start listening to messages sent to the incoming
//   channel. It has to use the context supplied in the message for calls
//   to other functions (imagine the message context as a request context).
//   It is the responsibility of SubNode to ack or nack a message if it's
//   processed correctly or with an error. If the incoming channel is
//   closed, then Run should stop and return nil.
// * PubSubNode has to start listening to incoming messages, process them
//   and forward them to the outgoing channel. The node should not ack/nack
//   forwarded messages. If a message is dropped and not forwarded to the
//   outgoing channel (i.e. filters), the message should be acked. If an
//   error is encountered while processing the message, the message has to
//   be nacked and Run should return with an error. If the incoming channel
//   is closed, then Run should stop and return nil. The outgoing channel
//   has to be closed when Run returns, regardless of the return value.
//   The incoming message pointers need to be forwarded, as upstream nodes
//   could be waiting for acks/nacks on that exact pointer. If the node
//   forwards a new message (not the exact pointer it received), then it
//   needs to forward any acks/nacks to the original message pointer.
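To illustrate the contract quoted above, here is a hypothetical PubSubNode-style filter written against simplified channel and message types of my own (not Conduit's real ones): it forwards the original message pointers, acks messages it drops, nacks on context cancellation, and closes its outgoing channel when Run returns.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a heavily simplified message; Conduit's real Message type is richer.
type msg struct {
	key string
}

func (m *msg) Ack()           { fmt.Println("acked (dropped):", m.key) }
func (m *msg) Nack(err error) { fmt.Println("nacked:", err) }

// filterNode is a hypothetical PubSubNode: it forwards messages with a
// non-empty key and acks (drops) the rest, following the contract above.
type filterNode struct {
	in  <-chan *msg
	out chan *msg
}

func (n *filterNode) Run(ctx context.Context) error {
	defer close(n.out) // outgoing channel is closed regardless of return value

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case m, ok := <-n.in:
			if !ok {
				return nil // incoming channel closed: stop and return nil
			}
			if m.key == "" {
				m.Ack() // dropped messages are acked by the node dropping them
				continue
			}
			select {
			case n.out <- m: // forward the original pointer, not a copy
			case <-ctx.Done():
				m.Nack(ctx.Err())
				return ctx.Err()
			}
		}
	}
}

func main() {
	in := make(chan *msg, 2)
	in <- &msg{key: "a"}
	in <- &msg{} // will be dropped and acked
	close(in)

	node := &filterNode{in: in, out: make(chan *msg, 2)}
	if err := node.Run(context.Background()); err != nil {
		fmt.Println("run error:", err)
	}
	for m := range node.out {
		fmt.Println("forwarded:", m.key)
	}
}
```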
What I believe would be useful in a comment (be it the one above or wherever it makes sense) is the note that the context will be cancelled if any pipeline node stops running. But it's a nitpick-level thing. :)
I added a clarification 👍
	is.Fail() // expected node to stop running
case <-nodeDone:
	// all good
}

ackHandlerWg.Wait() // all ack handler should be called by now
Is it possible that, for example, due to a bug in the acker, this wait simply hangs?
It is; that's how we would spot a failing test in this instance. I know we could add a timeout, but it's a hassle and I feel it would needlessly complicate the test 😕 Maybe if we had those timeout utilities we wanted to add to the SDK, we wouldn't have to reinvent the wheel every time we need a timeout for a waitgroup, and this would be much simpler to add.
Is there already an issue for adding the timeout utilities to the SDK? IMHO, we shouldn't allow the test to hang, but I also completely understand that it's a hassle.
This is essentially the issue for adding the utilities to the SDK: ConduitIO/conduit-connector-sdk#26. Although even if we add them to the SDK, we would need to add them to Conduit separately (into the foundation package), since we shouldn't depend on utilities from the SDK in Conduit.
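For reference, the timeout-around-a-WaitGroup pattern this thread keeps coming back to can be sketched with nothing but the standard library. The helper and test names below are made up for illustration; this is not an existing Conduit or SDK utility.

```go
package stream // illustrative package name; in practice this lives in a _test.go file

import (
	"sync"
	"testing"
	"time"
)

// waitTimeout waits on the WaitGroup and reports whether it finished before
// the timeout expired, so a buggy acker fails the test instead of hanging it.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func TestAckHandlersCalled(t *testing.T) {
	var ackHandlerWg sync.WaitGroup
	ackHandlerWg.Add(1)
	go ackHandlerWg.Done() // stands in for the ack handler being invoked

	if !waitTimeout(&ackHandlerWg, time.Second) {
		t.Fatal("expected all ack handlers to be called within 1s")
	}
}
```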
Commits:
* Semaphore (#451)
  * implement ticket queue
  * experiment with ordered semaphore
  * ticketqueue benchmarks
  * reduce allocations
  * remove ticketqueue (semaphore implementation is more performant)
  * optimize semaphore for our use case
  * fix linter warnings, better benchmarks
  * better docs
  * go mod tidy
  * use cerrors.New
  * improve benchmarks
  * fix linter error
  * add comments
  * simplify implementation
* Source Acker Node (#483)
  * implement ticket queue
  * experiment with ordered semaphore
  * ticketqueue benchmarks
  * reduce allocations
  * remove ticketqueue (semaphore implementation is more performant)
  * optimize semaphore for our use case
  * fix linter warnings, better benchmarks
  * better docs
  * go mod tidy
  * rename AckerNode to DestinationAckerNode
  * remove message status change middleware to ensure all message handlers are called
  * implement SourceAckerNode
  * add todo note about possible deadlock
  * source acker node test
  * don't forward acks after a failed ack/nack
  * use cerrors
  * use cerrors.New
  * use LogOrReplace
  * improve benchmarks
  * fix linter error
  * add comments
  * simplify implementation
  * update semaphore
  * update param name
  * remove redundant if clause
* Remove message status dropped (#487)
  * implement ticket queue
  * experiment with ordered semaphore
  * ticketqueue benchmarks
  * reduce allocations
  * remove ticketqueue (semaphore implementation is more performant)
  * optimize semaphore for our use case
  * fix linter warnings, better benchmarks
  * better docs
  * go mod tidy
  * rename AckerNode to DestinationAckerNode
  * remove message status change middleware to ensure all message handlers are called
  * implement SourceAckerNode
  * add todo note about possible deadlock
  * source acker node test
  * remove message status dropped
  * document behavior, fanout node return nacked message error
  * don't forward acks after a failed ack/nack
  * use cerrors
  * use cerrors.New
  * use LogOrReplace
  * improve benchmarks
  * fix linter error
  * add comments
  * simplify implementation
  * update semaphore
  * update param name
  * remove redundant if clause
* Last position handling (#504)
  * implement ticket queue
  * experiment with ordered semaphore
  * ticketqueue benchmarks
  * reduce allocations
  * remove ticketqueue (semaphore implementation is more performant)
  * optimize semaphore for our use case
  * fix linter warnings, better benchmarks
  * better docs
  * go mod tidy
  * rename AckerNode to DestinationAckerNode
  * remove message status change middleware to ensure all message handlers are called
  * implement SourceAckerNode
  * add todo note about possible deadlock
  * source acker node test
  * remove message status dropped
  * document behavior, fanout node return nacked message error
  * don't forward acks after a failed ack/nack
  * use cerrors
  * update plugin interface
  * update standalone plugin implementation
  * update builtin plugin implementation
  * update connector
  * update nodes
  * change plugin semantics, close stream on teardown
  * refactor stream, reuse it in source and destination
  * lock stream when stopping
  * create control message for source stop
  * forward last position to destination
  * update connector SDK, fix race condition in source node
  * make Conduit in charge of closing connector streams
  * Change plugin semantics around teardown - internal connector entity is now in charge of closing the stream instead of plugin.
  * Map known gRPC errors to internal type (context.Canceled).
  * Rewrite DestinationAckerNode to be a regular node standing after DestinationNode, receiving messages and triggering ack receiving. This makes the structure simpler and in line with all other nodes.
  * Create OpenMessagesTracker to simplify tracking open messages in SourceNode and DestinationNode.
  * destination acker tests
  * use cerrors.New
  * use LogOrReplace
  * use LogOrReplace
  * make signal channel buffered
  * improve benchmarks
  * fix linter error
  * add comments
  * simplify implementation
  * update semaphore
  * update param name
  * remove redundant if clause
  * make it possible only to inject control messages
  * improve destination acker caching test
  * remove TODO comment
  * update comment
Description
Updates Conduit to respect the last position sent by a connector when Stop is called and forwards the last position to the destination when calling Stop. This change touches many parts, from the pipeline nodes to the source and destination connectors (internal entities) and the plugin adapters (gRPC and builtin).

The DestinationAckerNode was rewritten and simplified as part of this change - it now works like any other node, meaning that it is part of the pipeline and stands after DestinationNode. This also means that DestinationNode is not the last node in a pipeline anymore; it writes the record to the destination connector and then sends the message to the acker node, which makes sure acks/nacks are handled correctly.

Another important semantic change is that DestinationAckerNode now stores open messages in a queue and expects to get acknowledgments back in the same order as the order of messages sent to the destination. This essentially means that Conduit no longer requires positions to be unique across all connectors (fixes #517).

This PR also changes the semantics of how a stream is closed between Conduit and the connector. The closing is now completely in the hands of Conduit, since it knows for sure when no more messages will flow through the stream and closes both sides when it is done. For more info see the pipeline semantics doc.
Depends on #487.
Fixes #485
This is the last part that closes #389.
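To illustrate the in-order expectation described above with simplified types (this is my own sketch, not the actual DestinationAckerNode implementation): acknowledgments coming back from the connector are matched against a FIFO queue of sent messages, which is why positions no longer need to be unique across connectors.

```go
package main

import "fmt"

// ack is a simplified acknowledgment coming back from the destination plugin.
type ack struct {
	position string
	err      error // non-nil means the write failed and the message is nacked
}

// ackQueue matches incoming acknowledgments against messages in the order
// they were sent to the destination, mirroring the FIFO expectation described
// in the PR description (the real DestinationAckerNode logic is more involved).
type ackQueue struct {
	pending []string // positions of messages sent, oldest first
}

func (q *ackQueue) Sent(position string) {
	q.pending = append(q.pending, position)
}

// Receive pops the oldest pending message; it is an error if the connector
// acknowledges out of order or acknowledges something that was never sent.
func (q *ackQueue) Receive(a ack) error {
	if len(q.pending) == 0 {
		return fmt.Errorf("unexpected ack for position %q", a.position)
	}
	want := q.pending[0]
	q.pending = q.pending[1:]
	if a.position != want {
		return fmt.Errorf("out of order ack: got %q, want %q", a.position, want)
	}
	if a.err != nil {
		fmt.Printf("nack %q: %v\n", want, a.err)
		return nil
	}
	fmt.Printf("ack %q\n", want)
	return nil
}

func main() {
	var q ackQueue
	q.Sent("pos-1")
	q.Sent("pos-1") // duplicate positions are fine; order is what matters
	_ = q.Receive(ack{position: "pos-1"})
	_ = q.Receive(ack{position: "pos-1"})
}
```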
Quick checks: