Skip to content

Commit

Permalink
add new compute_ctl spec flag
Browse files Browse the repository at this point in the history
If this flag (drop_subscriptions_before_start) is set, drop all subscriptions from the compute node
before it starts. To avoid a race on compute start, use the new GUC neon.disable_logical_replication_subscribers
to temporarily disable logical replication workers until the subscriptions have been dropped.
  • Loading branch information
lubennikovaav committed Jan 9, 2025
1 parent aa7a97b commit be663bd
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 128 deletions.
33 changes: 24 additions & 9 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ use crate::spec_apply::ApplySpecPhase::{
};
use crate::spec_apply::PerDatabasePhase;
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropSubscriptionsForDeletedDatabases,
HandleAnonExtension,
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
use crate::sync_sk::{check_if_synced, ping_safekeeper};
Expand Down Expand Up @@ -996,7 +995,7 @@ impl ComputeNode {
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropSubscriptionsForDeletedDatabases].to_vec(),
[DropLogicalSubscriptions].to_vec(),
);

Ok(spawn(fut))
Expand Down Expand Up @@ -1064,19 +1063,25 @@ impl ComputeNode {
}

let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];

if spec.drop_subscriptions_before_start {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}

let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
]
.to_vec(),
phases,
);

Ok(spawn(fut))
Expand Down Expand Up @@ -1463,6 +1468,16 @@ impl ComputeNode {
Ok(())
},
)?;

let postgresql_conf_path = pgdata_path.join("postgresql.conf");
if config::line_in_file(
&postgresql_conf_path,
"neon.disable_logical_replication_subscribers=false",
)? {
info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
} else {
info!("postgresql.conf is up-to-date");
}
self.pg_reload_conf()?;
}
self.post_apply_config()?;
Expand Down
7 changes: 7 additions & 0 deletions compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ pub fn write_postgres_conf(

writeln!(file, "neon.extension_server_port={}", extension_server_port)?;

if spec.drop_subscriptions_before_start {
writeln!(file, "neon.disable_logical_replication_subscribers=true")?;
} else {
// be explicit about the default value
writeln!(file, "neon.disable_logical_replication_subscribers=false")?;
}

// This is essential to keep this line at the end of the file,
// because it is intended to override any settings above.
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;
Expand Down
6 changes: 3 additions & 3 deletions compute_tools/src/spec_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum PerDatabasePhase {
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
DropSubscriptionsForDeletedDatabases,
DropLogicalSubscriptions,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -331,7 +331,7 @@ async fn get_operations<'a>(
// NB: there could be other db states, which prevent us from dropping
// the database. For example, if db is used by any active subscription
// or replication slot.
// Such cases are handled in the DropSubscriptionsForDeletedDatabases
// Such cases are handled in the DropLogicalSubscriptions
// phase. We do all the cleanup before actually dropping the database.
let drop_db_query: String = format!(
"DROP DATABASE IF EXISTS {} WITH (FORCE)",
Expand Down Expand Up @@ -444,7 +444,7 @@ async fn get_operations<'a>(
}
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
match subphase {
PerDatabasePhase::DropSubscriptionsForDeletedDatabases => {
PerDatabasePhase::DropLogicalSubscriptions => {
match &db {
DB::UserDB(db) => {
let drop_subscription_query: String = format!(
Expand Down
1 change: 1 addition & 0 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.pg_version,
mode,
!args.update_catalog,
false,
)?;
}
EndpointCmd::Start(args) => {
Expand Down
19 changes: 19 additions & 0 deletions control_plane/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
}

Expand Down Expand Up @@ -143,6 +144,7 @@ impl ComputeControlPlane {
pg_version: u32,
mode: ComputeMode,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
Expand All @@ -162,6 +164,7 @@ impl ComputeControlPlane {
// with this we basically test a case of waking up an idle compute, where
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
features: vec![],
});

Expand All @@ -177,6 +180,7 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
features: vec![],
})?,
)?;
Expand Down Expand Up @@ -240,6 +244,7 @@ pub struct Endpoint {
// Optimizations
skip_pg_catalog_updates: bool,

drop_subscriptions_before_start: bool,
// Feature flags
features: Vec<ComputeFeature>,
}
Expand Down Expand Up @@ -291,6 +296,7 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
})
}
Expand Down Expand Up @@ -607,6 +613,18 @@ impl Endpoint {
restrict_conn: false,
invalid: false,
}]
}
// This is a hack to enable testing of the drop_subscriptions_before_start feature.
// Why don't we just always populate the database vector properly here,
// so that this code matches the real control plane (cplane) logic?
else if self.drop_subscriptions_before_start {
vec![Database {
name: PgIdent::from_str("postgres").unwrap(),
owner: PgIdent::from_str("cloud_admin").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
}]
} else {
Vec::new()
},
Expand All @@ -625,6 +643,7 @@ impl Endpoint {
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
Expand Down
7 changes: 7 additions & 0 deletions libs/compute_api/src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ pub struct ComputeSpec {
/// enough spare connections for reconfiguration process to succeed.
#[serde(default = "default_reconfigure_concurrency")]
pub reconfigure_concurrency: usize,

/// If set to true, compute_ctl will drop all subscriptions before starting the
/// compute. This is needed when we start an endpoint on a branch, so that the child
/// branch does not compete with the parent branch's subscriptions
/// for the same replication content from the publisher.
#[serde(default)] // Default false
pub drop_subscriptions_before_start: bool,
}

/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
Expand Down
7 changes: 2 additions & 5 deletions pgxn/neon/logical_replication_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,8 @@ LogicalSlotsMonitorMain(Datum main_arg)
XLogRecPtr cutoff_lsn;

/* In case of a SIGHUP, just reload the configuration. */
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}

ProcessConfigFile(PGC_SIGHUP);

/* Get the cutoff LSN */
cutoff_lsn = get_snapshots_cutoff_lsn();
Expand Down
10 changes: 10 additions & 0 deletions pgxn/neon/neon.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "access/xlogrecovery.h"
#endif
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/proc.h"
Expand Down Expand Up @@ -434,6 +435,15 @@ _PG_init(void)

restore_running_xacts_callback = RestoreRunningXactsFromClog;

DefineCustomBoolVariable(
"neon.disable_logical_replication_subscribers",
"Disables incomming logical replication",
NULL,
&disable_logical_replication_subscribers,
false,
PGC_SIGHUP,
0,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"neon.allow_replica_misconfig",
Expand Down
Loading

0 comments on commit be663bd

Please sign in to comment.