From 8e200c1f4c47dac3fa2ddee94f72d3cc5a85b050 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Wed, 15 Jan 2025 19:28:42 +0000 Subject: [PATCH] Ensure that we drop subscriptions exactly once when endpoint starts on a new branch. It is essential, because itherwise, we may drop not only inherited, but newly created subscriptions. We cannot rely only on spec.drop_subscriptions_before_start flag, because if for some reason compute restarts inside VM, it will start again with the same spec and flag value. To handle this, we save the fact of the operation in the database in the neon_migration_test.drop_subscriptions_done table. If the table does not exist, we assume that the operation was never performed, so we must do it. If table exists, we check if the operation was performed on the current timelilne. Also adjust the test test_subscriber_branching.py to cover this case. --- compute_tools/src/compute.rs | 75 +++++++++++++++++-- compute_tools/src/spec_apply.rs | 5 ++ .../src/sql/finalize_drop_subscriptions.sql | 21 ++++++ .../regress/test_subscriber_branching.py | 52 +++++++++++++ 4 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 compute_tools/src/sql/finalize_drop_subscriptions.sql diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ffd6f583b991..55f94e89d2bf 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -42,8 +42,8 @@ use crate::pg_helpers::*; use crate::spec::*; use crate::spec_apply::ApplySpecPhase::{ CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser, - DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions, - RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, + DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension, + HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; use crate::spec_apply::PerDatabasePhase; use crate::spec_apply::PerDatabasePhase::{ @@ -339,6 +339,15 @@ impl ComputeNode { self.state.lock().unwrap().status } + pub fn get_timeline_id(&self) -> Option { + self.state + .lock() + .unwrap() + .pspec + .as_ref() + .map(|s| s.timeline_id) + } + // Remove `pgdata` directory and create it again with right permissions. fn create_pgdata(&self) -> Result<()> { // Ignore removal error, likely it is a 'No such file or directory (os error 2)'. @@ -928,6 +937,54 @@ impl ComputeNode { .map(|role| (role.name.clone(), role)) .collect::>(); + // Check if we need to drop subscriptions before starting the endpoint. + // + // It is important to do this operation exactly once when endpoint starts on a new branch. + // Otherwise, we may drop not inherited, but newly created subscriptions. + // + // We cannot rely only on spec.drop_subscriptions_before_start flag, + // because if for some reason compute restarts inside VM, + // it will start again with the same spec and flag value. + // + // To handle this, we save the fact of the operation in the database + // in the neon_migration_test.drop_subscriptions_done table. + // If the table does not exist, we assume that the operation was never performed, so we must do it. + // If table exists, we check if the operation was performed on the current timelilne. + // + let mut drop_subscriptions_done = false; + + if spec.drop_subscriptions_before_start { + let timeline_id = self.get_timeline_id().expect("timeline_id must be set"); + let query = format!("select 1 from neon_migration.drop_subscriptions_done where timeline_id = '{}'", timeline_id); + + info!("Checking if drop_subscriptions_done was already performed for timeline_id: {}", timeline_id); + info!("Query: {}", query); + + drop_subscriptions_done = match + client.simple_query(&query).await { + Ok(result) => { + if let postgres::SimpleQueryMessage::Row(row) = &result[0] { + info!("drop_subscriptions_done: {:?}", row); + true + } else { + false + } + }, + Err(e) => + { + if e.to_string().contains("does not exist") { + false + } else { + // We don't expect any other error here, except for the schema/table not existing + // Is it safe to ignore them anyway? + // Worst case - we'll drop the subscriptions, we shouldn't have dropped. + false + } + } + } + }; + + let jwks_roles = Arc::new( spec.as_ref() .local_proxy_config @@ -1069,7 +1126,7 @@ impl ComputeNode { HandleAnonExtension, ]; - if spec.drop_subscriptions_before_start { + if spec.drop_subscriptions_before_start && !drop_subscriptions_done { info!("adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); phases.push(DropLogicalSubscriptions); } @@ -1093,12 +1150,19 @@ impl ComputeNode { handle.await??; } - for phase in vec![ + let mut phases = vec![ HandleOtherExtensions, HandleNeonExtension, CreateAvailabilityCheck, DropRoles, - ] { + ]; + + if spec.drop_subscriptions_before_start && !drop_subscriptions_done { + info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); + phases.push(FinalizeDropLogicalSubscriptions); + } + + for phase in phases { debug!("Applying phase {:?}", &phase); apply_operations( spec.clone(), @@ -1110,6 +1174,7 @@ impl ComputeNode { .await?; } + Ok::<(), anyhow::Error>(()) })?; diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 8f6fb05b056c..eef10013e2f4 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -63,6 +63,7 @@ pub enum ApplySpecPhase { HandleNeonExtension, CreateAvailabilityCheck, DropRoles, + FinalizeDropLogicalSubscriptions, } pub struct Operation { @@ -700,5 +701,9 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")), + comment: None, + }))), } } diff --git a/compute_tools/src/sql/finalize_drop_subscriptions.sql b/compute_tools/src/sql/finalize_drop_subscriptions.sql new file mode 100644 index 000000000000..2d2d8711c983 --- /dev/null +++ b/compute_tools/src/sql/finalize_drop_subscriptions.sql @@ -0,0 +1,21 @@ +DO $$ +BEGIN + IF NOT EXISTS( + SELECT 1 + FROM pg_catalog.pg_tables + WHERE tablename = 'drop_subscriptions_done' + AND schemaname = 'neon_migration' + ) + THEN + CREATE SCHEMA IF NOT EXISTS neon_migration; + ALTER SCHEMA neon_migration OWNER TO cloud_admin; + REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC; + + CREATE TABLE neon_migration.drop_subscriptions_done + (timeline_id text); + END IF; + + INSERT INTO neon_migration.drop_subscriptions_done + VALUES (current_setting('neon.timeline_id')); +END +$$ \ No newline at end of file diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py index b9c9fd74e068..660bc19daf1c 100644 --- a/test_runner/regress/test_subscriber_branching.py +++ b/test_runner/regress/test_subscriber_branching.py @@ -84,6 +84,10 @@ def insert_data(pub, start): res = scur.fetchall() assert res[0][0] == n_records + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + # ensure that there are no subscriptions in this database scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub'") assert len(scur.fetchall()) == 0 @@ -127,6 +131,24 @@ def insert_data(pub, start): assert res[0][0] == n_records # ensure that new publication works as expected after compute restart + # first restart with drop_subscriptions_before_start=True + # to emulate the case when compute restarts within the VM with stale spec + sub_child_1.stop() + sub_child_1.respec( + skip_pg_catalog_updates=False, + drop_subscriptions_before_start=True, + ) + sub_child_1.start() + + with sub_child_1.cursor() as scur: + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + + # ensure that even though the flag is set, we didn't drop new subscription + scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'") + assert len(scur.fetchall()) == 1 + sub_child_1.stop() sub_child_1.respec( skip_pg_catalog_updates=False, @@ -138,6 +160,10 @@ def insert_data(pub, start): insert_data(pub, n_records) n_records += n_records with sub_child_1.cursor() as scur: + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + # ensure that there is a subscriptions in this database scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'") assert len(scur.fetchall()) == 1 @@ -155,3 +181,29 @@ def insert_data(pub, start): scur.execute("SELECT count(*) FROM t") res = scur.fetchall() assert res[0][0] == n_records + + # test that we can create a branch of a branch + env.create_branch( + "subscriber_child_2", + ancestor_branch_name="subscriber_child_1", + ) + sub_child_2 = env.endpoints.create("subscriber_child_2") + # Pass drop_subscriptions_before_start flag + sub_child_2.respec( + skip_pg_catalog_updates=False, + drop_subscriptions_before_start=True, + ) + sub_child_2.start() + + # ensure that subscriber_child_2 does not inherit subscription from child_1 + with sub_child_2.cursor() as scur: + # ensure that drop_subscriptions_done happened on this timeline as well + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_ids: {res}") + assert len(res) == 2 + + # ensure that there are no subscriptions in this database + scur.execute("SELECT count(*) FROM pg_catalog.pg_subscription") + res = scur.fetchall() + assert res[0][0] == 0