Skip to content

Commit

Permalink
Run live migration directly on the state driver task (#720)
Browse files Browse the repository at this point in the history
Run live migration protocols on the state driver's tokio task without using
`block_on`:

- Define `SourceProtocol` and `DestinationProtocol` traits that describe
  the routines the state driver uses to run a generic migration
  irrespective of its protocol version. (This will be useful for protocol
  versioning later.)
- Move the `migrate::source_start` and `migrate::dest_initiate` routines
  into factory functions that connect to the peer Propolis, negotiate
  protocol versions, and return an appropriate protocol impl.
- Use the protocol impls to run migration on the state driver task. Remove
  all the types and constructs used to pass messages between it and
  migration tasks.

Also, improve the interface between the `vm` and `migrate` modules for
inbound migrations by defining some objects that migrations can use either
to fully initialize a VM or to unwind correctly if migration fails. This
allows migration to take control of when precisely a VM's components get
created (and from what spec) without exposing to the migration task all the
complexity of unwinding from a failed attempt to create a VM.

Tested via full PHD run with a Debian 11 guest.
  • Loading branch information
gjcolombo authored Jul 12, 2024
1 parent 64bcf94 commit 994cdf3
Show file tree
Hide file tree
Showing 11 changed files with 1,107 additions and 1,218 deletions.
469 changes: 305 additions & 164 deletions bin/propolis-server/src/lib/migrate/destination.rs

Large diffs are not rendered by default.

205 changes: 12 additions & 193 deletions bin/propolis-server/src/lib/migrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,14 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::net::SocketAddr;

use bit_field::BitField;
use dropshot::HttpError;
use futures::{SinkExt, StreamExt};
use propolis::migrate::MigrateStateError;
use propolis_api_types::{self as api, MigrationState};
use propolis_api_types::MigrationState;
use serde::{Deserialize, Serialize};
use slog::{error, info, o};
use slog::error;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::{tungstenite, WebSocketStream};
use uuid::Uuid;

mod codec;
pub mod destination;
Expand All @@ -25,6 +18,15 @@ mod preamble;
pub mod protocol;
pub mod source;

/// Trait bounds for connection objects used in live migrations.
///
/// This is a marker trait: it declares no methods of its own and only
/// bundles the `AsyncRead + AsyncWrite + Unpin + Send` supertraits under a
/// single name, so implementing it requires nothing more than an empty
/// `impl` block.
pub(crate) trait MigrateConn:
AsyncRead + AsyncWrite + Unpin + Send
{
}

// Concrete connection types that are allowed to carry a live migration.
//
// NOTE(review): `MaybeTlsStream<TcpStream>` is presumably the client-side
// stream obtained from `tokio_tungstenite::connect_async` -- confirm against
// the caller.
impl MigrateConn for tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream> {}
// NOTE(review): `Upgraded` is presumably the server-side connection produced
// by an HTTP upgrade request -- confirm against the caller.
impl MigrateConn for hyper::upgrade::Upgraded {}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum MigrateRole {
Source,
Expand Down Expand Up @@ -141,11 +143,6 @@ pub enum MigrateError {
/// The other end of the migration ran into an error
#[error("{0:?} migration instance encountered error: {1}")]
RemoteError(MigrateRole, String),

/// Sending/receiving from the VM state driver command/response channels
/// returned an error.
#[error("VM state driver unexpectedly closed channel")]
StateDriverChannelClosed,
}

impl From<tokio_tungstenite::tungstenite::Error> for MigrateError {
Expand Down Expand Up @@ -183,8 +180,7 @@ impl From<MigrateError> for HttpError {
| MigrateError::TimeData(_)
| MigrateError::DeviceState(_)
| MigrateError::RemoteError(_, _)
| MigrateError::StateMachine(_)
| MigrateError::StateDriverChannelClosed => {
| MigrateError::StateMachine(_) => {
HttpError::for_internal_error(msg)
}
MigrateError::MigrationAlreadyInProgress
Expand Down Expand Up @@ -219,183 +215,6 @@ struct DevicePayload {
pub data: String,
}

pub(crate) struct SourceContext<
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> {
pub conn: WebSocketStream<T>,
pub protocol: crate::migrate::protocol::Protocol,
}

/// Negotiate the migration protocol with a destination (source-side).
///
/// Waits for the first message on `conn`, which must be a text frame
/// carrying the destination's protocol offer, selects a protocol from that
/// offer, and sends the selection back to the destination.
///
/// This routine only performs the version handshake; it does not run the
/// migration itself. On success the connection and the selected protocol are
/// handed back to the caller in a `SourceContext`.
///
/// # Errors
///
/// - `MigrateError::Initiate` if the first message is not a text frame. A
///   close frame with code `Protocol` is sent to the peer first.
/// - `MigrateError::NoMatchingProtocol` if none of the offered protocols can
///   be selected.
/// - `MigrateError::ProtocolParse` if the offer cannot be parsed.
/// - A `MigrateError` converted from the websocket error if sending on
///   `conn` fails.
pub async fn source_start<
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
>(
    log: &slog::Logger,
    migration_id: Uuid,
    mut conn: WebSocketStream<T>,
) -> Result<SourceContext<T>, MigrateError> {
    // Create a new log context for the migration
    let log = log.new(o!(
        "migration_id" => migration_id.to_string(),
        "migrate_role" => "source"
    ));
    info!(log, "Migration Source");

    // The destination must open the conversation with its protocol offer.
    let dst_protocols = match conn.next().await {
        Some(Ok(tungstenite::Message::Text(offer))) => offer,
        x => {
            // Log before attempting to send the close frame: the send is
            // fallible and returns early via `?`, which would otherwise
            // discard this diagnostic.
            error!(
                log,
                "destination side did not begin migration version handshake: \
                 {:?}",
                x
            );
            conn.send(tungstenite::Message::Close(Some(CloseFrame {
                code: CloseCode::Protocol,
                reason: "did not begin with version handshake.".into(),
            })))
            .await?;
            return Err(MigrateError::Initiate);
        }
    };
    info!(log, "destination offered protocols: {}", dst_protocols);

    let selected = match protocol::select_protocol_from_offer(&dst_protocols)
    {
        Ok(Some(selected)) => selected,
        Ok(None) => {
            let src_protocols = protocol::make_protocol_offer();
            error!(
                log,
                "no compatible destination protocols";
                "dst_protocols" => &dst_protocols,
                "src_protocols" => &src_protocols,
            );
            return Err(MigrateError::NoMatchingProtocol(
                src_protocols,
                dst_protocols,
            ));
        }
        Err(e) => {
            error!(log, "failed to parse destination protocol offer";
                   "dst_protocols" => &dst_protocols,
                   "error" => %e);
            return Err(MigrateError::ProtocolParse(
                dst_protocols,
                e.to_string(),
            ));
        }
    };

    // Tell the destination which of its offered protocols was chosen.
    info!(log, "selected protocol {:?}", selected);
    conn.send(tungstenite::Message::Text(selected.offer_string())).await?;

    Ok(SourceContext { conn, protocol: selected })
}

pub(crate) struct DestinationContext<
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
> {
pub migration_id: Uuid,
pub conn: WebSocketStream<T>,
pub local_addr: SocketAddr,
pub protocol: crate::migrate::protocol::Protocol,
}

/// Initiate a migration from the given source instance (destination-side).
///
/// Opens a websocket to the source's `/instance/migrate/{id}/start` endpoint,
/// sends this server's protocol offer, and waits for the source to reply
/// with the protocol it selected. The reply must be a text frame naming a
/// protocol that this server also supports.
///
/// This routine only establishes the connection and performs the version
/// handshake. On success the connection, the selected protocol, the
/// migration ID, and `local_server_addr` are returned in a
/// `DestinationContext`.
///
/// # Errors
///
/// - `MigrateError::Initiate` if the source's reply is not a text frame. A
///   close frame with code `Protocol` is sent to the peer first.
/// - `MigrateError::NoMatchingProtocol` if the source's selection is not a
///   protocol this server offered.
/// - `MigrateError::ProtocolParse` if the source's reply cannot be parsed.
/// - A `MigrateError` converted from the websocket error if connecting or
///   sending fails.
pub(crate) async fn dest_initiate(
    log: &slog::Logger,
    migrate_info: &api::InstanceMigrateInitiateRequest,
    local_server_addr: SocketAddr,
) -> Result<
    DestinationContext<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >,
    MigrateError,
> {
    let migration_id = migrate_info.migration_id;

    // Create a new log context for the migration
    let log = log.new(o!(
        "migration_id" => migration_id.to_string(),
        "migrate_role" => "destination",
        "migrate_src_addr" => migrate_info.src_addr
    ));
    info!(log, "Migration Destination");

    // Build upgrade request to the source instance
    // (we do this by hand because it's hidden from the OpenAPI spec)
    // TODO(#165): https (wss)
    // TODO: We need to make sure the src_addr is a valid target
    let src_migrate_url = format!(
        "ws://{}/instance/migrate/{}/start",
        migrate_info.src_addr, migration_id,
    );
    info!(log, "Begin migration"; "src_migrate_url" => &src_migrate_url);
    let (mut conn, _) =
        tokio_tungstenite::connect_async(src_migrate_url).await?;

    // Open the handshake by offering every protocol this server supports.
    let dst_protocols = protocol::make_protocol_offer();
    conn.send(tungstenite::Message::Text(dst_protocols)).await?;

    // The source must answer with a text frame naming its selection.
    let selected_protocol = match conn.next().await {
        Some(Ok(tungstenite::Message::Text(reply))) => reply,
        x => {
            // Log before attempting to send the close frame: the send is
            // fallible and returns early via `?`, which would otherwise
            // discard this diagnostic.
            error!(
                log,
                "source instance failed to negotiate protocol version: {:?}", x
            );
            conn.send(tungstenite::Message::Close(Some(CloseFrame {
                code: CloseCode::Protocol,
                reason: "did not respond to version handshake.".into(),
            })))
            .await?;
            return Err(MigrateError::Initiate);
        }
    };
    info!(log, "source negotiated protocol {}", selected_protocol);

    let selected =
        match protocol::select_protocol_from_offer(&selected_protocol) {
            Ok(Some(selected)) => selected,
            Ok(None) => {
                let offered = protocol::make_protocol_offer();
                error!(log, "source selected protocol not on offer";
                       "offered" => &offered,
                       "selected" => &selected_protocol);

                return Err(MigrateError::NoMatchingProtocol(
                    selected_protocol,
                    offered,
                ));
            }
            Err(e) => {
                // Include the parse error itself, matching the source-side
                // handshake's logging.
                error!(log, "source selected protocol failed to parse";
                       "selected" => &selected_protocol,
                       "error" => %e);

                return Err(MigrateError::ProtocolParse(
                    selected_protocol,
                    e.to_string(),
                ));
            }
        };

    Ok(DestinationContext {
        migration_id,
        conn,
        local_addr: local_server_addr,
        protocol: selected,
    })
}

// We should probably turn this into some kind of ValidatedBitmap
// data structure, so that we're only parsing it once.
struct PageIter<'a> {
Expand Down
Loading

0 comments on commit 994cdf3

Please sign in to comment.