Skip to content

Commit 6f85e92

Browse files
committed
server: import/export history
1 parent c7bc00d commit 6f85e92

File tree

6 files changed

+171
-72
lines changed

6 files changed

+171
-72
lines changed

bin/propolis-server/src/lib/migrate/destination.rs

+31-19
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use bitvec::prelude as bv;
66
use futures::{SinkExt, StreamExt};
77
use propolis::common::{GuestAddr, Lifecycle, PAGE_SIZE};
88
use propolis::migrate::{
9-
MigrateCtx, MigrateStateError, Migrator, PayloadOffer, PayloadOffers,
9+
MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOffer,
10+
PayloadOffers,
1011
};
1112
use propolis::vmm;
1213
use propolis_api_types::instance_spec::SpecKey;
@@ -22,10 +23,10 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame;
2223
use tokio_tungstenite::{tungstenite, WebSocketStream};
2324
use uuid::Uuid;
2425

25-
use crate::migrate::codec;
2626
use crate::migrate::memx;
2727
use crate::migrate::preamble::Preamble;
2828
use crate::migrate::probes;
29+
use crate::migrate::{codec, DevicePayload};
2930
use crate::migrate::{
3031
Device, MigrateError, MigratePhase, MigrateRole, MigrationState, PageIter,
3132
};
@@ -511,6 +512,21 @@ impl<T: MigrateConn> RonV0<T> {
511512
return Err(MigrateError::UnexpectedMessage);
512513
}
513514
};
515+
516+
let com1_payload: DevicePayload = match self.read_msg().await? {
517+
codec::Message::Serialized(encoded) => {
518+
ron::de::from_reader(encoded.as_bytes())
519+
.map_err(codec::ProtocolError::from)?
520+
}
521+
msg => {
522+
error!(
523+
self.log(),
524+
"device_state: unexpected COM1 history message: {msg:?}"
525+
);
526+
return Err(MigrateError::UnexpectedMessage);
527+
}
528+
};
529+
514530
self.read_ok().await?;
515531

516532
info!(self.log(), "Devices: {devices:#?}");
@@ -529,6 +545,18 @@ impl<T: MigrateConn> RonV0<T> {
529545
})?;
530546
self.import_device(&target, &device, &migrate_ctx)?;
531547
}
548+
549+
let com1_data =
550+
&mut ron::Deserializer::from_str(&com1_payload.data)
551+
.map_err(codec::ProtocolError::from)?;
552+
let com1_offer = PayloadOffer {
553+
kind: &com1_payload.kind,
554+
version: com1_payload.version,
555+
payload: Box::new(<dyn erased_serde::Deserializer>::erase(
556+
com1_data,
557+
)),
558+
};
559+
vm_objects.com1().import(com1_offer, &migrate_ctx)?;
532560
}
533561

534562
self.send_msg(codec::Message::Okay).await
@@ -762,24 +790,8 @@ impl<T: MigrateConn> RonV0<T> {
762790
.map_err(codec::ProtocolError::from)?,
763791
))
764792
.await?;
765-
let com1_history = match self.read_msg().await? {
766-
codec::Message::Serialized(encoded) => encoded,
767-
msg => {
768-
error!(self.log(), "server_state: unexpected message: {msg:?}");
769-
return Err(MigrateError::UnexpectedMessage);
770-
}
771-
};
772-
773-
ensure_ctx
774-
.vm_objects()
775-
.lock_shared()
776-
.await
777-
.com1()
778-
.import(&com1_history)
779-
.await
780-
.map_err(|e| MigrateError::Codec(e.to_string()))?;
781793

782-
self.send_msg(codec::Message::Okay).await
794+
Ok(())
783795
}
784796

785797
async fn finish(

bin/propolis-server/src/lib/migrate/source.rs

+26-10
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bitvec::prelude::{BitSlice, Lsb0};
66
use futures::{SinkExt, StreamExt};
77
use propolis::common::{GuestAddr, GuestData, PAGE_SIZE};
88
use propolis::migrate::{
9-
MigrateCtx, MigrateStateError, Migrator, PayloadOutputs,
9+
MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOutputs,
1010
};
1111
use propolis::vmm;
1212
use propolis_api_types::instance_spec::VersionedInstanceSpec;
@@ -685,6 +685,7 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
685685
async fn device_state(&mut self) -> Result<(), MigrateError> {
686686
self.update_state(MigrationState::Device);
687687
let mut device_states = vec![];
688+
let com1_payload;
688689
{
689690
let objects = self.vm.lock_shared().await;
690691
let migrate_ctx =
@@ -733,6 +734,14 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
733734
}
734735
Ok(())
735736
})?;
737+
738+
let com1_state = objects.com1().export(&migrate_ctx)?;
739+
com1_payload = DevicePayload {
740+
kind: com1_state.kind.to_owned(),
741+
version: com1_state.version,
742+
data: ron::ser::to_string(&com1_state.payload)
743+
.map_err(codec::ProtocolError::from)?,
744+
};
736745
}
737746

738747
info!(self.log(), "Device States: {device_states:#?}");
@@ -743,7 +752,14 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
743752
))
744753
.await?;
745754

