Ensure that we drop subscriptions exactly once
when endpoint starts on a new branch.

It is essential, because otherwise we may drop
not only inherited, but also newly created subscriptions.

We cannot rely only on the spec.drop_subscriptions_before_start flag,
because if the compute restarts inside the VM for some reason,
it will start again with the same spec and flag value.

To handle this, we save the fact of the operation in the database
in the neon_migration.drop_subscriptions_done table.
If the table does not exist, we assume that the operation was never performed, so we must do it.
If the table exists, we check whether the operation was performed on the current timeline.
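
For illustration, a minimal sketch of the check-and-record flow. The real check query is built in compute.rs and the marker row is written by sql/finalize_drop_subscriptions.sql (see the diff below); looking the timeline up via current_setting('neon.timeline_id') here is a simplification, as compute.rs interpolates the timeline_id from the spec instead:

-- Skip the drop if this timeline already has a marker row.
SELECT 1 FROM neon_migration.drop_subscriptions_done
WHERE timeline_id = current_setting('neon.timeline_id');

-- After dropping the inherited subscriptions, record the fact for this timeline.
INSERT INTO neon_migration.drop_subscriptions_done
VALUES (current_setting('neon.timeline_id'));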

Also adjust test_subscriber_branching.py to cover this case.
lubennikovaav committed Jan 15, 2025
1 parent b7b80c3 commit 8e200c1
Showing 4 changed files with 148 additions and 5 deletions.
75 changes: 70 additions & 5 deletions compute_tools/src/compute.rs
@@ -42,8 +42,8 @@ use crate::pg_helpers::*;
use crate::spec::*;
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSuperUser,
DropInvalidDatabases, DropRoles, HandleNeonExtension, HandleOtherExtensions,
RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, HandleNeonExtension,
HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase;
use crate::spec_apply::PerDatabasePhase::{
@@ -339,6 +339,15 @@ impl ComputeNode {
self.state.lock().unwrap().status
}

pub fn get_timeline_id(&self) -> Option<TimelineId> {
self.state
.lock()
.unwrap()
.pspec
.as_ref()
.map(|s| s.timeline_id)
}

// Remove `pgdata` directory and create it again with right permissions.
fn create_pgdata(&self) -> Result<()> {
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -928,6 +937,54 @@ impl ComputeNode {
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();

// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when the endpoint starts on a new branch.
// Otherwise, we may drop not only inherited, but also newly created subscriptions.
//
// We cannot rely only on the spec.drop_subscriptions_before_start flag,
// because if the compute restarts inside the VM for some reason,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon_migration.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If the table exists, we check whether the operation was performed on the current timeline.
//
let mut drop_subscriptions_done = false;

if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().expect("timeline_id must be set");
let query = format!("select 1 from neon_migration.drop_subscriptions_done where timeline_id = '{}'", timeline_id);

info!("Checking if drop_subscriptions_done was already performed for timeline_id: {}", timeline_id);
info!("Query: {}", query);

drop_subscriptions_done = match client.simple_query(&query).await {
Ok(result) => {
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
info!("drop_subscriptions_done: {:?}", row);
true
} else {
false
}
},
Err(e) => {
if e.to_string().contains("does not exist") {
false
} else {
// We don't expect any error here other than the schema/table not existing.
// Is it safe to ignore other errors anyway?
// Worst case, we would drop subscriptions that we shouldn't have dropped.
false
}
}
};
}


let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
@@ -1069,7 +1126,7 @@ impl ComputeNode {
HandleAnonExtension,
];

if spec.drop_subscriptions_before_start {
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
@@ -1093,12 +1150,19 @@ impl ComputeNode {
handle.await??;
}

for phase in vec![
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension,
CreateAvailabilityCheck,
DropRoles,
] {
];

if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}

for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
Expand All @@ -1110,6 +1174,7 @@ impl ComputeNode {
.await?;
}


Ok::<(), anyhow::Error>(())
})?;

5 changes: 5 additions & 0 deletions compute_tools/src/spec_apply.rs
@@ -63,6 +63,7 @@ pub enum ApplySpecPhase {
HandleNeonExtension,
CreateAvailabilityCheck,
DropRoles,
FinalizeDropLogicalSubscriptions,
}

pub struct Operation {
@@ -700,5 +701,9 @@ async fn get_operations<'a>(

Ok(Box::new(operations))
}
ApplySpecPhase::FinalizeDropLogicalSubscriptions => Ok(Box::new(once(Operation {
query: String::from(include_str!("sql/finalize_drop_subscriptions.sql")),
comment: None,
}))),
}
}
21 changes: 21 additions & 0 deletions compute_tools/src/sql/finalize_drop_subscriptions.sql
@@ -0,0 +1,21 @@
DO $$
BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename = 'drop_subscriptions_done'
AND schemaname = 'neon_migration'
)
THEN
CREATE SCHEMA IF NOT EXISTS neon_migration;
ALTER SCHEMA neon_migration OWNER TO cloud_admin;
REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC;

CREATE TABLE neon_migration.drop_subscriptions_done
(timeline_id text);
END IF;

INSERT INTO neon_migration.drop_subscriptions_done
VALUES (current_setting('neon.timeline_id'));
END
$$
52 changes: 52 additions & 0 deletions test_runner/regress/test_subscriber_branching.py
@@ -84,6 +84,10 @@ def insert_data(pub, start):
res = scur.fetchall()
assert res[0][0] == n_records

scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done")
res = scur.fetchall()
log.info(f"timeline_id = {res}")

# ensure that there are no subscriptions in this database
scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub'")
assert len(scur.fetchall()) == 0
@@ -127,6 +131,24 @@ def insert_data(pub, start):
assert res[0][0] == n_records

# ensure that new publication works as expected after compute restart
# first restart with drop_subscriptions_before_start=True
# to emulate the case when compute restarts within the VM with stale spec
sub_child_1.stop()
sub_child_1.respec(
skip_pg_catalog_updates=False,
drop_subscriptions_before_start=True,
)
sub_child_1.start()

with sub_child_1.cursor() as scur:
scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done")
res = scur.fetchall()
log.info(f"timeline_id = {res}")

# ensure that even though the flag is set, we didn't drop new subscription
scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
assert len(scur.fetchall()) == 1

sub_child_1.stop()
sub_child_1.respec(
skip_pg_catalog_updates=False,
@@ -138,6 +160,10 @@ def insert_data(pub, start):
insert_data(pub, n_records)
n_records += n_records
with sub_child_1.cursor() as scur:
scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done")
res = scur.fetchall()
log.info(f"timeline_id = {res}")

# ensure that there is a subscriptions in this database
scur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub_new'")
assert len(scur.fetchall()) == 1
@@ -155,3 +181,29 @@ def insert_data(pub, start):
scur.execute("SELECT count(*) FROM t")
res = scur.fetchall()
assert res[0][0] == n_records

# test that we can create a branch of a branch
env.create_branch(
"subscriber_child_2",
ancestor_branch_name="subscriber_child_1",
)
sub_child_2 = env.endpoints.create("subscriber_child_2")
# Pass drop_subscriptions_before_start flag
sub_child_2.respec(
skip_pg_catalog_updates=False,
drop_subscriptions_before_start=True,
)
sub_child_2.start()

# ensure that subscriber_child_2 does not inherit subscription from child_1
with sub_child_2.cursor() as scur:
# ensure that drop_subscriptions_done happened on this timeline as well
scur.execute("SELECT timeline_id from neon_migration.drop_subscriptions_done")
res = scur.fetchall()
log.info(f"timeline_ids: {res}")
assert len(res) == 2

# ensure that there are no subscriptions in this database
scur.execute("SELECT count(*) FROM pg_catalog.pg_subscription")
res = scur.fetchall()
assert res[0][0] == 0
