fix: dangling open transaction for cdc-only, feat: add nats connector #81
base: main
feat: add nats connector (for cdc events)

For the first part, fix the "START_REPLICATION cannot run inside a transaction block" error in CDC-only mode. It is caused by PostgresSource::new opening a transaction but never closing it when running CDC only. This can be remedied by committing that transaction and offering an explicit start_transaction function to be called when not running CDC only.

For the second part, introduce a NATS connector so we can propagate CDC events onto a message broker. This is WIP: it should probably create or get a stream in JetStream for persistence, and it needs some configuration options.

Refs: supabase#80
@@ -54,6 +55,10 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
copied_tables: &HashSet<TableId>,
) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
let start = Instant::now();
self.source
This fixes the transaction issue by starting an explicit transaction here instead of relying on the PostgresSource constructor to open one.
@@ -69,6 +69,7 @@ impl PostgresSource {
let (table_names, publication) =
Self::get_table_names_and_publication(&replication_client, table_names_from).await?;
let table_schemas = replication_client.get_table_schemas(&table_names).await?;
replication_client.commit_txn().await?;
This is the fix for #80: it makes sure no dangling transaction is left open when the constructor returns. I'm not sure whether we need the transaction here at all, but I erred on the side of caution.
@cawfeecoder thanks for the PR. Could you please break the fix for #80 into a separate PR? That makes it easier to review.
What kind of change does this PR introduce?
Bug fix and Feature (happy to break these into 2 separate PRs)
What is the current behavior?
For the fix, see #80. The existing behavior leaves a dangling transaction open after PostgresSource::new returns, and expects BatchDataPipeline::copy_tables to close it.
What is the new behavior?
BatchDataPipeline::copy_tables explicitly requests its own transaction and closes it when the copy completes. This stops CDC-only mode from hitting the above error, since CDC-only runs no longer use a transaction.
I've also added a WIP NATS connector to publish out changes on a NATS message broker.
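The transaction handling described above can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical mock (MockReplicationClient, MockSource, MockError and their methods are made up for illustration); it is not the code in this PR and makes no claim about the real client's API. It only models the rule that replication cannot start while a transaction is open.

```rust
/// Hypothetical stand-in for a replication connection. It only tracks
/// whether a transaction is currently open.
#[derive(Debug, Default)]
struct MockReplicationClient {
    in_txn: bool,
}

#[derive(Debug, PartialEq)]
enum MockError {
    /// Models "START_REPLICATION cannot run inside a transaction block".
    ReplicationInsideTxn,
    NoOpenTxn,
    TxnAlreadyOpen,
}

impl MockReplicationClient {
    fn begin_txn(&mut self) -> Result<(), MockError> {
        if self.in_txn {
            return Err(MockError::TxnAlreadyOpen);
        }
        self.in_txn = true;
        Ok(())
    }

    fn commit_txn(&mut self) -> Result<(), MockError> {
        if !self.in_txn {
            return Err(MockError::NoOpenTxn);
        }
        self.in_txn = false;
        Ok(())
    }

    fn start_replication(&self) -> Result<(), MockError> {
        if self.in_txn {
            Err(MockError::ReplicationInsideTxn)
        } else {
            Ok(())
        }
    }
}

/// Hypothetical source wrapping the mock client.
struct MockSource {
    client: MockReplicationClient,
}

impl MockSource {
    /// Old behavior: the constructor opens a transaction and leaves it open.
    fn new_leaving_txn_open() -> Result<Self, MockError> {
        let mut client = MockReplicationClient::default();
        client.begin_txn()?;
        // (schema lookups would happen here)
        Ok(Self { client })
    }

    /// New behavior: the constructor commits before returning.
    fn new() -> Result<Self, MockError> {
        let mut client = MockReplicationClient::default();
        client.begin_txn()?;
        // (schema lookups would happen here)
        client.commit_txn()?;
        Ok(Self { client })
    }

    /// The table copy opens and closes its own transaction.
    fn copy_tables(&mut self) -> Result<(), MockError> {
        self.client.begin_txn()?;
        // (row copying would happen here)
        self.client.commit_txn()
    }

    /// CDC-only entry point: goes straight to replication.
    fn start_cdc(&self) -> Result<(), MockError> {
        self.client.start_replication()
    }
}
```

In this sketch, calling start_cdc on a source built with new_leaving_txn_open fails, while a source built with new can start CDC directly or after copy_tables, because no transaction outlives the function that opened it.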
Additional context