diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ffd6f583b991..55f94e89d2bf 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -42,8 +42,8 @@ use crate::pg_helpers::*; use crate::spec::*; use crate::spec_apply::ApplySpecPhase::{ CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser, - DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions, - RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, + DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension, + HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase, }; use crate::spec_apply::PerDatabasePhase; use crate::spec_apply::PerDatabasePhase::{ @@ -339,6 +339,15 @@ impl ComputeNode { self.state.lock().unwrap().status } + pub fn get_timeline_id(&self) -> Option { + self.state + .lock() + .unwrap() + .pspec + .as_ref() + .map(|s| s.timeline_id) + } + // Remove `pgdata` directory and create it again with right permissions. fn create_pgdata(&self) -> Result<()> { // Ignore removal error, likely it is a 'No such file or directory (os error 2)'. @@ -928,6 +937,54 @@ impl ComputeNode { .map(|role| (role.name.clone(), role)) .collect::>(); + // Check if we need to drop subscriptions before starting the endpoint. + // + // It is important to do this operation exactly once when endpoint starts on a new branch. + // Otherwise, we may drop newly created subscriptions instead of inherited ones. + // + // We cannot rely only on spec.drop_subscriptions_before_start flag, + // because if for some reason compute restarts inside VM, + // it will start again with the same spec and flag value. + // + // To handle this, we save the fact of the operation in the database + // in the neon_migration.drop_subscriptions_done table. + // If the table does not exist, we assume that the operation was never performed, so we must do it. 
+ // If the table exists, we check if the operation was performed on the current timeline. + // + let mut drop_subscriptions_done = false; + + if spec.drop_subscriptions_before_start { + let timeline_id = self.get_timeline_id().expect("timeline_id must be set"); + let query = format!("select 1 from neon_migration.drop_subscriptions_done where timeline_id = '{}'", timeline_id); + + info!("Checking if drop_subscriptions_done was already performed for timeline_id: {}", timeline_id); + info!("Query: {}", query); + + drop_subscriptions_done = match + client.simple_query(&query).await { + Ok(result) => { + if let postgres::SimpleQueryMessage::Row(row) = &result[0] { + info!("drop_subscriptions_done: {:?}", row); + true + } else { + false + } + }, + Err(e) => + { + if e.to_string().contains("does not exist") { + false + } else { + // We don't expect any other error here, except for the schema/table not existing + // Is it safe to ignore them anyway? + // Worst case - we'll drop subscriptions that we shouldn't have dropped. 
+ false + } + } + } + }; + + let jwks_roles = Arc::new( spec.as_ref() .local_proxy_config @@ -1069,7 +1126,7 @@ impl ComputeNode { HandleAnonExtension, ]; - if spec.drop_subscriptions_before_start { + if spec.drop_subscriptions_before_start && !drop_subscriptions_done { info!("adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); phases.push(DropLogicalSubscriptions); } @@ -1093,12 +1150,19 @@ impl ComputeNode { handle.await??; } - for phase in vec![ + let mut phases = vec![ HandleOtherExtensions, HandleNeonExtension, CreateAvailabilityCheck, DropRoles, - ] { + ]; + + if spec.drop_subscriptions_before_start && !drop_subscriptions_done { + info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); + phases.push(FinalizeDropLogicalSubscriptions); + } + + for phase in phases { debug!("Applying phase {:?}", &phase); apply_operations( spec.clone(), @@ -1110,6 +1174,7 @@ impl ComputeNode { .await?; } + Ok::<(), anyhow::Error>(()) })?; diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 8f6fb05b056c..eef10013e2f4 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -63,6 +63,7 @@ pub enum ApplySpecPhase { HandleNeonExtension, CreateAvailabilityCheck, DropRoles, + FinalizeDropLogicalSubscriptions, } pub struct Operation { @@ -700,5 +701,9 @@ async fn get_operations<'a>( Ok(Box::new(operations)) } + ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation { + query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")), + comment: None, + }))), } } diff --git a/compute_tools/src/sql/finalize_drop_subscriptions.sql b/compute_tools/src/sql/finalize_drop_subscriptions.sql new file mode 100644 index 000000000000..2d2d8711c983 --- /dev/null +++ b/compute_tools/src/sql/finalize_drop_subscriptions.sql @@ -0,0 +1,21 @@ +DO $$ +BEGIN + IF NOT EXISTS( + SELECT 1 + FROM pg_catalog.pg_tables + WHERE tablename = 
'drop_subscriptions_done' + AND schemaname = 'neon_migration' + ) + THEN + CREATE SCHEMA IF NOT EXISTS neon_migration; + ALTER SCHEMA neon_migration OWNER TO cloud_admin; + REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC; + + CREATE TABLE neon_migration.drop_subscriptions_done + (timeline_id text); + END IF; + + INSERT INTO neon_migration.drop_subscriptions_done + VALUES (current_setting('neon.timeline_id')); +END +$$ \ No newline at end of file diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py index b9c9fd74e068..660bc19daf1c 100644 --- a/test_runner/regress/test_subscriber_branching.py +++ b/test_runner/regress/test_subscriber_branching.py @@ -84,6 +84,10 @@ def insert_data(pub, start): res = scur.fetchall() assert res[0][0] == n_records + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + # ensure that there are no subscriptions in this database scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub'") assert len(scur.fetchall()) == 0 @@ -127,6 +131,24 @@ def insert_data(pub, start): assert res[0][0] == n_records # ensure that new publication works as expected after compute restart + # first restart with drop_subscriptions_before_start=True + # to emulate the case when compute restarts within the VM with stale spec + sub_child_1.stop() + sub_child_1.respec( + skip_pg_catalog_updates=False, + drop_subscriptions_before_start=True, + ) + sub_child_1.start() + + with sub_child_1.cursor() as scur: + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + + # ensure that even though the flag is set, we didn't drop new subscription + scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'") + assert len(scur.fetchall()) == 1 + sub_child_1.stop() sub_child_1.respec( 
skip_pg_catalog_updates=False, @@ -138,6 +160,10 @@ def insert_data(pub, start): insert_data(pub, n_records) n_records += n_records with sub_child_1.cursor() as scur: + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_id = {res}") + # ensure that there is a subscriptions in this database scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'") assert len(scur.fetchall()) == 1 @@ -155,3 +181,29 @@ def insert_data(pub, start): scur.execute("SELECT count(*) FROM t") res = scur.fetchall() assert res[0][0] == n_records + + # test that we can create a branch of a branch + env.create_branch( + "subscriber_child_2", + ancestor_branch_name="subscriber_child_1", + ) + sub_child_2 = env.endpoints.create("subscriber_child_2") + # Pass drop_subscriptions_before_start flag + sub_child_2.respec( + skip_pg_catalog_updates=False, + drop_subscriptions_before_start=True, + ) + sub_child_2.start() + + # ensure that subscriber_child_2 does not inherit subscription from child_1 + with sub_child_2.cursor() as scur: + # ensure that drop_subscriptions_done happened on this timeline as well + scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done") + res = scur.fetchall() + log.info(f"timeline_ids: {res}") + assert len(res) == 2 + + # ensure that there are no subscriptions in this database + scur.execute("SELECT count(*) FROM pg_catalog.pg_subscription") + res = scur.fetchall() + assert res[0][0] == 0