Last position handling #504

Merged: 62 commits from lovro/last-position into lovro/stability on Jul 26, 2022
Conversation

@lovromazgon (Member) commented on Jul 4, 2022

Description

Updates Conduit to respect the last position sent by a connector when Stop is called, and forwards that last position to the destination when calling Stop. This change touches many parts: the pipeline nodes, the source and destination connectors (internal entities), and the plugin adapters (gRPC and builtin).
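
A minimal sketch of that handoff, with hypothetical types (Position, Source, and Destination below are illustrative stand-ins, not Conduit's actual interfaces):

package sketch

import "context"

// Position is a stand-in for a connector position (an opaque byte slice).
type Position []byte

// Source is a stand-in for the internal source connector entity.
type Source interface {
	// Stop signals the source to stop producing records and returns the
	// position of the last record it produced.
	Stop(ctx context.Context) (Position, error)
}

// Destination is a stand-in for the internal destination connector entity.
type Destination interface {
	// Stop tells the destination that no record after lastPosition will
	// arrive, so it can flush and shut down once that record is acked.
	Stop(ctx context.Context, lastPosition Position) error
}

// stopGracefully sketches the handoff described above: the last position
// reported by the source is forwarded to the destination when stopping.
func stopGracefully(ctx context.Context, src Source, dst Destination) error {
	lastPosition, err := src.Stop(ctx)
	if err != nil {
		return err
	}
	return dst.Stop(ctx, lastPosition)
}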

The DestinationAckerNode was rewritten and simplified as part of this change. It now works like any other node, meaning it is part of the pipeline and stands after DestinationNode. This also means that DestinationNode is no longer the last node in a pipeline: it writes the record to the destination connector and then sends the message to the acker node, which makes sure acks/nacks are handled correctly.

Another important semantic change is that DestinationAckerNode now stores open messages in a queue and expects acknowledgments to come back in the same order in which the messages were sent to the destination. This means that Conduit no longer requires positions to be unique across all connectors (fixes #517).
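
The ordering guarantee is easiest to picture with a small sketch (hypothetical types; the real node also deals with locking and shutdown): open messages sit in a FIFO queue and each incoming acknowledgment is matched to the oldest open message, so positions are never compared and therefore don't have to be unique.

package sketch

// message is a stand-in for Conduit's pipeline message type.
type message struct {
	ack  func() error
	nack func(error) error
}

// ackQueue holds messages sent to the destination, oldest first.
type ackQueue struct {
	open []*message
}

// enqueue is called right after a record is written to the destination.
func (q *ackQueue) enqueue(m *message) { q.open = append(q.open, m) }

// handleAck matches an incoming acknowledgment to the oldest open message;
// matching by order instead of by position is what removes the uniqueness
// requirement.
func (q *ackQueue) handleAck(err error) error {
	m := q.open[0] // panics if nothing is open; the real node treats that as a fatal error
	q.open = q.open[1:]
	if err != nil {
		return m.nack(err)
	}
	return m.ack()
}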

This PR also changes the semantics of how a stream is closed between Conduit and the connector. Closing is now completely in the hands of Conduit, since it knows for sure when no more messages will flow through the stream, and it closes both sides when it is done. For more info see the pipeline semantics doc.

Depends on #487.

Fixes #485
This is the last part that closes #389.

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed. (hard to do with so many interdependent parts)

Base automatically changed from lovro/remove-message-dropped to lovro/stability on July 13, 2022 16:10
@lovromazgon force-pushed the lovro/last-position branch from f347b85 to 8ad3b02 on July 22, 2022 14:14
@hariso (Contributor) left a comment:
I'm halfway through the PR, but I'll post the questions I have so far :)

pkg/pipeline/stream/base.go (two outdated review threads, resolved)
}
}

openMsgTracker.Wait()
@hariso (Contributor):
IIUC, we're going to get into this deferred function only if there was an error writing a record. Is it possible that this waits indefinitely because, for example, the destination is stuck?

@lovromazgon (Member, Author):

First, the deferred function will be called regardless of the error that's returned (in other words, it's always called when the node stops). Now, is it possible that it waits indefinitely? I think this would be possible, yes, if the connector just starts hanging (i.e. it's still running, looks alive, not really broken, but just hangs). On the other hand, if the connector stops because of an error or panic, then DestinationAckerNode will nack all messages that didn't get acked/nacked by the connector, thus unblocking this line.

If we wanted to remove the possibility of the pipeline getting stuck, we would need to provide an option to "force stop" the pipeline (i.e. we wouldn't care what the connector is doing and would just kill it). This would work for standalone plugins, since we can kill the subprocess, while there's no way for us to kill built-in plugins (not that I know of, at least); the best we could do is stop the nodes and "detach" from the plugin goroutines (leave them running in the background).

@hariso (Contributor):

> If we wanted to fix the possibility of the pipeline getting stuck

IMHO, that would be a good thing to do, not necessarily in this PR though.

@lovromazgon (Member, Author):

Yeah, we should do that, though this is probably a bit lower on the priority list.
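
For context, the tracker discussed in this thread can be imagined as a thin wrapper around sync.WaitGroup. This is only a sketch: Wait is the method discussed above, while the onSettle hook is made up for illustration (the real tracker hooks into the message's ack/nack handlers).

package sketch

import "sync"

// message is a stand-in for the pipeline message type; onSettle is a
// hypothetical hook invoked once the message is acked or nacked.
type message struct {
	onSettle func()
}

// OpenMessagesTracker counts in-flight messages until they settle.
type OpenMessagesTracker struct {
	wg sync.WaitGroup
}

// Add starts tracking a message and releases it once it is acked or nacked.
func (t *OpenMessagesTracker) Add(m *message) {
	t.wg.Add(1)
	m.onSettle = func() { t.wg.Done() }
}

// Wait blocks until every tracked message was acked or nacked. This is the
// call that could block forever if a connector hangs without failing, as
// discussed above.
func (t *OpenMessagesTracker) Wait() { t.wg.Wait() }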

// openMsgTracker tracks open messages until they are acked or nacked
var openMsgTracker OpenMessagesTracker
defer func() {
	stopErr := n.Destination.Stop(connectorCtx, lastPosition)
@hariso (Contributor):

Let me see if I got this right: connectorCtx is special only because we can cancel it, since above this we have:

connectorCtx, cancel := context.WithCancel(context.Background())
defer cancel()

The above defer (with cancel()) will happen after this defer (with stop, wait for msgs, teardown). So IIUC, the usage of connectorCtx here is basically ineffective?

@lovromazgon (Member, Author):

You got everything right except the conclusion 😄 First thing to note is that Run receives a context that will be canceled if any node in the pipeline stops running because of an error. Second thing: when we call n.Destination.Open we pass it a context; if that context gets canceled, the stream will get closed immediately. Also, any calls to the connector need to happen with an open context (e.g. Stop and Teardown); otherwise the call is considered canceled (I expect gRPC wouldn't even deliver a call with a closed context).

So essentially we create a new context connectorCtx to be sure it will stay open until the node actually stops running, so we can gracefully stop the connector.
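
Condensed into a standalone example (not the actual node code), the pattern looks like this:

package sketch

import "context"

// run mimics a node's Run method: ctx is canceled as soon as any node in the
// pipeline fails, yet the deferred stop call still needs a live context to
// reach the connector.
func run(ctx context.Context, stop func(context.Context) error) error {
	// connectorCtx deliberately does not derive from ctx, so it stays open
	// even after the pipeline context is canceled.
	connectorCtx, cancel := context.WithCancel(context.Background())
	// Deferred calls run LIFO: cancel fires only after the stop call below,
	// so the connector is always stopped with an open context.
	defer cancel()
	defer func() {
		_ = stop(connectorCtx) // graceful stop, even when ctx is already canceled
	}()

	<-ctx.Done() // stand-in for the message-processing loop
	return ctx.Err()
}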

@hariso (Contributor):

Thanks for the clarification!

> First thing to note is that Run receives a context that will be canceled if any node in the pipeline stops running because of an error.

I believe it would be nice to have this in a comment in the code; it's useful.

@lovromazgon (Member, Author):

// Run first verifies if the Node is set up correctly and either returns a
// descriptive error or starts processing messages. Processing should stop
// as soon as the supplied context is done. If an error occurs while
// processing messages, the processing should stop and the error should be
// returned. If processing stopped because the context was canceled, the
// function should return ctx.Err().
// Run has different responsibilities, depending on the node type:
// * PubNode has to start producing new messages into the outgoing channel.
// The context supplied to Run has to be attached to all messages. Each
// message will be either acked or nacked by downstream nodes, it's the
// responsibility of PubNode to handle these acks/nacks if applicable.
// The outgoing channel has to be closed when Run returns, regardless of
// the return value.
// * SubNode has to start listening to messages sent to the incoming
// channel. It has to use the context supplied in the message for calls
// to other functions (imagine the message context as a request context).
// It is the responsibility of SubNode to ack or nack a message if it's
// processed correctly or with an error. If the incoming channel is
// closed, then Run should stop and return nil.
// * PubSubNode has to start listening to incoming messages, process them
// and forward them to the outgoing channel. The node should not ack/nack
// forwarded messages. If a message is dropped and not forwarded to the
// outgoing channel (i.e. filters), the message should be acked. If an
// error is encountered while processing the message, the message has to
// be nacked and Run should return with an error. If the incoming channel
// is closed, then Run should stop and return nil. The outgoing channel
// has to be closed when Run returns, regardless of the return value.
// The incoming message pointers need to be forwarded, as upstream nodes
// could be waiting for acks/nacks on that exact pointer. If the node
// forwards a new message (not the exact pointer it received), then it
// needs to forward any acks/nacks to the original message pointer.

@hariso (Contributor):

What I believe would be useful in a comment (be it the one above or wherever it makes sense) is a note that the context will be canceled if any pipeline node stops running. But it's nitpick level :)

@lovromazgon (Member, Author):

I added a clarification 👍

pkg/pipeline/stream/destination_acker.go (review thread, resolved)
pkg/pipeline/stream/destination_acker_test.go (review thread, resolved)
	is.Fail() // expected node to stop running
case <-nodeDone:
	// all good
}

ackHandlerWg.Wait() // all ack handlers should be called by now
@hariso (Contributor):

Is it possible that, for example, due to a bug in the acker, this wait simply hangs?

@lovromazgon (Member, Author):

It is; that's how we would spot a failing test in this instance. I know we could add a timeout, but it is a hassle and I feel it would needlessly complicate the test 😕 Maybe if we had those timeout utilities we wanted to add to the SDK, we wouldn't have to reinvent the wheel every time we need a timeout for a waitgroup, and this would be much simpler to add.

@hariso (Contributor):

Is there already an issue for adding the timeout utilities to the SDK? IMHO, we shouldn't allow the test to hang, but I also completely understand that it's a hassle.

@lovromazgon (Member, Author):

This is essentially the issue for adding the utilities to the SDK ConduitIO/conduit-connector-sdk#26. Although even if we add them to the SDK, we would need to add them to Conduit separately (into the foundation package), since we shouldn't depend on utilities from the SDK in Conduit.
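
Such a utility could look roughly like this (a sketch, not the SDK's or the foundation package's actual API):

package sketch

import (
	"sync"
	"time"
)

// waitTimeout waits on wg but gives up after timeout, so a buggy acker makes
// the test fail instead of hanging forever. It reports whether the wait
// finished in time. The waiting goroutine leaks on timeout, which is
// acceptable in an already-failing test.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

The test above could then replace the bare ackHandlerWg.Wait() with something like: if !waitTimeout(&ackHandlerWg, time.Second) { is.Fail() }.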

pkg/pipeline/stream/source.go (review thread, resolved)
pkg/pipeline/stream/base.go (outdated review thread, resolved)
pkg/plugin/acceptance_testing.go (review thread, resolved)
@lovromazgon merged commit 44d5f00 into lovro/stability on Jul 26, 2022
@lovromazgon deleted the lovro/last-position branch on July 26, 2022 15:44
lovromazgon added a commit that referenced this pull request Jul 26, 2022
* Semaphore (#451)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* use cerrors.New

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* Source Acker Node (#483)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Remove message status dropped (#487)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Last position handling (#504)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* update plugin interface

* update standalone plugin implementation

* update builtin plugin implementation

* update connector

* update nodes

* change plugin semantics, close stream on teardown

* refactor stream, reuse it in source and destination

* lock stream when stopping

* create control message for source stop

* forward last position to destination

* update connector SDK, fix race condition in source node

* make Conduit in charge of closing connector streams

* Change plugin semantics around teardown - the internal connector entity
  is now in charge of closing the stream instead of the plugin.
* Map known gRPC errors to internal type (context.Canceled).
* Rewrite DestinationAckerNode to be a regular node standing after
  DestinationNode, receiving messages and triggering ack receiving. This
  makes the structure simpler and in line with all other nodes.
* Create OpenMessagesTracker to simplify tracking open messages in
  SourceNode and DestinationNode.

* destination acker tests

* use cerrors.New

* use LogOrReplace

* use LogOrReplace

* make signal channel buffered

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* make it possible only to inject control messages

* improve destination acker caching test

* remove TODO comment

* update comment