755+
self.send_msg(codec::Message::Serialized(
756+
ron::ser::to_string(&com1_payload)
757+
.map_err(codec::ProtocolError::from)?,
758+
))
759+
.await?;
760+
746761
self.send_msg(codec::Message::Okay).await?;
762+
747763
self.read_ok().await
748764
}
749765

@@ -787,15 +803,15 @@ impl<T: MigrateConn> RonV0Runner<'_, T> {
787803
}
788804
_ => return Err(MigrateError::UnexpectedMessage),
789805
};
790-
let com1_history = self
791-
.vm
792-
.lock_shared()
793-
.await
794-
.com1()
795-
.export_history(remote_addr)
796-
.await?;
797-
self.send_msg(codec::Message::Serialized(com1_history)).await?;
798-
self.read_ok().await
806+
807+
{
808+
let mgr = self.vm_services.serial_mgr.lock().await;
809+
if let Some(mgr) = mgr.as_ref() {
810+
mgr.notify_migration(remote_addr).await;
811+
}
812+
}
813+
814+
Ok(())
799815
}
800816

801817
async fn finish(&mut self) -> Result<(), MigrateError> {

bin/propolis-server/src/lib/serial/backend.rs

+30-5
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@ use super::history_buffer::{HistoryBuffer, SerialHistoryOffset};
2525
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
2626
struct ClientId(u64);
2727

28-
trait ConsoleDevice: Source + Sink {}
29-
impl<T: Source + Sink> ConsoleDevice for T {}
30-
3128
/// A client's rights when accessing a character backend.
3229
#[derive(Clone, Copy)]
3330
pub(super) enum Permissions {
@@ -188,7 +185,6 @@ impl Inner {
188185
/// access a single serial device.
189186
pub struct ConsoleBackend {
190187
inner: Arc<Mutex<Inner>>,
191-
dev: Arc<dyn ConsoleDevice>,
192188
sink: Arc<dyn Sink>,
193189
sink_buffer: Arc<SinkBuffer>,
194190
done_tx: oneshot::Sender<()>,
@@ -211,7 +207,6 @@ impl ConsoleBackend {
211207

212208
let this = Arc::new(Self {
213209
inner: Arc::new(Mutex::new(Inner::new(history_bytes))),
214-
dev: device,
215210
sink,
216211
sink_buffer,
217212
done_tx,
@@ -268,6 +263,36 @@ impl Drop for ConsoleBackend {
268263
}
269264
}
270265

266+
mod migrate {
267+
use propolis::migrate::{
268+
MigrateCtx, MigrateSingle, MigrateStateError, PayloadOffer,
269+
};
270+
271+
use crate::serial::history_buffer::migrate::HistoryBufferContentsV1;
272+
273+
use super::ConsoleBackend;
274+
275+
impl MigrateSingle for ConsoleBackend {
276+
fn export(
277+
&self,
278+
_ctx: &MigrateCtx,
279+
) -> Result<propolis::migrate::PayloadOutput, MigrateStateError>
280+
{
281+
Ok(self.inner.lock().unwrap().buffer.export().into())
282+
}
283+
284+
fn import(
285+
&self,
286+
mut offer: PayloadOffer,
287+
_ctx: &MigrateCtx,
288+
) -> Result<(), MigrateStateError> {
289+
let contents: HistoryBufferContentsV1 = offer.parse()?;
290+
self.inner.lock().unwrap().buffer.import(contents);
291+
Ok(())
292+
}
293+
}
294+
}
295+
271296
async fn read_task(
272297
inner: Arc<Mutex<Inner>>,
273298
source: Arc<dyn Source>,

bin/propolis-server/src/lib/serial/history_buffer.rs

+43-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
88
use dropshot::HttpError;
99
use propolis_api_types as api;
10-
use serde::{Deserialize, Serialize};
1110
use std::collections::VecDeque;
1211
use std::convert::TryFrom;
1312

@@ -29,7 +28,7 @@ const DEFAULT_MAX_LENGTH: isize = 16 * 1024;
2928
/// An abstraction for storing the contents of the instance's serial console
3029
/// output, intended for retrieval by the web console or other monitoring or
3130
/// troubleshooting tools.
32-
#[derive(Deserialize, Serialize, Clone)]
31+
#[derive(Clone)]
3332
pub(crate) struct HistoryBuffer {
3433
beginning: Vec<u8>,
3534
rolling: VecDeque<u8>,
@@ -223,6 +222,48 @@ impl HistoryBuffer {
223222
pub fn bytes_from_start(&self) -> usize {
224223
self.total_bytes
225224
}
225+
226+
pub(super) fn export(&self) -> migrate::HistoryBufferContentsV1 {
227+
let slices = self.rolling.as_slices();
228+
let mut most_recent = Vec::with_capacity(self.rolling.len());
229+
most_recent.extend(slices.0);
230+
most_recent.extend(slices.1);
231+
migrate::HistoryBufferContentsV1 {
232+
beginning: self.beginning.clone(),
233+
most_recent,
234+
total_bytes: self.total_bytes,
235+
buffer_size: self.buffer_size,
236+
}
237+
}
238+
239+
pub(super) fn import(
240+
&mut self,
241+
contents: migrate::HistoryBufferContentsV1,
242+
) {
243+
self.beginning = contents.beginning;
244+
self.rolling.clear();
245+
self.rolling.extend(contents.most_recent.iter());
246+
self.total_bytes = contents.total_bytes;
247+
self.buffer_size = contents.buffer_size;
248+
}
249+
}
250+
251+
pub(crate) mod migrate {
252+
use serde::{Deserialize, Serialize};
253+
254+
#[derive(Deserialize, Serialize, Clone)]
255+
pub(crate) struct HistoryBufferContentsV1 {
256+
pub(super) beginning: Vec<u8>,
257+
pub(super) most_recent: Vec<u8>,
258+
pub(super) total_bytes: usize,
259+
pub(super) buffer_size: usize,
260+
}
261+
262+
impl propolis::migrate::Schema<'_> for HistoryBufferContentsV1 {
263+
fn id() -> propolis::migrate::SchemaId {
264+
("serial-console-backend", 1)
265+
}
266+
}
226267
}
227268

228269
#[cfg(test)]

bin/propolis-server/src/lib/serial/mod.rs

+39-35
Original file line numberDiff line numberDiff line change
@@ -145,48 +145,52 @@ impl SerialConsoleManager {
145145
),
146146
};
147147

148-
let mut client_tasks = self.client_tasks.lock().unwrap();
149-
let client_id = client_tasks.next_id();
150-
let prev_rw_task = if kind == ClientKind::ReadWrite {
151-
client_tasks.remove_rw_client()
152-
} else {
153-
None
154-
};
148+
let prev_rw_task;
149+
{
150+
let mut client_tasks = self.client_tasks.lock().unwrap();
151+
let client_id = client_tasks.next_id();
152+
prev_rw_task = if kind == ClientKind::ReadWrite {
153+
client_tasks.remove_rw_client()
154+
} else {
155+
None
156+
};
155157

156-
let backend_hdl =
157-
self.backend.attach_client(console_tx, permissions, discipline);
158-
159-
let ctx = SerialTaskContext {
160-
log: self.log.clone(),
161-
ws,
162-
backend_hdl,
163-
console_rx,
164-
control_rx,
165-
start_rx: task_start_rx,
166-
done_rx: task_done_rx,
167-
client_tasks: self.client_tasks.clone(),
168-
client_id,
169-
};
158+
let backend_hdl =
159+
self.backend.attach_client(console_tx, permissions, discipline);
160+
161+
let ctx = SerialTaskContext {
162+
log: self.log.clone(),
163+
ws,
164+
backend_hdl,
165+
console_rx,
166+
control_rx,
167+
start_rx: task_start_rx,
168+
done_rx: task_done_rx,
169+
client_tasks: self.client_tasks.clone(),
170+
client_id,
171+
};
170172

171-
let task = ClientTask {
172-
hdl: tokio::spawn(async move { serial_task(ctx).await }),
173-
control_tx,
174-
done_tx: task_done_tx,
175-
};
173+
let task = ClientTask {
174+
hdl: tokio::spawn(async move { serial_task(ctx).await }),
175+
control_tx,
176+
done_tx: task_done_tx,
177+
};
176178

177-
client_tasks.tasks.insert(client_id, task);
178-
if kind == ClientKind::ReadWrite {
179-
assert!(client_tasks.rw_client_id.is_none());
180-
client_tasks.rw_client_id = Some(client_id);
179+
client_tasks.tasks.insert(client_id, task);
180+
if kind == ClientKind::ReadWrite {
181+
assert!(client_tasks.rw_client_id.is_none());
182+
client_tasks.rw_client_id = Some(client_id);
183+
}
181184
}
182185

183-
drop(client_tasks);
184186
if let Some(task) = prev_rw_task {
185-
task.done_tx.send(());
186-
task.hdl.await;
187+
let _ = task.done_tx.send(());
188+
let _ = task.hdl.await;
187189
}
188190

189-
task_start_tx.send(());
191+
task_start_tx
192+
.send(())
193+
.expect("new serial task shouldn't exit before starting");
190194
}
191195

192196
pub async fn notify_migration(&self, destination: SocketAddr) {
@@ -468,5 +472,5 @@ async fn serial_task(
468472
close(&log, client_id, sink, stream, close_reason).await;
469473
}
470474

471-
client_tasks.lock().unwrap().tasks.remove(&client_id);
475+
client_tasks.lock().unwrap().remove_by_id(client_id);
472476
}

bin/propolis-server/src/lib/server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,8 @@ async fn instance_serial(
459459
serial_mgr
460460
.as_ref()
461461
.ok_or("Instance has no serial console manager")?
462-
.connect(ws_stream, crate::serial::ClientKind::ReadWrite);
462+
.connect(ws_stream, crate::serial::ClientKind::ReadWrite)
463+
.await;
463464

464465
Ok(())
465466
}

0 commit comments

Comments
 (0)