storage controller: don't hold detached tenants in memory (#10264)
## Problem

Typical deployments of neon have some tenants that stay in use
continuously, plus a churning background population of tenants that are
created, fall idle, and are then configured into the Detached state.
Currently, this churn of short-lived tenants results in an
ever-increasing memory footprint in the storage controller.

Closes: #9712

## Summary of changes

- At startup, load only those shards whose placement policy is not Detached
- In process_result, check whether all of a tenant's shards are Detached with an
empty observed state (observed == {}), and if so drop them from memory (a
simplified sketch of this check follows this list)
- In tenant_location_conf and other tenant mutators, load the tenant's shards
on demand if they are not present in memory
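
A minimal sketch of the drop condition, using hypothetical simplified stand-ins
(`PlacementPolicy`, `TenantShard`, and `can_drop_from_memory` here are
illustrative only, not the controller's real types):

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for the storage controller's real types.
#[derive(PartialEq)]
enum PlacementPolicy {
    Attached,
    Detached,
}

struct TenantShard {
    policy: PlacementPolicy,
    reconciler_running: bool,
    observed_locations: usize,
}

/// A tenant may be dropped from memory only when every one of its shards is
/// Detached, has no reconciler running, and has no observed locations on any
/// pageserver; it can then be reloaded from the database on demand.
fn can_drop_from_memory(shards: &BTreeMap<u8, TenantShard>) -> bool {
    !shards.is_empty()
        && shards.values().all(|shard| {
            shard.policy == PlacementPolicy::Detached
                && !shard.reconciler_running
                && shard.observed_locations == 0
        })
}

fn main() {
    let mut shards = BTreeMap::new();
    shards.insert(
        0u8,
        TenantShard {
            policy: PlacementPolicy::Detached,
            reconciler_running: false,
            observed_locations: 0,
        },
    );
    // A fully detached, quiescent tenant is eligible to be dropped.
    assert!(can_drop_from_memory(&shards));

    // A shard that is still attached keeps the whole tenant resident.
    shards.insert(
        1u8,
        TenantShard {
            policy: PlacementPolicy::Attached,
            reconciler_running: false,
            observed_locations: 1,
        },
    );
    assert!(!can_drop_from_memory(&shards));
}
```

In the real change, shards are only removed from the in-memory map while holding
the per-tenant exclusive operation lock, so a concurrent request handler cannot
observe the tenant disappearing mid-operation.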
jcsp authored Jan 8, 2025
1 parent dc28424 commit 68d8acf
Showing 5 changed files with 318 additions and 28 deletions.
8 changes: 8 additions & 0 deletions storage_controller/src/id_lock_map.rs
@@ -112,6 +112,14 @@ where
}
}

/// Non-blocking variant of taking the exclusive lock for a key: if the lock is
/// already held, returns None immediately instead of waiting for it.
pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
let mut locked = self.entities.lock().unwrap();
let entry = locked.entry(key).or_default().clone();
let mut guard = TracingExclusiveGuard::new(entry.try_write_owned().ok()?);
*guard.guard = Some(operation);
Some(guard)
}

/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
34 changes: 32 additions & 2 deletions storage_controller/src/persistence.rs
Expand Up @@ -97,6 +97,7 @@ pub(crate) enum DatabaseOperation {
TenantGenerations,
ShardGenerations,
ListTenantShards,
LoadTenant,
InsertTenantShards,
UpdateTenantShard,
DeleteTenant,
@@ -330,11 +331,40 @@ impl Persistence

/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
///
/// We exclude shards configured to be detached. During startup, if we see any attached locations
/// for such shards, they will automatically be detached as 'orphans'.
pub(crate) async fn load_active_tenant_shards(
&self,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
let query = tenant_shards.filter(
placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
);
let result = query.load::<TenantShardPersistence>(conn)?;

Ok(result)
},
)
.await
}

/// When restoring a previously detached tenant into memory, load it from the database
pub(crate) async fn load_tenant(
&self,
filter_tenant_id: TenantId,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::LoadTenant,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string()));
let result = query.load::<TenantShardPersistence>(conn)?;

Ok(result)
},
)
.await
195 changes: 170 additions & 25 deletions storage_controller/src/service.rs
@@ -155,6 +155,7 @@ enum TenantOperations {
TimelineArchivalConfig,
TimelineDetachAncestor,
TimelineGcBlockUnblock,
DropDetached,
}

#[derive(Clone, strum_macros::Display)]
@@ -416,8 +417,8 @@ pub struct Service {
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
/// Note that this state logically lives inside ServiceState, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceState. This could be optimized to
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,

@@ -1165,6 +1166,20 @@ impl Service {
}
}

// If we just finished detaching all shards for a tenant, it might be time to drop it from memory.
if tenant.policy == PlacementPolicy::Detached {
// We may only drop a tenant from memory while holding the exclusive lock on the tenant ID: this protects us
// from concurrent execution wrt a request handler that might expect the tenant to remain in memory for the
// duration of the request.
let guard = self.tenant_op_locks.try_exclusive(
tenant.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
);
if let Some(guard) = guard {
self.maybe_drop_tenant(tenant.tenant_shard_id.tenant_id, &mut locked, &guard);
}
}

// Maybe some other work can proceed now that this job finished.
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
@@ -1294,7 +1309,7 @@ impl Service {
.set(nodes.len() as i64);

tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.list_tenant_shards().await?;
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
tracing::info!(
"Loaded {} shards from database.",
tenant_shard_persistence.len()
@@ -1546,8 +1561,14 @@ impl Service {
// the pageserver API (not via this service), we will auto-create any missing tenant
// shards with default state.
let insert = {
let locked = self.inner.write().unwrap();
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
match self
.maybe_load_tenant(attach_req.tenant_shard_id.tenant_id, &_tenant_lock)
.await
{
Ok(_) => false,
Err(ApiError::NotFound(_)) => true,
Err(e) => return Err(e.into()),
}
};

if insert {
@@ -2439,6 +2460,99 @@ impl Service {
}
}

/// For APIs that might act on tenants with [`PlacementPolicy::Detached`], first check if
/// the tenant is present in memory. If not, load it from the database. If it is found
/// in neither location, return a NotFound error.
///
/// Caller must demonstrate they hold a lock guard, as otherwise two callers might try and load
/// it at the same time, or we might race with [`Self::maybe_drop_tenant`]
async fn maybe_load_tenant(
&self,
tenant_id: TenantId,
_guard: &TracingExclusiveGuard<TenantOperations>,
) -> Result<(), ApiError> {
let present_in_memory = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
.is_some()
};

if present_in_memory {
return Ok(());
}

let tenant_shards = self.persistence.load_tenant(tenant_id).await?;
if tenant_shards.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
));
}

// TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running
// compute, so no benefit to making AZ sticky across detaches.

let mut locked = self.inner.write().unwrap();
tracing::info!(
"Loaded {} shards for tenant {}",
tenant_shards.len(),
tenant_id
);

locked.tenants.extend(tenant_shards.into_iter().map(|p| {
let intent = IntentState::new();
let shard =
TenantShard::from_persistent(p, intent).expect("Corrupt shard row in database");

// Sanity check: when loading on-demand, we should always be loading something Detached
debug_assert!(shard.policy == PlacementPolicy::Detached);
if shard.policy != PlacementPolicy::Detached {
tracing::error!(
"Tenant shard {} loaded on-demand, but has non-Detached policy {:?}",
shard.tenant_shard_id,
shard.policy
);
}

(shard.tenant_shard_id, shard)
}));

Ok(())
}

/// If all shards for a tenant are detached, and in a fully quiescent state (no observed locations on pageservers),
/// and have no reconciler running, then we can drop the tenant from memory. It will be reloaded on-demand
/// if we are asked to attach it again (see [`Self::maybe_load_tenant`]).
///
/// Caller must demonstrate they hold a lock guard, as otherwise it is unsafe to drop a tenant from
/// memory while some other function, not holding the lock on Self::inner, might assume it continues to exist.
fn maybe_drop_tenant(
&self,
tenant_id: TenantId,
locked: &mut std::sync::RwLockWriteGuard<ServiceState>,
_guard: &TracingExclusiveGuard<TenantOperations>,
) {
let mut tenant_shards = locked.tenants.range(TenantShardId::tenant_range(tenant_id));
if tenant_shards.all(|(_id, shard)| {
shard.policy == PlacementPolicy::Detached
&& shard.reconciler.is_none()
&& shard.observed.is_empty()
}) {
let keys = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(id, _)| id)
.copied()
.collect::<Vec<_>>();
for key in keys {
tracing::info!("Dropping detached tenant shard {} from memory", key);
locked.tenants.remove(&key);
}
}
}

/// This API is used by the cloud control plane to migrate unsharded tenants that it created
/// directly with pageservers into this service.
///
@@ -2465,14 +2579,26 @@ impl Service {
)
.await;

if !tenant_shard_id.is_unsharded() {
let tenant_id = if !tenant_shard_id.is_unsharded() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"This API is for importing single-sharded or unsharded tenants"
)));
}
} else {
tenant_shard_id.tenant_id
};

// In case we are waking up a Detached tenant
match self.maybe_load_tenant(tenant_id, &_tenant_lock).await {
Ok(()) | Err(ApiError::NotFound(_)) => {
// This is a creation or an update
}
Err(e) => {
return Err(e);
}
};

// First check if this is a creation or an update
let create_or_update = self.tenant_location_config_prepare(tenant_shard_id.tenant_id, req);
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);

let mut result = TenantLocationConfigResponse {
shards: Vec::new(),
@@ -2600,6 +2726,8 @@ impl Service {
let tenant_id = req.tenant_id;
let patch = req.config;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

let base = {
let locked = self.inner.read().unwrap();
let shards = locked
@@ -2644,19 +2772,7 @@ impl Service {
)
.await;

let tenant_exists = {
let locked = self.inner.read().unwrap();
let mut r = locked
.tenants
.range(TenantShardId::tenant_range(req.tenant_id));
r.next().is_some()
};

if !tenant_exists {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", req.tenant_id).into(),
));
}
self.maybe_load_tenant(req.tenant_id, &_tenant_lock).await?;

self.set_tenant_config_and_reconcile(req.tenant_id, req.config)
.await
@@ -2949,6 +3065,8 @@ impl Service {
let _tenant_lock =
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
@@ -3068,6 +3186,8 @@ impl Service {
)
.await;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

failpoint_support::sleep_millis_async!("tenant-update-policy-exclusive-lock");

let TenantPolicyRequest {
@@ -5150,11 +5270,13 @@ impl Service {
)));
}

let mut shards = self.persistence.list_tenant_shards().await?;
shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
persistent_shards
.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));

expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));

if shards != expect_shards {
if persistent_shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
@@ -5163,7 +5285,7 @@ impl Service {
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&shards)
serde_json::to_string(&persistent_shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -6119,6 +6241,10 @@ impl Service {
let mut pending_reconciles = 0;
let mut az_violations = 0;

// If we find any tenants to drop from memory, stash them to offload after
// we're done traversing the map of tenants.
let mut drop_detached_tenants = Vec::new();

let mut reconciles_spawned = 0;
for shard in tenants.values_mut() {
// Accumulate scheduling statistics
@@ -6152,6 +6278,25 @@ impl Service {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
}

// If this tenant is detached, try dropping it from memory. This is usually done
// proactively in [`Self::process_results`], but we do it here to handle the edge
// case where a reconcile completes while someone else is holding an op lock for the tenant.
if shard.tenant_shard_id.shard_number == ShardNumber(0)
&& shard.policy == PlacementPolicy::Detached
{
if let Some(guard) = self.tenant_op_locks.try_exclusive(
shard.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
) {
drop_detached_tenants.push((shard.tenant_shard_id.tenant_id, guard));
}
}
}

// Process any deferred tenant drops
for (tenant_id, guard) in drop_detached_tenants {
self.maybe_drop_tenant(tenant_id, &mut locked, &guard);
}

metrics::METRICS_REGISTRY
4 changes: 4 additions & 0 deletions storage_controller/src/tenant_shard.rs
@@ -465,6 +465,10 @@ impl ObservedState {
locations: HashMap::new(),
}
}

/// True if this shard has no observed locations on any pageserver.
pub(crate) fn is_empty(&self) -> bool {
self.locations.is_empty()
}
}

impl TenantShard {

1 comment on commit 68d8acf

@github-actions

7399 tests run: 7024 passed, 1 failed, 374 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (2)

Postgres 17

Postgres 16

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

Code coverage* (full report)

  • functions: 31.2% (8409 of 26979 functions)
  • lines: 47.9% (66789 of 139357 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
68d8acf at 2025-01-08T20:30:33.459Z :recycle:
