Skip to content

server: introduce a serial console backend #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ serde_derive = "1.0"
serde_json = "1.0"
serde_test = "1.0.138"
serde_with = "3.11.0"
slab = "0.4.9"
slog = "2.7"
slog-async = "2.8"
slog-bunyan = "2.4.0"
Expand Down
16 changes: 12 additions & 4 deletions bin/propolis-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ enum Command {
/// Defaults to the most recent 16 KiB of console output (-16384).
#[clap(long, short)]
byte_offset: Option<i64>,

/// True if the serial console connection should be read-only.
#[clap(long, action)]
readonly: bool,
},

/// Migrate instance to new propolis-server
Expand Down Expand Up @@ -604,9 +608,11 @@ async fn stdin_to_websockets_task(
async fn serial(
addr: SocketAddr,
byte_offset: Option<i64>,
readonly: bool,
log: Logger,
) -> anyhow::Result<()> {
let mut ws_console = serial_connect(addr, byte_offset, log).await?;
let mut ws_console =
serial_connect(addr, byte_offset, readonly, log).await?;

let _raw_guard = RawTermiosGuard::stdio_guard()
.with_context(|| anyhow!("failed to set raw mode"))?;
Expand Down Expand Up @@ -699,6 +705,7 @@ async fn serial(
async fn serial_connect(
addr: SocketAddr,
byte_offset: Option<i64>,
readonly: bool,
log: Logger,
) -> anyhow::Result<InstanceSerialConsoleHelper> {
let offset = match byte_offset {
Expand All @@ -707,7 +714,8 @@ async fn serial_connect(
None => WSClientOffset::MostRecent(16384),
};

Ok(InstanceSerialConsoleHelper::new(addr, offset, Some(log)).await?)
Ok(InstanceSerialConsoleHelper::new(addr, offset, readonly, Some(log))
.await?)
}

async fn migrate_instance(
Expand Down Expand Up @@ -914,8 +922,8 @@ async fn main() -> anyhow::Result<()> {
}
Command::Get => get_instance(&client).await?,
Command::State { state } => put_instance(&client, state).await?,
Command::Serial { byte_offset } => {
serial(addr, byte_offset, log).await?
Command::Serial { byte_offset, readonly } => {
serial(addr, byte_offset, readonly, log).await?
}
Command::Migrate { dst_server, dst_port, dst_uuid, crucible_disks } => {
let dst_addr = SocketAddr::new(dst_server, dst_port);
Expand Down
1 change: 1 addition & 0 deletions bin/propolis-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ toml.workspace = true
serde.workspace = true
serde_derive.workspace = true
serde_json.workspace = true
slab.workspace = true
slog.workspace = true
slog-async.workspace = true
slog-bunyan.workspace = true
Expand Down
9 changes: 4 additions & 5 deletions bin/propolis-server/src/lib/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::os::unix::fs::FileTypeExt;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::serial::Serial;
use crate::serial::backend::ConsoleBackend;
use crate::spec::{self, Spec, StorageBackend, StorageDevice};
use crate::stats::{
track_network_interface_kstats, track_vcpu_kstats, VirtualDiskProducer,
Expand Down Expand Up @@ -377,7 +377,7 @@ impl MachineInitializer<'_> {
pub fn initialize_uart(
&mut self,
chipset: &RegisteredChipset,
) -> Serial<LpcUart> {
) -> Arc<ConsoleBackend> {
let mut com1 = None;
for (name, desc) in self.spec.serial.iter() {
if desc.device != spec::SerialPortDevice::Uart {
Expand All @@ -401,9 +401,8 @@ impl MachineInitializer<'_> {
}
}

let sink_size = NonZeroUsize::new(64).unwrap();
let source_size = NonZeroUsize::new(1024).unwrap();
Serial::new(com1.unwrap(), sink_size, source_size)
const COM1_HISTORY_BYTES: usize = 1024 * 1024;
ConsoleBackend::new(com1.unwrap().clone(), COM1_HISTORY_BYTES)
}

pub fn initialize_ps2(
Expand Down
50 changes: 31 additions & 19 deletions bin/propolis-server/src/lib/migrate/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use bitvec::prelude as bv;
use futures::{SinkExt, StreamExt};
use propolis::common::{GuestAddr, Lifecycle, PAGE_SIZE};
use propolis::migrate::{
MigrateCtx, MigrateStateError, Migrator, PayloadOffer, PayloadOffers,
MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOffer,
PayloadOffers,
};
use propolis::vmm;
use propolis_api_types::instance_spec::SpecKey;
Expand All @@ -22,10 +23,10 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::{tungstenite, WebSocketStream};
use uuid::Uuid;

use crate::migrate::codec;
use crate::migrate::memx;
use crate::migrate::preamble::Preamble;
use crate::migrate::probes;
use crate::migrate::{codec, DevicePayload};
use crate::migrate::{
Device, MigrateError, MigratePhase, MigrateRole, MigrationState, PageIter,
};
Expand Down Expand Up @@ -511,6 +512,21 @@ impl<T: MigrateConn> RonV0<T> {
return Err(MigrateError::UnexpectedMessage);
}
};

let com1_payload: DevicePayload = match self.read_msg().await? {
codec::Message::Serialized(encoded) => {
ron::de::from_reader(encoded.as_bytes())
.map_err(codec::ProtocolError::from)?
}
msg => {
error!(
self.log(),
"device_state: unexpected COM1 history message: {msg:?}"
);
return Err(MigrateError::UnexpectedMessage);
}
};

self.read_ok().await?;

info!(self.log(), "Devices: {devices:#?}");
Expand All @@ -529,6 +545,18 @@ impl<T: MigrateConn> RonV0<T> {
})?;
self.import_device(&target, &device, &migrate_ctx)?;
}

let com1_data =
&mut ron::Deserializer::from_str(&com1_payload.data)
.map_err(codec::ProtocolError::from)?;
let com1_offer = PayloadOffer {
kind: &com1_payload.kind,
version: com1_payload.version,
payload: Box::new(<dyn erased_serde::Deserializer>::erase(
com1_data,
)),
};
vm_objects.com1().import(com1_offer, &migrate_ctx)?;
}

self.send_msg(codec::Message::Okay).await
Expand Down Expand Up @@ -762,24 +790,8 @@ impl<T: MigrateConn> RonV0<T> {
.map_err(codec::ProtocolError::from)?,
))
.await?;
let com1_history = match self.read_msg().await? {
codec::Message::Serialized(encoded) => encoded,
msg => {
error!(self.log(), "server_state: unexpected message: {msg:?}");
return Err(MigrateError::UnexpectedMessage);
}
};

ensure_ctx
.vm_objects()
.lock_shared()
.await
.com1()
.import(&com1_history)
.await
.map_err(|e| MigrateError::Codec(e.to_string()))?;

self.send_msg(codec::Message::Okay).await
Ok(())
}

async fn finish(
Expand Down
41 changes: 31 additions & 10 deletions bin/propolis-server/src/lib/migrate/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bitvec::prelude::{BitSlice, Lsb0};
use futures::{SinkExt, StreamExt};
use propolis::common::{GuestAddr, GuestData, PAGE_SIZE};
use propolis::migrate::{
MigrateCtx, MigrateStateError, Migrator, PayloadOutputs,
MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOutputs,
};
use propolis::vmm;
use propolis_api_types::instance_spec::VersionedInstanceSpec;
Expand All @@ -32,6 +32,7 @@ use crate::migrate::{
};

use crate::vm::objects::VmObjects;
use crate::vm::services::VmServices;
use crate::vm::state_publisher::{
ExternalStateUpdate, MigrationStateUpdate, StatePublisher,
};
Expand Down Expand Up @@ -133,6 +134,7 @@ pub(crate) trait SourceProtocol {
async fn run(
self,
vm_objects: &VmObjects,
vm_services: &VmServices,
publisher: &mut StatePublisher,
persistent_state: &mut PersistentState,
) -> Result<(), MigrateError>;
Expand Down Expand Up @@ -318,6 +320,7 @@ impl<T: MigrateConn> SourceProtocol for RonV0<T> {
async fn run(
self,
vm_objects: &VmObjects,
vm_services: &VmServices,
publisher: &mut StatePublisher,
persistent_state: &mut PersistentState,
) -> Result<(), MigrateError> {
Expand All @@ -327,6 +330,7 @@ impl<T: MigrateConn> SourceProtocol for RonV0<T> {
conn: self.conn,
dirt: self.dirt,
vm: vm_objects,
vm_services,
state_publisher: publisher,
persistent_state,
paused: false,
Expand All @@ -342,6 +346,7 @@ struct RonV0Runner<'vm, T: MigrateConn> {
conn: WebSocketStream<T>,
dirt: Option<HashMap<GuestAddr, PageBitmap>>,
vm: &'vm VmObjects,
vm_services: &'vm VmServices,
state_publisher: &'vm mut StatePublisher,
persistent_state: &'vm mut PersistentState,
paused: bool,
Expand Down Expand Up @@ -680,6 +685,7 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
async fn device_state(&mut self) -> Result<(), MigrateError> {
self.update_state(MigrationState::Device);
let mut device_states = vec![];
let com1_payload;
{
let objects = self.vm.lock_shared().await;
let migrate_ctx =
Expand Down Expand Up @@ -728,6 +734,14 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
}
Ok(())
})?;

let com1_state = objects.com1().export(&migrate_ctx)?;
com1_payload = DevicePayload {
kind: com1_state.kind.to_owned(),
version: com1_state.version,
data: ron::ser::to_string(&com1_state.payload)
.map_err(codec::ProtocolError::from)?,
};
}

info!(self.log(), "Device States: {device_states:#?}");
Expand All @@ -738,7 +752,14 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
))
.await?;

self.send_msg(codec::Message::Serialized(
ron::ser::to_string(&com1_payload)
.map_err(codec::ProtocolError::from)?,
))
.await?;

self.send_msg(codec::Message::Okay).await?;

self.read_ok().await
}

Expand Down Expand Up @@ -782,15 +803,15 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
}
_ => return Err(MigrateError::UnexpectedMessage),
};
let com1_history = self
.vm
.lock_shared()
.await
.com1()
.export_history(remote_addr)
.await?;
self.send_msg(codec::Message::Serialized(com1_history)).await?;
self.read_ok().await

{
let mgr = self.vm_services.serial_mgr.lock().await;
if let Some(mgr) = mgr.as_ref() {
mgr.notify_migration(remote_addr).await;
}
}

Ok(())
}

async fn finish(&mut self) -> Result<(), MigrateError> {
Expand Down
Loading
Loading