SinkTask - Use RetriableException to avoid losing messages #57
Hello 👋,
I hope this helps.
When `client.send` fails during the `put` operation, which I believe is asynchronous, offsets might be committed before the error surfaces, so messages might be lost. As an example, this is what happens when using a wrong SQS URL:

It consumes a few messages and then it stops.
According to the docs, throwing a `RetriableException` rather than a `RuntimeException` forces Kafka Connect to retry the operation, which also ensures that offsets are not committed. This change fixes #3.
A simple way to test this is to create a new connector with a wrong SQS URL: the current code starts consuming and committing some offsets until it detects the exception and stops.
I tried this change in a fork and it fails before committing offsets, so no messages are lost.
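
A quicker way to exercise the same path without a real queue is a unit test along these lines (again just a sketch, assuming the hypothetical `SqsSinkTask` above and a test class in the same package; it is not part of this PR):

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Collections;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;

class SqsSinkTaskTest {

    @Test
    void putThrowsRetriableExceptionWhenSendFails() {
        SqsSinkTask task = new SqsSinkTask();
        // Inject a sender that always fails, simulating e.g. a wrong SQS URL.
        task.setClient(record -> {
            throw new RuntimeException("could not reach queue");
        });

        SinkRecord record = new SinkRecord("topic", 0, null, null, null, "value", 0);

        // Connect treats RetriableException as "retry the batch, don't commit".
        assertThrows(RetriableException.class,
                () -> task.put(Collections.singletonList(record)));
    }
}
```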
I'm not sure if there are other ways of losing messages, but at least this covers the initial case where a wrong URL is provided. I'll be testing it more in the coming days, as we want to use the connector :)