Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage controller: don't hold detached tenants in memory #10264

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
8 changes: 8 additions & 0 deletions storage_controller/src/id_lock_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ where
}
}

/// Attempt to take the exclusive lock for `key` without waiting.
///
/// Creates the per-key lock entry if it does not exist yet. Returns `None` when the
/// write lock cannot be acquired immediately; otherwise stores `operation` in the
/// guarded slot and returns the guard.
pub(crate) fn try_exclusive(&self, key: T, operation: I) -> Option<TracingExclusiveGuard<I>> {
    // The guard on the entity map stays alive until this function returns.
    let mut entities = self.entities.lock().unwrap();
    let lock = entities.entry(key).or_default().clone();

    match lock.try_write_owned() {
        Ok(write_guard) => {
            let mut exclusive = TracingExclusiveGuard::new(write_guard);
            *exclusive.guard = Some(operation);
            Some(exclusive)
        }
        Err(_) => None,
    }
}

/// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
/// periodic housekeeping to avoid the map growing indefinitely
pub(crate) fn housekeeping(&self) {
Expand Down
34 changes: 32 additions & 2 deletions storage_controller/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub(crate) enum DatabaseOperation {
TenantGenerations,
ShardGenerations,
ListTenantShards,
LoadTenant,
InsertTenantShards,
UpdateTenantShard,
DeleteTenant,
Expand Down Expand Up @@ -330,11 +331,40 @@ impl Persistence {

/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
///
/// We exclude shards configured to be detached. During startup, if we see any attached locations
/// for such shards, they will automatically be detached as 'orphans'.
pub(crate) async fn load_active_tenant_shards(
&self,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
let query = tenant_shards.filter(
placement_policy.ne(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
);
let result = query.load::<TenantShardPersistence>(conn)?;

Ok(result)
},
)
.await
}

/// When restoring a previously detached tenant into memory, load it from the database
pub(crate) async fn load_tenant(
&self,
filter_tenant_id: TenantId,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::LoadTenant,
move |conn| -> DatabaseResult<_> {
let query = tenant_shards.filter(tenant_id.eq(filter_tenant_id.to_string()));
let result = query.load::<TenantShardPersistence>(conn)?;

Ok(result)
},
)
.await
Expand Down
185 changes: 162 additions & 23 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ enum TenantOperations {
TimelineArchivalConfig,
TimelineDetachAncestor,
TimelineGcBlockUnblock,
DropDetached,
}

#[derive(Clone, strum_macros::Display)]
Expand Down Expand Up @@ -415,8 +416,8 @@ pub struct Service {
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
/// Note that this state logically lives inside ServiceState, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceState. This could be optimized to
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,

Expand Down Expand Up @@ -1162,6 +1163,20 @@ impl Service {
}
}

// If we just finished detaching all shards for a tenant, it might be time to drop it from memory.
if tenant.policy == PlacementPolicy::Detached {
// We may only drop a tenant from memory while holding the exclusive lock on the tenant ID: this protects us
// from concurrent execution wrt a request handler that might expect the tenant to remain in memory for the
// duration of the request.
let guard = self.tenant_op_locks.try_exclusive(
tenant.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
);
if let Some(guard) = guard {
self.maybe_drop_tenant(tenant.tenant_shard_id.tenant_id, &mut locked, &guard);
}
}

// Maybe some other work can proceed now that this job finished.
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
Expand Down Expand Up @@ -1291,7 +1306,7 @@ impl Service {
.set(nodes.len() as i64);

tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.list_tenant_shards().await?;
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
tracing::info!(
"Loaded {} shards from database.",
tenant_shard_persistence.len()
Expand Down Expand Up @@ -2436,6 +2451,99 @@ impl Service {
}
}

/// For APIs that might act on tenants with [`PlacementPolicy::Detached`], first check if
/// the tenant is present in memory. If not, load it from the database. If it is found
/// in neither location, return a NotFound error.
///
/// Caller must demonstrate they hold a lock guard, as otherwise two callers might try and load
/// it at the same time, or we might race with [`Self::maybe_drop_tenant`]
///
/// # Errors
///
/// - [`ApiError::NotFound`] if the tenant has no shards in memory and none in the database.
/// - [`ApiError::InternalServerError`] if a row loaded from the database cannot be converted
///   into a [`TenantShard`].
/// - Any error from the database query, converted via `?`.
async fn maybe_load_tenant(
    &self,
    tenant_id: TenantId,
    _guard: &TracingExclusiveGuard<TenantOperations>,
) -> Result<(), ApiError> {
    // Fast path: at least one shard of this tenant is already in memory.
    let present_in_memory = {
        let locked = self.inner.read().unwrap();
        locked
            .tenants
            .range(TenantShardId::tenant_range(tenant_id))
            .next()
            .is_some()
    };

    if present_in_memory {
        return Ok(());
    }

    let tenant_shards = self.persistence.load_tenant(tenant_id).await?;
    if tenant_shards.is_empty() {
        return Err(ApiError::NotFound(
            anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
        ));
    }

    // TODO: choose a fresh AZ to use for this tenant when un-detaching: there definitely isn't a running
    // compute, so no benefit to making AZ sticky across detaches.

    tracing::info!(
        "Loaded {} shards for tenant {}",
        tenant_shards.len(),
        tenant_id
    );

    // Convert the rows before taking the write lock on `self.inner`: a failure here must
    // surface as an API error rather than a panic while the lock is held, which would
    // poison the lock for every other user of `self.inner`.
    let mut loaded = Vec::with_capacity(tenant_shards.len());
    for p in tenant_shards {
        let intent = IntentState::new();
        let shard = TenantShard::from_persistent(p, intent).map_err(|e| {
            ApiError::InternalServerError(anyhow::anyhow!(
                "Corrupt shard row in database: {:?}",
                e
            ))
        })?;

        // Sanity check: when loading on-demand, we should always be loading something Detached
        debug_assert!(shard.policy == PlacementPolicy::Detached);
        if shard.policy != PlacementPolicy::Detached {
            tracing::error!(
                "Tenant shard {} loaded on-demand, but has non-Detached policy {:?}",
                shard.tenant_shard_id,
                shard.policy
            );
        }

        loaded.push((shard.tenant_shard_id, shard));
    }

    // Only the in-memory insertion happens under the write lock.
    self.inner.write().unwrap().tenants.extend(loaded);

    Ok(())
}

/// Drop a tenant's shards from memory once every one of them is fully idle: policy is
/// [`PlacementPolicy::Detached`], no reconciler is running, and no locations are observed
/// on pageservers. A dropped tenant is reloaded on demand if we are asked to attach it
/// again (see [`Self::maybe_load_tenant`]).
///
/// Caller must demonstrate they hold a lock guard, as otherwise it is unsafe to drop a tenant from
/// memory while some other function might assume it continues to exist while not holding the lock on Self::inner.
fn maybe_drop_tenant(
    &self,
    tenant_id: TenantId,
    locked: &mut std::sync::RwLockWriteGuard<ServiceState>,
    _guard: &TracingExclusiveGuard<TenantOperations>,
) {
    // A single shard that is not detached, is still reconciling, or still has observed
    // locations keeps the whole tenant in memory.
    let has_busy_shard = locked
        .tenants
        .range(TenantShardId::tenant_range(tenant_id))
        .any(|(_, shard)| {
            shard.policy != PlacementPolicy::Detached
                || shard.reconciler.is_some()
                || !shard.observed.is_empty()
        });
    if has_busy_shard {
        return;
    }

    // Collect the ids first: we cannot remove from the map while iterating over it.
    let droppable: Vec<_> = locked
        .tenants
        .range(TenantShardId::tenant_range(tenant_id))
        .map(|(shard_id, _)| *shard_id)
        .collect();

    for shard_id in droppable {
        tracing::info!("Dropping detached tenant shard {} from memory", shard_id);
        locked.tenants.remove(&shard_id);
    }
}

/// This API is used by the cloud control plane to migrate unsharded tenants that it created
/// directly with pageservers into this service.
///
Expand All @@ -2462,14 +2570,26 @@ impl Service {
)
.await;

if !tenant_shard_id.is_unsharded() {
let tenant_id = if !tenant_shard_id.is_unsharded() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"This API is for importing single-sharded or unsharded tenants"
)));
}
} else {
tenant_shard_id.tenant_id
};

