-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
snowpipe: exactly once semantics #3060
Conversation
e5423ce
to
10a350f
Compare
f4c4952
to
afe38a5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one @rockwotj! 🏆 I spotted a few small typos and such, but looks OK otherwise. I didn't try to play with it locally, but please let me know if you'd like me to test it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐑 🚀
this is what is required for exactly once. We don't yet use it.
NOTE that since we can't ensure certain messages can go to a specific channel at the moment, this only really works with max_in_flight=1, which is probably fine for postgres, but another commit will support channel_name properly, so one can specify explicitly the mapping from data to channel.
This will help to re-use all this logic when we create the new output that specifies channel names explicitly.
To a seperate function so it can be used between different outputs.
To clarify it, instead of spreading it out all over, this also means the schema migration function can now be a free function
One that is responsible for coordination of schema evolution and other small pieces (like custom mappings). The purpose of this is to allow for another kind of inner output that can allow for a user to specifically set the channel name (instead of using a pool).
I'm not sure if this is 100% correct, but it will work for most cases.
See the examples on what this enables with a Redpanda/Kafka input (but not kafka_franz!).
This seems a bit clearer and has nice duality with the indexed pool
By holding a lock when doing this during WriteBatch, and not having the framework call Connect outside of pipeline creation, just handle it internally.
I think this is what was missing...
We should try not to always run a SQL query everytime we startup for cost reasons. Instead of running a query (which is likely flaky because of identifier normalization anyways), just open the channel lazily and catch the specific error for the table not existing, then create the table and retry.
Support 2 new properties in
snowflake_streaming
:offset_token
: A new property to support exactly once delivery: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokenschannel_name
: The ability to explicitly assign a batch to a channel. The currentchannel_prefix
option doesn't support explicitly picking a channel, this allows exactly once from Kafka.