diff --git a/Cargo.lock b/Cargo.lock
index d9ac167042ad..9e0e34399669 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1274,6 +1274,7 @@ dependencies = [
"chrono",
"clap",
"compute_api",
+ "fail",
"flate2",
"futures",
"hyper 0.14.30",
@@ -1732,9 +1733,9 @@ checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c"
[[package]]
name = "diesel"
-version = "2.2.3"
+version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "65e13bab2796f412722112327f3e575601a3e9cdcbe426f0d30dbf43f3f5dc71"
+checksum = "ccf1bedf64cdb9643204a36dd15b19a6ce8e7aa7f7b105868e9f1fad5ffa7d12"
dependencies = [
"bitflags 2.4.1",
"byteorder",
@@ -4493,9 +4494,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
-version = "0.4.8"
+version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
+checksum = "f6cc05d7ea95200187117196eee9edd0644424911821aeb28a18ce60ea0b8793"
dependencies = [
"vcpkg",
]
diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile
index 5e7b4e8287ce..06aaf9e7f423 100644
--- a/compute/compute-node.Dockerfile
+++ b/compute/compute-node.Dockerfile
@@ -1285,7 +1285,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
#
-# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries
+# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
@@ -1295,7 +1295,7 @@ ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
-RUN cd compute_tools && mold -run cargo build --locked --profile release-line-debug-size-lto
+RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy
#########################################################################################
#
@@ -1338,20 +1338,6 @@ RUN set -e \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=
-#########################################################################################
-#
-# Compile the Neon-specific `local_proxy` binary
-#
-#########################################################################################
-FROM $REPOSITORY/$IMAGE:$TAG AS local_proxy
-ARG BUILD_TAG
-ENV BUILD_TAG=$BUILD_TAG
-
-USER nonroot
-# Copy entire project to get Cargo.* files with proper dependencies for the whole project
-COPY --chown=nonroot . .
-RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin local_proxy
-
#########################################################################################
#
# Layers "postgres-exporter" and "sql-exporter"
@@ -1491,7 +1477,7 @@ COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
# local_proxy and its config
-COPY --from=local_proxy --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
+COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy
# Metrics exporter binaries and configuration files
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index c0c390caef81..9525b278183f 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
-testing = []
+testing = ["fail/failpoints"]
[dependencies]
base64.workspace = true
@@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
+fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index bb248734a8cf..26ae25ec2047 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
+use utils::failpoint_support;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
+ let scenario = failpoint_support::init();
+
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
@@ -100,6 +103,8 @@ fn main() -> Result<()> {
maybe_delay_exit(delay_exit);
+ scenario.teardown();
+
deinit_and_exit(wait_pg_result);
}
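
For context, `failpoint_support::init()` wraps the `fail` crate's scenario setup, which reads the `FAILPOINTS` environment variable at startup; `scenario.teardown()` deregisters everything on exit. A minimal sketch of the underlying `fail` crate flow, assuming `fail = { version = "0.5", features = ["failpoints"] }` (the wrapper's exact body is not shown in this diff):

```rust
fn main() {
    // Reads FAILPOINTS, e.g. FAILPOINTS="compute-migration=return(3)",
    // and registers the listed failpoints for the scenario's lifetime.
    let scenario = fail::FailScenario::setup();

    // ... run the program; fail_point!() sites consult the registry ...

    // Deregister all failpoints configured by this scenario.
    scenario.teardown();
}
```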
@@ -419,9 +424,13 @@ fn start_postgres(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
- // before we release the mutex, fetch the swap size (if any) for later.
- let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
- let disk_quota_bytes = state.pspec.as_ref().unwrap().spec.disk_quota_bytes;
+ // before we release the mutex, fetch some parameters for later.
+ let &ComputeSpec {
+ swap_size_bytes,
+ disk_quota_bytes,
+ disable_lfc_resizing,
+ ..
+ } = &state.pspec.as_ref().unwrap().spec;
drop(state);
// Launch remaining service threads
@@ -526,11 +535,18 @@ fn start_postgres(
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();
+ // don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
+ let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
+ None
+ } else {
+ file_cache_connstr.cloned()
+ };
+
let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
- pgconnstr: file_cache_connstr.cloned(),
+ pgconnstr,
addr: vm_monitor_addr.clone(),
})),
token.clone(),
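
The `let &ComputeSpec { .. }` destructuring above copies the `Copy` fields out of the spec so the state mutex can be dropped before the service threads launch. A standalone sketch of the same borrow trick (the `Spec` type here is illustrative, not the real `ComputeSpec`):

```rust
use std::sync::Mutex;

struct Spec {
    swap_size_bytes: Option<u64>,
    disk_quota_bytes: Option<u64>,
}

fn main() {
    let state = Mutex::new(Spec {
        swap_size_bytes: Some(1 << 30),
        disk_quota_bytes: None,
    });

    let guard = state.lock().unwrap();
    // Matching through `&Spec { .. }` copies the Copy fields rather than
    // borrowing them, so nothing outlives the guard.
    let &Spec { swap_size_bytes, disk_quota_bytes } = &*guard;
    drop(guard);

    assert_eq!(swap_size_bytes, Some(1 << 30));
    assert_eq!(disk_quota_bytes, None);
}
```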
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index d72a04f2f979..78f6033429e9 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");
- let mut client = conf.connect(NoTls)?;
- handle_migrations(&mut client).context("apply_config handle_migrations")
+ match conf.connect(NoTls) {
+ Ok(mut client) => {
+ if let Err(e) = handle_migrations(&mut client) {
+ error!("Failed to run migrations: {}", e);
+ }
+ }
+ Err(e) => {
+ error!(
+ "Failed to connect to the compute for running migrations: {}",
+ e
+ );
+ }
+ };
});
Ok::<(), anyhow::Error>(())
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 7fa6426d8f9e..a4b1a63e6d5d 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
+use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
+use utils::failpoint_support::failpoints_handler;
+use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
@@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
+ (&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
+ match failpoints_handler(req, CancellationToken::new()).await {
+ Ok(r) => r,
+ Err(ApiError::BadRequest(e)) => {
+ render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
+ }
+ Err(_) => {
+ render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
+ }
+ }
+ }
+
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs
index 22ab145edaec..1f3de65806a8 100644
--- a/compute_tools/src/migration.rs
+++ b/compute_tools/src/migration.rs
@@ -1,13 +1,16 @@
use anyhow::{Context, Result};
+use fail::fail_point;
use postgres::Client;
use tracing::info;
+/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
+ /// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);
@@ -15,6 +18,7 @@ impl<'m> MigrationRunner<'m> {
Self { client, migrations }
}
+ /// Get the current value of neon_migration.migration_id
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
@@ -25,37 +29,61 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}
+ /// Update the neon_migration.migration_id value
+ ///
+ /// This function has a failpoint called compute-migration, which can be
+ /// used to fail the application of a series of migrations at a chosen
+ /// migration ID.
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
- let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
+ // We use this failpoint to check that failing in the middle of
+ // applying a series of migrations fails in the expected manner
+ if cfg!(feature = "testing") {
+ let fail = (|| {
+ fail_point!("compute-migration", |fail_migration_id| {
+ migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
+ });
+
+ false
+ })();
+
+ if fail {
+ return Err(anyhow::anyhow!(format!(
+ "migration {} was configured to fail because of a failpoint",
+ migration_id
+ )));
+ }
+ }
self.client
- .simple_query(&setval)
+ .query(
+ "UPDATE neon_migration.migration_id SET id = $1",
+ &[&migration_id],
+ )
.context("run_migrations update id")?;
Ok(())
}
- fn prepare_migrations(&mut self) -> Result<()> {
- let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
- self.client.simple_query(query)?;
-
- let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
- self.client.simple_query(query)?;
-
- let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
- self.client.simple_query(query)?;
-
- let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
- self.client.simple_query(query)?;
-
- let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
- self.client.simple_query(query)?;
+ /// Prepare the target database for handling migrations
+ fn prepare_database(&mut self) -> Result<()> {
+ self.client
+ .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
+ self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
+ self.client.simple_query(
+ "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
+ )?;
+ self.client
+ .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
+ self.client
+ .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
Ok(())
}
+ /// Run the configured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
- self.prepare_migrations()?;
+ self.prepare_database()?;
let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
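
The closure-within-a-closure above is how a `return(...)` payload from the `fail` crate becomes a boolean without returning from `update_migration_id` itself: `fail_point!` returns from its *enclosing function*, which here is the inner closure. A standalone sketch of the pattern, assuming `fail` is built with the `failpoints` feature (as the `testing` feature now arranges):

```rust
use fail::fail_point;

fn should_fail(migration_id: i64) -> bool {
    (|| {
        // When the "return(<id>)" action fires, the payload closure runs and
        // its result is returned from the enclosing closure.
        fail_point!("compute-migration", |payload| {
            migration_id == payload.unwrap().parse::<i64>().unwrap()
        });
        false
    })()
}

fn main() {
    let scenario = fail::FailScenario::setup();
    fail::cfg("compute-migration", "return(2)").unwrap();
    assert!(should_fail(2));
    assert!(!should_fail(1));
    scenario.teardown();
}
```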
@@ -69,6 +97,11 @@ impl<'m> MigrationRunner<'m> {
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));
+
+ // Even though we are skipping the migration, we still update the
+ // migration ID: it keeps the recorded state of the cluster easy
+ // to reason about.
+ self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
@@ -87,7 +120,6 @@ impl<'m> MigrationRunner<'m> {
)
})?;
- // Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;
self.client
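
One more note on `update_migration_id` above: replacing the `format!`-built `simple_query` with a bound parameter lets the driver send the id out-of-band instead of splicing it into the SQL text. A minimal sketch with the `postgres` crate (the connection string is illustrative):

```rust
use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
    // $1 is bound as an i64, matching the bigint column; no string formatting.
    client.query("UPDATE neon_migration.migration_id SET id = $1", &[&42i64])?;
    Ok(())
}
```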
diff --git a/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql b/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql
new file mode 100644
index 000000000000..0c81cef1c41d
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0001-neon_superuser_bypass_rls.sql
@@ -0,0 +1,9 @@
+DO $$
+DECLARE
+ bypassrls boolean;
+BEGIN
+ SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
+ IF NOT bypassrls THEN
+ RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0002-alter_roles.sql b/compute_tools/src/migrations/tests/0002-alter_roles.sql
new file mode 100644
index 000000000000..433f7b34f7df
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0002-alter_roles.sql
@@ -0,0 +1,25 @@
+DO $$
+DECLARE
+ role record;
+BEGIN
+ FOR role IN
+ SELECT rolname AS name, rolinherit AS inherit
+ FROM pg_roles
+ WHERE pg_has_role(rolname, 'neon_superuser', 'member')
+ LOOP
+ IF NOT role.inherit THEN
+ RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
+ END IF;
+ END LOOP;
+
+ FOR role IN
+ SELECT rolname AS name, rolbypassrls AS bypassrls
+ FROM pg_roles
+ WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
+ AND NOT starts_with(rolname, 'pg_')
+ LOOP
+ IF role.bypassrls THEN
+ RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
+ END IF;
+ END LOOP;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql
new file mode 100644
index 000000000000..b164d61295ec
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0003-grant_pg_create_subscription_to_neon_superuser.sql
@@ -0,0 +1,10 @@
+DO $$
+BEGIN
+ IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
+ RETURN;
+ END IF;
+
+ IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql
new file mode 100644
index 000000000000..acb8dd417d26
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql
@@ -0,0 +1,19 @@
+DO $$
+DECLARE
+ monitor record;
+BEGIN
+ SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
+ admin_option AS admin
+ INTO monitor
+ FROM pg_auth_members
+ WHERE roleid = 'pg_monitor'::regrole
+ AND member = 'neon_superuser'::regrole;
+
+ IF NOT monitor.member THEN
+ RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
+ END IF;
+
+ IF NOT monitor.admin THEN
+ RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql
new file mode 100644
index 000000000000..f99101bd6544
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0005-grant_all_on_tables_to_neon_superuser.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql
new file mode 100644
index 000000000000..f99101bd6544
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0006-grant_all_on_sequences_to_neon_superuser.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql b/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql
new file mode 100644
index 000000000000..f99101bd6544
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0007-grant_all_on_tables_to_neon_superuser_with_grant_option.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql b/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql
new file mode 100644
index 000000000000..f99101bd6544
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0008-grant_all_on_sequences_to_neon_superuser_with_grant_option.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql b/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql
new file mode 100644
index 000000000000..f99101bd6544
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0009-revoke_replication_for_previously_allowed_roles.sql
@@ -0,0 +1,2 @@
+-- This test was never written because at the time migration tests were added
+-- the accompanying migration was already skipped.
diff --git a/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql
new file mode 100644
index 000000000000..af7f50e95d18
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0010-grant_snapshot_synchronization_funcs_to_neon_superuser.sql
@@ -0,0 +1,13 @@
+DO $$
+DECLARE
+ can_execute boolean;
+BEGIN
+ SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
+ INTO can_execute
+ FROM pg_proc
+ WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
+ AND pronamespace = 'pg_catalog'::regnamespace;
+ IF NOT can_execute THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
+ END IF;
+END $$;
diff --git a/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql
new file mode 100644
index 000000000000..e55dcdc3b60e
--- /dev/null
+++ b/compute_tools/src/migrations/tests/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql
@@ -0,0 +1,13 @@
+DO $$
+DECLARE
+ can_execute boolean;
+BEGIN
+ SELECT has_function_privilege('neon_superuser', oid, 'execute')
+ INTO can_execute
+ FROM pg_proc
+ WHERE proname = 'pg_show_replication_origin_status'
+ AND pronamespace = 'pg_catalog'::regnamespace;
+ IF NOT can_execute THEN
+ RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
+ END IF;
+END $$;
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 5ebf842813f1..5e47ec48117a 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -585,6 +585,7 @@ impl Endpoint {
features: self.features.clone(),
swap_size_bytes: None,
disk_quota_bytes: None,
+ disable_lfc_resizing: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index 6d9c353cda1b..54d6a1d38f7f 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -67,6 +67,15 @@ pub struct ComputeSpec {
#[serde(default)]
pub disk_quota_bytes: Option<u64>,
+ /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
+ /// the initial size of LFC.
+ ///
+ /// This is intended for use when the LFC size is being overridden from the default but
+ /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
+ /// LFC sizing.
+ #[serde(default)]
+ pub disable_lfc_resizing: Option<bool>,
+
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
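
Because the new field is an `Option` with `#[serde(default)]`, specs produced by an older control plane (where the field is absent) still deserialize cleanly to `None`. A small sketch of that behavior (`SpecSubset` is illustrative, not the real `ComputeSpec`):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct SpecSubset {
    #[serde(default)]
    disable_lfc_resizing: Option<bool>,
}

fn main() {
    // Field absent: defaults to None, so the vm-monitor keeps resizing LFC.
    let old: SpecSubset = serde_json::from_str("{}").unwrap();
    assert_eq!(old.disable_lfc_resizing, None);

    // Field present: explicit opt-out of LFC resizing.
    let new: SpecSubset = serde_json::from_str(r#"{"disable_lfc_resizing": true}"#).unwrap();
    assert_eq!(new.disable_lfc_resizing, Some(true));
}
```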
diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs
index 870684b399c6..701ba2d42cb9 100644
--- a/libs/utils/src/failpoint_support.rs
+++ b/libs/utils/src/failpoint_support.rs
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
-/// Declare a failpoint that can use to `pause` failpoint action.
+/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
@@ -181,7 +181,7 @@ pub async fn failpoints_handler(
) -> Result, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
- "Cannot manage failpoints because storage was compiled without failpoints support"
+ "Cannot manage failpoints because neon was compiled without failpoints support"
)));
}
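
For reference, the doc comment fixed above describes a macro that evaluates a potentially-paused failpoint on the blocking pool so the async executor thread is never parked. A hedged sketch of that shape (an approximation; the real `pausable_failpoint!` lives in this file and differs in details):

```rust
// Assumes a tokio runtime; expand only inside async functions.
macro_rules! pausable_failpoint_sketch {
    ($name:literal) => {
        tokio::task::spawn_blocking(|| {
            // A "pause" action blocks here, on a blocking-pool thread,
            // rather than stalling the async executor.
            fail::fail_point!($name);
        })
        .await
        .expect("spawn_blocking failed")
    };
}

async fn maybe_pause() {
    pausable_failpoint_sketch!("my-pausable-point");
}
```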
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 2e4c47c6e40f..90017b25f250 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -2604,9 +2604,15 @@ impl Tenant {
WaitCompletionError::NotInitialized(
e, // If the queue is already stopped, it's a shutdown error.
) if e.is_stopping() => CreateTimelineError::ShuttingDown,
- e => CreateTimelineError::Other(e.into()),
- })
- .context("wait for timeline initial uploads to complete")?;
+ WaitCompletionError::NotInitialized(_) => {
+ // This is a bug: we should never try to wait for uploads before initializing the timeline
+ debug_assert!(false);
+ CreateTimelineError::Other(anyhow::anyhow!("timeline not initialized"))
+ }
+ WaitCompletionError::UploadQueueShutDownOrStopped => {
+ CreateTimelineError::ShuttingDown
+ }
+ })?;
// The creating task is responsible for activating the timeline.
// We do this after `wait_completion()` so that we don't spin up tasks that start
diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs
index 3bfbfb506180..15628a964596 100644
--- a/pageserver/src/tenant/timeline/offload.rs
+++ b/pageserver/src/tenant/timeline/offload.rs
@@ -1,5 +1,7 @@
use std::sync::Arc;
+use pageserver_api::models::TenantState;
+
use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard};
use super::Timeline;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -70,6 +72,15 @@ pub(crate) async fn offload_timeline(
{
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
+ if matches!(
+ tenant.current_state(),
+ TenantState::Stopping { .. } | TenantState::Broken { .. }
+ ) {
+ // Cancel the operation if the tenant is shutting down. Do this while the
+ // timelines_offloaded lock is held to prevent a race with Tenant::shutdown
+ // for defusing the lock
+ return Err(OffloadError::Cancelled);
+ }
offloaded_timelines.insert(
timeline.timeline_id,
Arc::new(
diff --git a/proxy/README.md b/proxy/README.md
index 8d850737bede..4b98342d7275 100644
--- a/proxy/README.md
+++ b/proxy/README.md
@@ -102,23 +102,39 @@ User can pass several optional headers that will affect resulting json.
2. `Neon-Array-Mode: true`. Return postgres rows as arrays instead of objects. That is more compact representation and also helps in some edge
cases where it is hard to use rows represented as objects (e.g. when several fields have the same name).
+## Test proxy locally
-## Using SNI-based routing on localhost
-
-Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:
+Proxy determines the project name from the subdomain: a request to `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so we can use `*.localtest.me`, which resolves to `127.0.0.1`.
+Let's create a self-signed certificate by running:
```sh
openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.localtest.me"
```
-start proxy
-
+Then we need to build the proxy with the 'testing' feature and run it, e.g.:
```sh
-./target/debug/proxy -c server.crt -k server.key
+RUST_LOG=proxy cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://proxy:password@endpoint.localtest.me:5432/postgres' --is-private-access-proxy true -c server.crt -k server.key
```
-and connect to it
+We will also need a Postgres instance. Assuming Docker is set up, we can start one as follows:
+```sh
+docker run \
+ --detach \
+ --name proxy-postgres \
+ --env POSTGRES_PASSWORD=proxy-postgres \
+ --publish 5432:5432 \
+ postgres:17-bookworm
+```
+The next step is setting up the auth table and schema, as well as creating the role (without the JWT table):
```sh
-PGSSLROOTCERT=./server.crt psql 'postgres://my-cluster-42.localtest.me:1234?sslmode=verify-full'
+docker exec -it proxy-postgres psql -U postgres -c "CREATE SCHEMA IF NOT EXISTS neon_control_plane"
+docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
+docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';"
```
+
+Now, from the client, you can start a new session:
+
+```sh
+PGSSLROOTCERT=./server.crt psql "postgresql://proxy:password@endpoint.localtest.me:4432/postgres?sslmode=verify-full"
+```
\ No newline at end of file
diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs
index 80b93b6c4fdb..671305a3005a 100644
--- a/proxy/src/redis/notifications.rs
+++ b/proxy/src/redis/notifications.rs
@@ -40,6 +40,27 @@ pub(crate) enum Notification {
AllowedIpsUpdate {
allowed_ips_update: AllowedIpsUpdate,
},
+ #[serde(
+ rename = "/block_public_or_vpc_access_updated",
+ deserialize_with = "deserialize_json_string"
+ )]
+ BlockPublicOrVpcAccessUpdated {
+ block_public_or_vpc_access_updated: BlockPublicOrVpcAccessUpdated,
+ },
+ #[serde(
+ rename = "/allowed_vpc_endpoints_updated_for_org",
+ deserialize_with = "deserialize_json_string"
+ )]
+ AllowedVpcEndpointsUpdatedForOrg {
+ allowed_vpc_endpoints_updated_for_org: AllowedVpcEndpointsUpdatedForOrg,
+ },
+ #[serde(
+ rename = "/allowed_vpc_endpoints_updated_for_projects",
+ deserialize_with = "deserialize_json_string"
+ )]
+ AllowedVpcEndpointsUpdatedForProjects {
+ allowed_vpc_endpoints_updated_for_projects: AllowedVpcEndpointsUpdatedForProjects,
+ },
#[serde(
rename = "/password_updated",
deserialize_with = "deserialize_json_string"
@@ -52,6 +73,24 @@ pub(crate) enum Notification {
pub(crate) struct AllowedIpsUpdate {
project_id: ProjectIdInt,
}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub(crate) struct BlockPublicOrVpcAccessUpdated {
+ project_id: ProjectIdInt,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub(crate) struct AllowedVpcEndpointsUpdatedForOrg {
+ // TODO: change type once the implementation is more fully fledged.
+ // See e.g. https://github.com/neondatabase/neon/pull/10073.
+ account_id: ProjectIdInt,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub(crate) struct AllowedVpcEndpointsUpdatedForProjects {
+ project_ids: Vec<ProjectIdInt>,
+}
+
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct PasswordUpdate {
project_id: ProjectIdInt,
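
The new variants reuse the module's existing `deserialize_json_string` helper, which handles the double-encoded payloads these Redis channels carry: the payload arrives as a JSON string nested inside the outer JSON message. A sketch of what such a helper looks like (the real one already exists in this module):

```rust
use serde::{Deserialize, Deserializer};

fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: Deserializer<'de>,
    T: serde::de::DeserializeOwned,
{
    // First pull out the raw string, then parse that string as JSON.
    let s = String::deserialize(deserializer)?;
    serde_json::from_str(&s).map_err(serde::de::Error::custom)
}
```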
@@ -165,7 +204,11 @@ impl MessageHandler {
}
}
}
- Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
+ Notification::AllowedIpsUpdate { .. }
+ | Notification::PasswordUpdate { .. }
+ | Notification::BlockPublicOrVpcAccessUpdated { .. }
+ | Notification::AllowedVpcEndpointsUpdatedForOrg { .. }
+ | Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
invalidate_cache(self.cache.clone(), msg.clone());
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
Metrics::get()
@@ -178,6 +221,8 @@ impl MessageHandler {
.redis_events_count
.inc(RedisEventsCount::PasswordUpdate);
}
+ // TODO: add additional metrics for the other event types.
+
// It might happen that the invalid entry is on the way to be cached.
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
@@ -204,6 +249,15 @@ fn invalidate_cache(cache: Arc, msg: Notification) {
password_update.role_name,
),
Notification::Cancel(_) => unreachable!("cancel message should be handled separately"),
+ Notification::BlockPublicOrVpcAccessUpdated { .. } => {
+ // https://github.com/neondatabase/neon/pull/10073
+ }
+ Notification::AllowedVpcEndpointsUpdatedForOrg { .. } => {
+ // https://github.com/neondatabase/neon/pull/10073
+ }
+ Notification::AllowedVpcEndpointsUpdatedForProjects { .. } => {
+ // https://github.com/neondatabase/neon/pull/10073
+ }
}
}
diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml
index 2f5d266567e7..5f3319512d67 100644
--- a/storage_controller/Cargo.toml
+++ b/storage_controller/Cargo.toml
@@ -43,13 +43,13 @@ scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
-diesel = { version = "2.1.4", features = [
+diesel = { version = "2.2.6", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
-diesel_migrations = { version = "2.1.0" }
+diesel_migrations = { version = "2.2.0" }
r2d2 = { version = "0.8.10" }
utils = { path = "../libs/utils/" }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index c0c5bc371aed..222cb9fdd409 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -3572,6 +3572,11 @@ impl Service {
.iter()
.any(|i| i.generation.is_none() || i.generation_pageserver.is_none())
{
+ let shard_generations = generations
+ .into_iter()
+ .map(|i| (i.tenant_shard_id, (i.generation, i.generation_pageserver)))
+ .collect::<HashMap<_, _>>();
+
// One or more shards has not been attached to a pageserver. Check if this is because it's configured
// to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
let locked = self.inner.read().unwrap();
@@ -3582,6 +3587,28 @@ impl Service {
PlacementPolicy::Attached(_) => {
// This shard is meant to be attached: the caller is not wrong to try and
// use this function, but we can't service the request right now.
+ let Some(generation) = shard_generations.get(shard_id) else {
+ // This can only happen if there is a split brain controller modifying the database. This should
+ // never happen when testing, and if it happens in production we can only log the issue.
+ debug_assert!(false);
+ tracing::error!("Shard {shard_id} not found in generation state! Is another rogue controller running?");
+ continue;
+ };
+ let (generation, generation_pageserver) = generation;
+ if let Some(generation) = generation {
+ if generation_pageserver.is_none() {
+ // This is legitimate only in a very narrow window where the shard was only just configured into
+ // Attached mode after being created in Secondary or Detached mode, and it has had its generation
+ // set but not yet had a Reconciler run (reconciler is the only thing that sets generation_pageserver).
+ tracing::warn!("Shard {shard_id} generation is set ({generation:?}) but generation_pageserver is None, reconciler not run yet?");
+ }
+ } else {
+ // This should never happen: a shard with no generation is only permitted when it was created in some state
+ // other than PlacementPolicy::Attached (and generation is always written to DB before setting Attached in memory)
+ debug_assert!(false);
+ tracing::error!("Shard {shard_id} generation is None, but it is in PlacementPolicy::Attached mode!");
+ continue;
+ }
}
PlacementPolicy::Secondary | PlacementPolicy::Detached => {
return Err(ApiError::Conflict(format!(
diff --git a/test_runner/conftest.py b/test_runner/conftest.py
index 887bfef478b8..9e32469d69cc 100644
--- a/test_runner/conftest.py
+++ b/test_runner/conftest.py
@@ -8,6 +8,7 @@
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
+ "fixtures.compute_migrations",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
diff --git a/test_runner/fixtures/compute_migrations.py b/test_runner/fixtures/compute_migrations.py
new file mode 100644
index 000000000000..ea99785af092
--- /dev/null
+++ b/test_runner/fixtures/compute_migrations.py
@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING
+
+import pytest
+
+from fixtures.paths import BASE_DIR
+
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+ from pathlib import Path
+
+COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
+COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"
+
+COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
+NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)
+
+
+@pytest.fixture(scope="session")
+def compute_migrations_dir() -> Iterator[Path]:
+ """
+ Retrieve the path to the compute migrations directory.
+ """
+ yield COMPUTE_MIGRATIONS_DIR
+
+
+@pytest.fixture(scope="session")
+def compute_migrations_test_dir() -> Iterator[Path]:
+ """
+ Retrieve the path to the compute migrations test directory.
+ """
+ yield COMPUTE_MIGRATIONS_TEST_DIR
diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py
index 1cd9158c688c..aa0d95fe802a 100644
--- a/test_runner/fixtures/endpoint/http.py
+++ b/test_runner/fixtures/endpoint/http.py
@@ -55,3 +55,17 @@ def metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text
+
+ def configure_failpoints(self, *args: tuple[str, str]) -> None:
+ body: list[dict[str, str]] = []
+
+ for fp in args:
+ body.append(
+ {
+ "name": fp[0],
+ "action": fp[1],
+ }
+ )
+
+ res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
+ res.raise_for_status()
diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py
index a85a1914553e..adbd6414a7eb 100644
--- a/test_runner/fixtures/neon_cli.py
+++ b/test_runner/fixtures/neon_cli.py
@@ -522,14 +522,15 @@ def endpoint_start(
safekeepers: list[int] | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
- allow_multiple=False,
+ allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
+ env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
"start",
]
- extra_env_vars = {}
+ extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_config is not None:
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 9f78ad120b91..a0c642163d9d 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -54,6 +54,7 @@
TimelineArchivalState,
TimelineId,
)
+from fixtures.compute_migrations import NUM_COMPUTE_MIGRATIONS
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.h2server import H2Server
from fixtures.log_helper import log
@@ -3855,6 +3856,7 @@ def start(
safekeepers: list[int] | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
+ env: dict[str, str] | None = None,
) -> Self:
"""
Start the Postgres instance.
@@ -3875,6 +3877,7 @@ def start(
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
+ env=env,
)
self._running.release(1)
self.log_config_value("shared_buffers")
@@ -3988,14 +3991,17 @@ def respec_deep(self, **kwargs: Any) -> None:
log.info("Updating compute spec to: %s", json.dumps(data_dict, indent=4))
json.dump(data_dict, file, indent=4)
- # Please note: Migrations only run if pg_skip_catalog_updates is false
- def wait_for_migrations(self, num_migrations: int = 11):
+ def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
+ """
+ Wait for all compute migrations to be applied. Note that migrations
+ only run if "pg_skip_catalog_updates" is set to false in the compute spec.
+ """
with self.cursor() as cur:
def check_migrations_done():
cur.execute("SELECT id FROM neon_migration.migration_id")
migration_id: int = cur.fetchall()[0][0]
- assert migration_id >= num_migrations
+ assert migration_id >= wait_for
wait_until(check_migrations_done)
diff --git a/test_runner/fixtures/paths.py b/test_runner/fixtures/paths.py
index 80777d65e9b0..fc4fb3629bb0 100644
--- a/test_runner/fixtures/paths.py
+++ b/test_runner/fixtures/paths.py
@@ -21,8 +21,8 @@
BASE_DIR = Path(__file__).parents[2]
-COMPUTE_CONFIG_DIR = BASE_DIR / "compute" / "etc"
DEFAULT_OUTPUT_DIR: str = "test_output"
+COMPUTE_CONFIG_DIR = BASE_DIR / "compute" / "etc"
def get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str | None = None) -> Path:
diff --git a/test_runner/regress/test_compute_migrations.py b/test_runner/regress/test_compute_migrations.py
new file mode 100644
index 000000000000..803702a6f8ac
--- /dev/null
+++ b/test_runner/regress/test_compute_migrations.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import TYPE_CHECKING, cast
+
+import pytest
+from fixtures.compute_migrations import COMPUTE_MIGRATIONS, NUM_COMPUTE_MIGRATIONS
+
+if TYPE_CHECKING:
+ from fixtures.neon_fixtures import NeonEnv
+
+
+def test_compute_migrations_retry(neon_simple_env: NeonEnv, compute_migrations_dir: Path):
+ """
+ Test that compute_ctl can recover from migration failures next time it
+ starts, and that the persisted migration ID is correct in such cases.
+ """
+ env = neon_simple_env
+
+ endpoint = env.endpoints.create("main")
+ endpoint.respec(skip_pg_catalog_updates=False)
+
+ for i in range(1, NUM_COMPUTE_MIGRATIONS + 1):
+ endpoint.start(env={"FAILPOINTS": f"compute-migration=return({i})"})
+
+ # Make sure that the migrations ran
+ endpoint.wait_for_migrations(wait_for=i - 1)
+
+ # Confirm that we correctly recorded that in the
+ # neon_migration.migration_id table
+ with endpoint.cursor() as cur:
+ cur.execute("SELECT id FROM neon_migration.migration_id")
+ migration_id = cast("int", cur.fetchall()[0][0])
+ assert migration_id == i - 1
+
+ endpoint.stop()
+
+ endpoint.start()
+
+ # Now wait for the rest of the migrations
+ endpoint.wait_for_migrations()
+
+ with endpoint.cursor() as cur:
+ cur.execute("SELECT id FROM neon_migration.migration_id")
+ migration_id = cast("int", cur.fetchall()[0][0])
+ assert migration_id == NUM_COMPUTE_MIGRATIONS
+
+ for i, m in enumerate(COMPUTE_MIGRATIONS, start=1):
+ migration_query = (compute_migrations_dir / m).read_text(encoding="utf-8")
+ if migration_query.startswith("-- SKIP"):
+ pattern = rf"Skipping migration id={i}"
+ else:
+ pattern = rf"Running migration id={i}"
+
+ endpoint.log_contains(pattern)
+
+
+@pytest.mark.parametrize(
+ "migration",
+ (pytest.param((i, m), id=str(i)) for i, m in enumerate(COMPUTE_MIGRATIONS, start=1)),
+)
+def test_compute_migrations_e2e(
+ neon_simple_env: NeonEnv,
+ compute_migrations_dir: Path,
+ compute_migrations_test_dir: Path,
+ migration: tuple[int, str],
+):
+ """
+ Test that the migrations perform as advertised.
+ """
+ env = neon_simple_env
+
+ migration_id = migration[0]
+ migration_filename = migration[1]
+
+ migration_query = (compute_migrations_dir / migration_filename).read_text(encoding="utf-8")
+ if migration_query.startswith("-- SKIP"):
+ pytest.skip("The migration is marked as SKIP")
+
+ endpoint = env.endpoints.create("main")
+ endpoint.respec(skip_pg_catalog_updates=False)
+
+ # Stop applying migrations after the one we want to test, so that we can
+ # test the state of the cluster at the given migration ID
+ endpoint.start(env={"FAILPOINTS": f"compute-migration=return({migration_id + 1})"})
+
+ endpoint.wait_for_migrations(wait_for=migration_id)
+
+ check_query = (compute_migrations_test_dir / migration_filename).read_text(encoding="utf-8")
+ endpoint.safe_psql(check_query)
diff --git a/test_runner/regress/test_migrations.py b/test_runner/regress/test_migrations.py
deleted file mode 100644
index 7211619a9906..000000000000
--- a/test_runner/regress/test_migrations.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from __future__ import annotations
-
-import time
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from fixtures.neon_fixtures import NeonEnv
-
-
-def test_migrations(neon_simple_env: NeonEnv):
- env = neon_simple_env
-
- endpoint = env.endpoints.create("main")
- endpoint.respec(skip_pg_catalog_updates=False)
- endpoint.start()
-
- num_migrations = 11
- endpoint.wait_for_migrations(num_migrations=num_migrations)
-
- with endpoint.cursor() as cur:
- cur.execute("SELECT id FROM neon_migration.migration_id")
- migration_id = cur.fetchall()
- assert migration_id[0][0] == num_migrations
-
- endpoint.stop()
- endpoint.start()
- # We don't have a good way of knowing that the migrations code path finished executing
- # in compute_ctl in the case that no migrations are being run
- time.sleep(1)
- with endpoint.cursor() as cur:
- cur.execute("SELECT id FROM neon_migration.migration_id")
- migration_id = cur.fetchall()
- assert migration_id[0][0] == num_migrations
diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py
index 198e4f046074..220c42853103 100644
--- a/test_runner/regress/test_storage_scrubber.py
+++ b/test_runner/regress/test_storage_scrubber.py
@@ -266,7 +266,9 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
for shard in shards:
ps = env.get_tenant_pageserver(shard)
assert ps is not None
- ps.http_client().timeline_compact(shard, timeline_id, force_image_layer_creation=True)
+ ps.http_client().timeline_compact(
+ shard, timeline_id, force_image_layer_creation=True, wait_until_uploaded=True
+ )
ps.http_client().timeline_gc(shard, timeline_id, 0)
# We will use a min_age_secs=1 threshold for deletion, let it pass
diff --git a/test_runner/regress/test_timeline_archive.py b/test_runner/regress/test_timeline_archive.py
index 87579f9e9280..9b3a48add90b 100644
--- a/test_runner/regress/test_timeline_archive.py
+++ b/test_runner/regress/test_timeline_archive.py
@@ -398,6 +398,7 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
# Offloading is off by default at time of writing: remove this line when it's on by default
neon_env_builder.pageserver_config_override = "timeline_offloading = true"
+ neon_env_builder.storage_controller_config = {"heartbeat_interval": "100msec"}
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# We will exercise migrations, so need multiple pageservers