[sled-agent] Don't block InstanceManager on full MPSCs #6913

Merged: 4 commits, Oct 25, 2024
194 changes: 134 additions & 60 deletions sled-agent/src/instance.rs
@@ -119,6 +119,12 @@ pub enum Error {
#[error("Failed to send request to Instance: Channel closed")]
FailedSendChannelClosed,

#[error(
"Failed to send request to Instance: channel at capacity \
({QUEUE_SIZE})"
)]
FailedSendChannelFull,

#[error(
"Failed to send request from Instance Runner: Client Channel closed"
)]
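
A note on the `{QUEUE_SIZE}` interpolation above: `thiserror` passes its format string through Rust's implicit named-argument capture, which can resolve a const that is in scope. A minimal sketch of the mechanism, using a hypothetical `QUEUE_SIZE` value in place of the module's real constant:

use thiserror::Error;

// Hypothetical stand-in for the module's actual queue bound.
const QUEUE_SIZE: usize = 32;

#[derive(Debug, Error)]
enum SketchError {
    // `{QUEUE_SIZE}` is captured from the surrounding scope.
    #[error("Failed to send request to Instance: channel at capacity ({QUEUE_SIZE})")]
    FailedSendChannelFull,
}

fn main() {
    assert_eq!(
        SketchError::FailedSendChannelFull.to_string(),
        "Failed to send request to Instance: channel at capacity (32)",
    );
}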
@@ -217,10 +223,10 @@ enum InstanceRequest {
tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
},
GetFilesystemPool {
- tx: oneshot::Sender<Option<ZpoolName>>,
+ tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
},
CurrentState {
- tx: oneshot::Sender<SledVmmState>,
+ tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
},
PutState {
state: VmmStateRequested,
Expand Down Expand Up @@ -248,6 +254,58 @@ enum InstanceRequest {
},
}
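
Each of these variants carries a `oneshot::Sender` on which the instance task replies to exactly one waiting caller. A self-contained sketch of this mpsc-plus-oneshot request/response pattern (the names here are illustrative, not the PR's):

use tokio::sync::{mpsc, oneshot};

enum Request {
    // The caller includes a one-shot reply channel in the request.
    Get { tx: oneshot::Sender<u64> },
}

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = mpsc::channel::<Request>(16);

    // The task owning the state replies over each request's oneshot.
    tokio::spawn(async move {
        let state = 42u64;
        while let Some(Request::Get { tx }) = req_rx.recv().await {
            let _ = tx.send(state); // a caller may have given up; ignore
        }
    });

    let (tx, rx) = oneshot::channel();
    req_tx.send(Request::Get { tx }).await.unwrap();
    assert_eq!(rx.await.unwrap(), 42);
}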

impl InstanceRequest {
/// Handle an error returned by [`mpsc::Sender::try_send`] when attempting
/// to send a request to the instance.
///
/// This is a bit complex: the returned [`mpsc::error::TrySendError`] will
/// contain the [`InstanceRequest`] we were trying to send, and thus the
/// [`oneshot::Sender`] for that request's response. This function handles
/// the `TrySendError` by inspecting the error to determine whether the
/// channel has closed or is full, constructing the relevant [`Error`],
/// extracting the response oneshot channel from the request, and sending
/// the error back over that channel.
///
/// If sending the error back to the client fails, this function returns an
/// error so that the caller can log that the client gave up; otherwise, it
/// returns `Ok(())`.
fn fail_try_send(
err: mpsc::error::TrySendError<Self>,
) -> Result<(), Error> {
let (error, this) = match err {
mpsc::error::TrySendError::Closed(this) => {
(Error::FailedSendChannelClosed, this)
}
mpsc::error::TrySendError::Full(this) => {
(Error::FailedSendChannelFull, this)
}
};

match this {
Self::RequestZoneBundle { tx } => tx
.send(Err(BundleError::FailedSend(anyhow!(error))))
.map_err(|_| Error::FailedSendClientClosed),
Self::GetFilesystemPool { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::CurrentState { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::PutState { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::Terminate { tx, .. } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
Self::IssueSnapshotRequest { tx, .. }
| Self::AddExternalIp { tx, .. }
| Self::DeleteExternalIp { tx, .. }
| Self::RefreshExternalIps { tx } => tx
.send(Err(error.into()))
.map_err(|_| Error::FailedSendClientClosed),
}
}
}
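
The call-site shape this enables appears throughout the methods later in this diff; as a sketch, assuming a hypothetical helper over the `tx: mpsc::Sender<InstanceRequest>` field introduced below:

// Sketch: forward a request without ever awaiting channel capacity.
// `or_else` runs only on Err; a successful try_send yields Ok(()).
fn send_request(
    tx: &tokio::sync::mpsc::Sender<InstanceRequest>,
    req: InstanceRequest,
) -> Result<(), Error> {
    tx.try_send(req).or_else(InstanceRequest::fail_try_send)
}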

// A small task which tracks the state of the instance by constantly querying
// the state of Propolis for updates.
//
@@ -488,11 +546,11 @@ impl InstanceRunner {
.map_err(|_| Error::FailedSendClientClosed)
},
Some(GetFilesystemPool { tx } ) => {
- tx.send(self.get_filesystem_zpool())
+ tx.send(Ok(self.get_filesystem_zpool()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(CurrentState{ tx }) => {
tx.send(self.current_state())
tx.send(Ok(self.current_state()))
.map_err(|_| Error::FailedSendClientClosed)
},
Some(PutState{ state, tx }) => {
@@ -562,9 +620,9 @@ impl InstanceRunner {
RequestZoneBundle { tx } => tx
.send(Err(BundleError::InstanceTerminating))
.map_err(|_| ()),
- GetFilesystemPool { tx } => tx.send(None).map_err(|_| ()),
+ GetFilesystemPool { tx } => tx.send(Ok(None)).map_err(|_| ()),
CurrentState { tx } => {
- tx.send(self.current_state()).map_err(|_| ())
+ tx.send(Ok(self.current_state())).map_err(|_| ())
}
PutState { tx, .. } => {
tx.send(Err(Error::Terminating.into())).map_err(|_| ())
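
Note the shutdown behavior here: a terminating runner still answers each queued request with an explicit error instead of dropping the oneshot, which would surface at the client only as an opaque `RecvError`. A standalone sketch of that shape, with illustrative names:

use tokio::sync::{mpsc, oneshot};

enum Req {
    Get { tx: oneshot::Sender<Result<u64, String>> },
}

// Drain whatever is still queued and fail each request explicitly, so
// waiting clients see a real error rather than a dropped channel.
fn drain_on_terminate(rx: &mut mpsc::Receiver<Req>) {
    while let Ok(req) = rx.try_recv() {
        match req {
            Req::Get { tx } => {
                let _ = tx.send(Err("instance is terminating".to_string()));
            }
        }
    }
}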
@@ -1092,13 +1150,48 @@ fn propolis_error_code(
}

/// Describes a single Propolis server that incarnates a specific instance.
#[derive(Clone)]
Review comment (Member Author): The intent behind deriving Clone for this is to allow the instance's handle to be cloned into tasks spawned by the InstanceManager for the functions that need to send multiple requests to the instance (or requests to multiple instances), rather than just passing in the tx from the original request (e.g. use_only_these_disks). I didn't actually do that in this PR but intend to do so in a follow-up.

Review comment (Collaborator): As a tiny nitpick, I might punt on deriving Clone until that PR, because it doesn't seem necessary in this one. (But also, not a big deal if this is a pain in the butt.)

pub struct Instance {
id: InstanceUuid,

/// Request channel for communicating with the instance task.
///
/// # Extremely Serious Warning
///
/// This channel is used by the `InstanceManager` task to communicate to the
/// instance task corresponding to each instance on this sled. Note that all
/// of the methods on this type which send [`InstanceRequest`]s over this
/// channel use [`mpsc::Sender::try_send`], which fails if the channel is at
/// capacity, and *not* [`mpsc::Sender::send`], which is an async method
/// that *waits* until capacity is available. THIS IS VERY IMPORTANT.
///
/// This is because the `InstanceManager` task will call these methods in
/// its request-processing loop as it receives requests from clients, in
/// order to forward the request to the relevant instance. If the instance's
/// channel has filled up because the instance is currently processing a
/// slow request, `await`ing a call to [`mpsc::Sender::send`] will block the
/// `InstanceManager`'s main loop from proceeding until the instance task
/// has finished what it's doing and drained the next request from the channel.
/// Critically, this means that requests to *other, unrelated instances* on
/// this sled would have to wait until this instance has finished what it's
/// doing. That means a single deadlocked instance task, which is waiting
/// for something that never completes, can render *all* instances on this
/// sled inaccessible.
///
/// Therefore, any code that sends requests to the `Instance` over this
/// channel from within the `InstanceManager`'s run loop MUST use
/// [`mpsc::Sender::try_send`] rather than [`mpsc::Sender::send`]. Should
/// the channel be at capacity, we return an
/// [`Error::FailedSendChannelFull`], which eventually becomes a 503 Service
/// Unavailable error when returned to the client. It is acceptable to call
/// [`mpsc::Sender::send`] on this channel ONLY from code which runs
/// exclusively in tasks that are not blocking the `InstanceManager`'s run
/// loop.
tx: mpsc::Sender<InstanceRequest>,

/// This is reference-counted so that the `Instance` struct may be cloned.
#[allow(dead_code)]
- runner_handle: tokio::task::JoinHandle<()>,
+ runner_handle: Arc<tokio::task::JoinHandle<()>>,
}
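
The behavior this warning hinges on is easy to demonstrate with a bounded channel in isolation: `send().await` parks the caller until capacity frees up, while `try_send` fails immediately. A runnable illustration, independent of this codebase:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity of 1: a second message cannot be queued.
    let (tx, mut rx) = mpsc::channel::<u32>(1);
    tx.try_send(1).expect("first message fits");

    // try_send reports Full immediately instead of blocking the caller.
    match tx.try_send(2) {
        Err(mpsc::error::TrySendError::Full(msg)) => {
            println!("channel full; got message {msg} back");
        }
        other => panic!("unexpected: {other:?}"),
    }

    // `tx.send(2).await` here would park this task until `rx` drains an
    // element -- exactly what the InstanceManager loop must never do.
    assert_eq!(rx.recv().await, Some(1));
}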

#[derive(Debug)]
@@ -1250,43 +1343,39 @@ impl Instance {
let runner_handle =
tokio::task::spawn(async move { runner.run().await });

- Ok(Instance { id, tx, runner_handle })
+ Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
}

pub fn id(&self) -> InstanceUuid {
self.id
}

/// Create bundle from an instance zone.
- pub async fn request_zone_bundle(
+ pub fn request_zone_bundle(
&self,
tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
- ) -> Result<(), BundleError> {
+ ) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::RequestZoneBundle { tx })
- .await
- .map_err(|err| BundleError::FailedSend(anyhow!(err)))?;
- Ok(())
+ .try_send(InstanceRequest::RequestZoneBundle { tx })
+ .or_else(InstanceRequest::fail_try_send)
}
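
With the new signature, callers create the oneshot themselves, and a full queue comes back as an immediate error rather than a stalled future. A hedged usage sketch (the wrapper function is hypothetical):

// Sketch: driving the non-async API from a caller that may await.
async fn bundle_zone(
    inst: &Instance,
) -> Result<ZoneBundleMetadata, BundleError> {
    let (tx, rx) = tokio::sync::oneshot::channel();
    // Fails fast with FailedSendChannelFull if the instance is backed up.
    inst.request_zone_bundle(tx)
        .map_err(|e| BundleError::FailedSend(anyhow::anyhow!(e)))?;
    // The runner replies on `rx`; a dropped runner surfaces as RecvError.
    rx.await
        .map_err(|e| BundleError::FailedSend(anyhow::anyhow!(e)))?
}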

- pub async fn get_filesystem_zpool(
+ pub fn get_filesystem_zpool(
&self,
- ) -> Result<Option<ZpoolName>, Error> {
- let (tx, rx) = oneshot::channel();
+ tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
+ ) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::GetFilesystemPool { tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(rx.await?)
+ .try_send(InstanceRequest::GetFilesystemPool { tx })
+ .or_else(InstanceRequest::fail_try_send)
}

- pub async fn current_state(&self) -> Result<SledVmmState, Error> {
- let (tx, rx) = oneshot::channel();
+ pub fn current_state(
+ &self,
+ tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
+ ) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::CurrentState { tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(rx.await?)
+ .try_send(InstanceRequest::CurrentState { tx })
+ .or_else(InstanceRequest::fail_try_send)
}
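
The struct-level warning above notes that `FailedSendChannelFull` should eventually reach the client as a 503. Assuming a dropshot-style HTTP layer like the one sled-agent exposes, the mapping could look roughly like this (the function and message are illustrative, not the PR's actual conversion chain):

// Sketch: translating a full-queue error into 503 Service Unavailable.
fn to_http_error(err: Error) -> dropshot::HttpError {
    match err {
        Error::FailedSendChannelFull => dropshot::HttpError::for_unavail(
            None,
            "instance request queue is full; retry later".to_string(),
        ),
        other => dropshot::HttpError::for_internal_error(other.to_string()),
    }
}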

/// Attempts to update the current state of the instance by launching a
@@ -1300,84 +1389,72 @@ impl Instance {
/// instance begins to stop when Propolis has just begun to handle a prior
/// request to reboot, the instance's state may proceed from Stopping to
/// Rebooting to Running to Stopping to Stopped.
- pub async fn put_state(
+ pub fn put_state(
&self,
tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
state: VmmStateRequested,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::PutState { state, tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .try_send(InstanceRequest::PutState { state, tx })
+ .or_else(InstanceRequest::fail_try_send)
}

/// Rudely terminates this instance's Propolis (if it has one) and
/// immediately transitions the instance to the Destroyed state.
- pub async fn terminate(
+ pub fn terminate(
&self,
tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
mark_failed: bool,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::Terminate { mark_failed, tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .try_send(InstanceRequest::Terminate { mark_failed, tx })
+ .or_else(InstanceRequest::fail_try_send)
}

- pub async fn issue_snapshot_request(
+ pub fn issue_snapshot_request(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
disk_id: Uuid,
snapshot_id: Uuid,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::IssueSnapshotRequest {
+ .try_send(InstanceRequest::IssueSnapshotRequest {
disk_id,
snapshot_id,
tx,
})
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .or_else(InstanceRequest::fail_try_send)
}

- pub async fn add_external_ip(
+ pub fn add_external_ip(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::AddExternalIp { ip: *ip, tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .try_send(InstanceRequest::AddExternalIp { ip: *ip, tx })
+ .or_else(InstanceRequest::fail_try_send)
}

- pub async fn delete_external_ip(
+ pub fn delete_external_ip(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
ip: &InstanceExternalIpBody,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .try_send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
+ .or_else(InstanceRequest::fail_try_send)
}

/// Reinstalls an instance's set of external IPs within OPTE, using
/// up-to-date IP<->IGW mappings. This will not disrupt existing flows.
- pub async fn refresh_external_ips(
+ pub fn refresh_external_ips(
&self,
tx: oneshot::Sender<Result<(), ManagerError>>,
) -> Result<(), Error> {
self.tx
- .send(InstanceRequest::RefreshExternalIps { tx })
- .await
- .map_err(|_| Error::FailedSendChannelClosed)?;
- Ok(())
+ .try_send(InstanceRequest::RefreshExternalIps { tx })
+ .or_else(InstanceRequest::fail_try_send)
}
}
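
Per the review discussion above, deriving `Clone` (with the `JoinHandle` behind an `Arc`) would let the `InstanceManager` hand a cheap handle to a spawned task instead of borrowing across an `.await`. A sketch of that intended follow-up usage, with a hypothetical helper name:

// Sketch: fan a request out to an instance without blocking the
// InstanceManager's own run loop.
fn spawn_forward(instance: &Instance, state: VmmStateRequested) {
    let instance = instance.clone(); // cheap: a Sender plus an Arc
    tokio::spawn(async move {
        let (tx, rx) = tokio::sync::oneshot::channel();
        if instance.put_state(tx, state).is_err() {
            return; // queue full or instance gone; nothing to wait for
        }
        // Off the manager loop, it is fine to await the reply.
        let _ = rx.await;
    });
}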

@@ -2104,7 +2181,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, start our "instance"
// (backed by fakes and propolis_mock_server)
inst.put_state(put_tx, VmmStateRequested::Running)
- .await
.expect("failed to send Instance::put_state");

// even though we ignore this result at instance creation time in
@@ -2198,7 +2274,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, try in vain to start
// our "instance", but no propolis server is running
inst.put_state(put_tx, VmmStateRequested::Running)
- .await
.expect("failed to send Instance::put_state");

let timeout_fut = timeout(TIMEOUT_DURATION, put_rx);
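
The tests wrap the reply receiver in `tokio::time::timeout` so a wedged instance task fails the test instead of hanging it. The same shape as a standalone sketch, with an illustrative duration:

use std::time::Duration;
use tokio::{sync::oneshot, time::timeout};

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<&'static str>();
    tokio::spawn(async move {
        let _ = tx.send("state changed");
    });

    // If no reply arrives within the deadline, we get Err(Elapsed)
    // rather than awaiting forever.
    match timeout(Duration::from_secs(1), rx).await {
        Ok(Ok(msg)) => println!("got reply: {msg}"),
        Ok(Err(_)) => println!("sender dropped"),
        Err(_) => println!("timed out"),
    }
}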
@@ -2305,7 +2380,6 @@ mod tests {
// pretending we're InstanceManager::ensure_state, try in vain to start
// our "instance", but the zone never finishes installing
inst.put_state(put_tx, VmmStateRequested::Running)
- .await
.expect("failed to send Instance::put_state");

// Timeout our future waiting for the instance-state-change at 1s. This