// In case we are waking up a Detached tenant
match self.maybe_load_tenant(tenant_id, &_tenant_lock).await {
Ok(()) | Err(ApiError::NotFound(_)) => {
// This is a creation or an update
VladLazar marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
return Err(e);
}
};

// First check if this is a creation or an update
let create_or_update = self.tenant_location_config_prepare(tenant_shard_id.tenant_id, req);
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);

let mut result = TenantLocationConfigResponse {
shards: Vec::new(),
Expand Down Expand Up @@ -2596,6 +2716,8 @@ impl Service {
let tenant_id = req.tenant_id;
let patch = req.config;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

let base = {
let locked = self.inner.read().unwrap();
let shards = locked
Expand Down Expand Up @@ -2640,19 +2762,7 @@ impl Service {
)
.await;

let tenant_exists = {
let locked = self.inner.read().unwrap();
let mut r = locked
.tenants
.range(TenantShardId::tenant_range(req.tenant_id));
r.next().is_some()
};

if !tenant_exists {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", req.tenant_id).into(),
));
}
self.maybe_load_tenant(req.tenant_id, &_tenant_lock).await?;

self.set_tenant_config_and_reconcile(req.tenant_id, req.config)
.await
Expand Down Expand Up @@ -2945,6 +3055,8 @@ impl Service {
let _tenant_lock =
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
Expand Down Expand Up @@ -3064,6 +3176,8 @@ impl Service {
)
.await;

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

failpoint_support::sleep_millis_async!("tenant-update-policy-exclusive-lock");

let TenantPolicyRequest {
Expand Down Expand Up @@ -5146,11 +5260,13 @@ impl Service {
)));
}

