
Commit c8d7226
Merge branch 'main' into erik/assert-upload-index
erikgrinaker authored Jan 3, 2025
2 parents 3923371 + e9d30ed commit c8d7226
Showing 37 changed files with 504 additions and 103 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

20 changes: 3 additions & 17 deletions compute/compute-node.Dockerfile
@@ -1285,7 +1285,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \

#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` and `fast_import` binaries
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
@@ -1295,7 +1295,7 @@ ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && mold -run cargo build --locked --profile release-line-debug-size-lto
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy

#########################################################################################
#
@@ -1338,20 +1338,6 @@ RUN set -e \
&& make -j $(nproc) dist_man_MANS= \
&& make install dist_man_MANS=

#########################################################################################
#
# Compile the Neon-specific `local_proxy` binary
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS local_proxy
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG

USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin local_proxy

#########################################################################################
#
# Layers "postgres-exporter" and "sql-exporter"
@@ -1491,7 +1477,7 @@ COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini

# local_proxy and its config
COPY --from=local_proxy --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy
RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy

# Metrics exporter binaries and configuration files
3 changes: 2 additions & 1 deletion compute_tools/Cargo.toml
@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
testing = ["fail/failpoints"]

[dependencies]
base64.workspace = true
@@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
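Note on the feature wiring above: testing = ["fail/failpoints"] forwards the crate's testing feature to the fail crate, which is what turns fail_point! into a real runtime check. Without it, the macro compiles to a no-op, so default builds carry no overhead. A minimal sketch of the gating behavior (the "demo" fail point name is hypothetical):

use fail::fail_point;

// Built with `--features testing` (and therefore fail/failpoints), this
// fail point can be armed at runtime; in a default build the macro
// expands to nothing and the function always succeeds.
fn guarded_step() -> Result<(), String> {
    fail_point!("demo", |msg| Err(msg.unwrap_or_else(|| "failpoint".into())));
    Ok(())
}

fn main() {
    println!("{:?}", guarded_step());
}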
24 changes: 20 additions & 4 deletions compute_tools/src/bin/compute_ctl.rs
@@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// This is an arbitrary build tag. Fine as a default / for testing purposes
// in case the environment variable is not set.
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
let scenario = failpoint_support::init();

let (build_tag, clap_args) = init()?;

// enable core dumping for all child processes
@@ -100,6 +103,8 @@ fn main() -> Result<()> {

maybe_delay_exit(delay_exit);

scenario.teardown();

deinit_and_exit(wait_pg_result);
}

@@ -419,9 +424,13 @@ fn start_postgres(
"running compute with features: {:?}",
state.pspec.as_ref().unwrap().spec.features
);
// before we release the mutex, fetch the swap size (if any) for later.
let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
let disk_quota_bytes = state.pspec.as_ref().unwrap().spec.disk_quota_bytes;
// before we release the mutex, fetch some parameters for later.
let &ComputeSpec {
swap_size_bytes,
disk_quota_bytes,
disable_lfc_resizing,
..
} = &state.pspec.as_ref().unwrap().spec;
drop(state);

// Launch remaining service threads
@@ -526,11 +535,18 @@ fn start_postgres(
// This token is used internally by the monitor to clean up all threads
let token = CancellationToken::new();

// don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
None
} else {
file_cache_connstr.cloned()
};

let vm_monitor = rt.as_ref().map(|rt| {
rt.spawn(vm_monitor::start(
Box::leak(Box::new(vm_monitor::Args {
cgroup: cgroup.cloned(),
pgconnstr: file_cache_connstr.cloned(),
pgconnstr,
addr: vm_monitor_addr.clone(),
})),
token.clone(),
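The failpoint_support::init() / scenario.teardown() bracket in main() suggests fail points can be armed from the environment at process start (assumption: init() wraps fail::FailScenario::setup(), which reads the FAILPOINTS variable; this diff does not show its body). A standalone sketch of that lifecycle, reusing the compute-migration fail point name from this PR:

use fail::{fail_point, FailScenario};

fn migrate(id: i64) -> Result<(), String> {
    // With FAILPOINTS="compute-migration=return(3)", this fails when id == 3.
    fail_point!("compute-migration", |val| {
        if val == Some(id.to_string()) {
            Err(format!("migration {id} failed via failpoint"))
        } else {
            Ok(())
        }
    });
    Ok(())
}

fn main() {
    let scenario = FailScenario::setup(); // reads FAILPOINTS
    for id in 1..=4 {
        println!("migration {id}: {:?}", migrate(id));
    }
    scenario.teardown();
}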
15 changes: 13 additions & 2 deletions compute_tools/src/compute.rs
@@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");

let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});

Ok::<(), anyhow::Error>(())
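The net effect of this compute.rs change: migration problems are demoted from hard startup failures to logged errors. A condensed sketch of the log-and-continue pattern (handle_migrations is stubbed out here; the real code connects to Postgres first):

use tracing::error;

fn handle_migrations() -> Result<(), String> {
    Err("simulated failure".into())
}

fn main() {
    tracing_subscriber::fmt::init();
    // A migration failure is reported but no longer propagated, so
    // compute startup proceeds regardless.
    if let Err(e) = handle_migrations() {
        error!("Failed to run migrations: {}", e);
    }
    println!("startup continues");
}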
15 changes: 15 additions & 0 deletions compute_tools/src/http/api.rs
@@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::failpoint_support::failpoints_handler;
use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;

fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
@@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
match failpoints_handler(req, CancellationToken::new()).await {
Ok(r) => r,
Err(ApiError::BadRequest(e)) => {
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
}
Err(_) => {
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
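The new POST /failpoints route is compiled in only for testing builds. Since the handler is the shared utils::failpoint_support::failpoints_handler, it presumably accepts the same JSON body as Neon's other failpoint endpoints, a list of name/actions pairs; that body format and the port below are assumptions, not something this diff confirms. A hypothetical client-side sketch:

// Hypothetical client; body format and compute_ctl port are assumed.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body = serde_json::json!([
        { "name": "compute-migration", "actions": "return(3)" }
    ]);
    let resp = reqwest::blocking::Client::new()
        .post("http://localhost:3080/failpoints") // port assumed
        .json(&body)
        .send()?;
    println!("{}", resp.status());
    Ok(())
}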
70 changes: 51 additions & 19 deletions compute_tools/src/migration.rs
@@ -1,20 +1,24 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::Client;
use tracing::info;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}

impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);

Self { client, migrations }
}

/// Get the current value of neon_migration.migration_id
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
@@ -25,37 +29,61 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}

/// Update the neon_migration.migration_id value
///
/// This function has a fail point called compute-migration, which can be
/// used to make the application of a series of migrations fail at a
/// chosen point.
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});

false
})();

if fail {
return Err(anyhow::anyhow!(format!(
"migration {} was configured to fail because of a failpoint",
migration_id
)));
}
}

self.client
.simple_query(&setval)
.query(
"UPDATE neon_migration.migration_id SET id = $1",
&[&migration_id],
)
.context("run_migrations update id")?;

Ok(())
}

fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;

let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;

let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;

let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;

let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;
/// Prepare the target database for handling migrations
fn prepare_database(&mut self) -> Result<()> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
self.client.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
)?;
self.client
.simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
self.client
.simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;

Ok(())
}

/// Run the configured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
self.prepare_database()?;

let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
@@ -69,6 +97,11 @@

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));

// Even though we are skipping the migration, we still update the
// migration ID so the recorded state of the cluster stays easy to
// reason about.
self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
@@ -87,7 +120,6 @@
)
})?;

// Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;

self.client
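Aside from the fail point, update_migration_id now sends the ID as a bind parameter ($1) instead of interpolating it into a simple_query string, so the i64 maps straight onto the bigint column with no string building. A minimal standalone sketch of the same pattern with the postgres crate (connection string illustrative only):

use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    let mut client = Client::connect("host=localhost user=cloud_admin", NoTls)?;
    let id: i64 = 7;
    // $1 is bound server-side; no value is formatted into the SQL text.
    client.query("UPDATE neon_migration.migration_id SET id = $1", &[&id])?;
    Ok(())
}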
@@ -0,0 +1,9 @@
DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;
END $$;
25 changes: 25 additions & 0 deletions compute_tools/src/migrations/tests/0002-alter_roles.sql
@@ -0,0 +1,25 @@
DO $$
DECLARE
role record;
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
END IF;
END LOOP;

FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;
@@ -0,0 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;

IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;