let mut shards = self.persistence.list_tenant_shards().await?;
shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
persistent_shards
.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));

expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));

if shards != expect_shards {
if persistent_shards != expect_shards {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
Expand All @@ -5159,7 +5275,7 @@ impl Service {
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&shards)
serde_json::to_string(&persistent_shards)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
Expand Down Expand Up @@ -6115,6 +6231,10 @@ impl Service {
let mut pending_reconciles = 0;
let mut az_violations = 0;

// If we find any tenants to drop from memory, stash them to offload after
// we're done traversing the map of tenants.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the map of

let mut drop_detached_tenants = Vec::new();

let mut reconciles_spawned = 0;
for shard in tenants.values_mut() {
// Accumulate scheduling statistics
Expand Down Expand Up @@ -6148,6 +6268,25 @@ impl Service {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
}

// If this tenant is detached, try dropping it from memory. This is usually done
// proactively in [`Self::process_results`], but we do it here to handle the edge
// case where a reconcile completes while someone else is holding an op lock for the tenant.
if shard.tenant_shard_id.shard_number == ShardNumber(0)
&& shard.policy == PlacementPolicy::Detached
{
if let Some(guard) = self.tenant_op_locks.try_exclusive(
shard.tenant_shard_id.tenant_id,
TenantOperations::DropDetached,
) {
drop_detached_tenants.push((shard.tenant_shard_id.tenant_id, guard));
}
}
}

// Process any deferred tenant drops
for (tenant_id, guard) in drop_detached_tenants {
self.maybe_drop_tenant(tenant_id, &mut locked, &guard);
}

metrics::METRICS_REGISTRY
Expand Down
4 changes: 4 additions & 0 deletions storage_controller/src/tenant_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ impl ObservedState {
locations: HashMap::new(),
}
}

/// Returns `true` when the `locations` map holds no entries.
pub(crate) fn is_empty(&self) -> bool {
self.locations.is_empty()
}
}

impl TenantShard {
Expand Down
Loading
Loading