diff --git a/Cargo.lock b/Cargo.lock index aa75b8589e3..2fa90e2ea05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2574,6 +2574,7 @@ dependencies = [ "sha2", "tar", "tokio", + "tokio-vsock", "tonic", "tower", "x86_64", diff --git a/oak_containers_launcher/Cargo.toml b/oak_containers_launcher/Cargo.toml index 6db2779718b..5751007efcc 100644 --- a/oak_containers_launcher/Cargo.toml +++ b/oak_containers_launcher/Cargo.toml @@ -34,6 +34,6 @@ tokio = { version = "*", features = [ "sync", ] } tokio-stream = { version = "*", features = ["net"] } -tokio-vsock = "*" +tokio-vsock = { version = "*", features = ["tonic-conn"] } tonic = { workspace = true, features = ["codegen"] } which = "*" diff --git a/oak_containers_launcher/src/lib.rs b/oak_containers_launcher/src/lib.rs index 4eb5a83fc9d..b308c56543d 100644 --- a/oak_containers_launcher/src/lib.rs +++ b/oak_containers_launcher/src/lib.rs @@ -50,11 +50,11 @@ use oak_proto_rust::oak::attestation::v1::{ pub use qemu::Params as QemuParams; use tokio::{ net::TcpListener, - sync::oneshot::{channel, Receiver, Sender}, + sync::{oneshot, watch}, task::JoinHandle, time::{timeout, Duration}, }; -use tokio_vsock::VsockAddr; +use tokio_vsock::{VsockAddr, VsockListener, VMADDR_CID_HOST}; use tonic::transport::Channel as TonicChannel; use crate::proto::oak::{ @@ -178,11 +178,11 @@ pub struct Launcher { // Orchestrator) and Attestation Endorsement (initialized by the Launcher). endorsed_evidence: Option, // Receiver that is used to get the Attestation Evidence from the server implementation. - evidence_receiver: Option>, - app_ready_notifier: Option>, + evidence_receiver: Option>, + app_ready_notifier: Option>, orchestrator_key_provisioning_client: Option>, trusted_app_channel: Channel, - shutdown: Option>, + shutdown: Option>, } impl Launcher { @@ -192,12 +192,16 @@ impl Launcher { let orchestrator_sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0); let listener = TcpListener::bind(sockaddr).await?; let port = listener.local_addr()?.port(); + // Resuse the same port we got for the TCP socket for vsock. + let vsock_listener = VsockListener::bind(VsockAddr::new(VMADDR_CID_HOST, port.into()))?; log::info!("Launcher service listening on port {port}"); - let (evidence_sender, evidence_receiver) = channel::(); - let (shutdown_sender, shutdown_receiver) = channel::<()>(); - let (app_notifier_sender, app_notifier_receiver) = channel::<()>(); + let (evidence_sender, evidence_receiver) = oneshot::channel::(); + let (shutdown_sender, mut shutdown_receiver) = watch::channel::<()>(()); + shutdown_receiver.mark_unchanged(); // Don't immediately notify on the initial value. + let (app_notifier_sender, app_notifier_receiver) = oneshot::channel::<()>(); let server = tokio::spawn(server::new( listener, + vsock_listener, args.system_image, args.container_bundle, args.application_config, diff --git a/oak_containers_launcher/src/qemu.rs b/oak_containers_launcher/src/qemu.rs index e46cb26b5e0..40dfcad83ed 100644 --- a/oak_containers_launcher/src/qemu.rs +++ b/oak_containers_launcher/src/qemu.rs @@ -262,6 +262,10 @@ impl Qemu { "brd.max_part=1", format!("ip={vm_address}:::255.255.255.0::eth0:off").as_str(), "quiet", + "--", + // Makes stage1 communicate to the launcher via virtio-vsock. Disabled for now. + //format!("--launcher-addr=vsock://{VMADDR_CID_HOST}:{launcher_service_port}") + // .as_str(), ] .join(" ") .as_str(), diff --git a/oak_containers_launcher/src/server.rs b/oak_containers_launcher/src/server.rs index a1e801f8a86..39cf9cddd22 100644 --- a/oak_containers_launcher/src/server.rs +++ b/oak_containers_launcher/src/server.rs @@ -37,9 +37,10 @@ use opentelemetry_proto::tonic::{ use tokio::{ io::{AsyncReadExt, BufReader}, net::TcpListener, - sync::oneshot::{Receiver, Sender}, + sync::{oneshot, watch}, }; use tokio_stream::wrappers::TcpListenerStream; +use tokio_vsock::VsockListener; use tonic::{transport::Server, Request, Response, Status}; use crate::proto::oak::containers::{ @@ -63,10 +64,10 @@ struct LauncherServerImplementation { container_bundle: std::path::PathBuf, application_config: Vec, // Will be used to send the Attestation Evidence to the Launcher. - evidence_sender: Mutex>>, + evidence_sender: Mutex>>, // Will be used to notify the untrusted application that the trusted application is ready and // listening on a socket address. - app_ready_notifier: Mutex>>, + app_ready_notifier: Mutex>>, } #[tonic::async_trait] @@ -259,14 +260,18 @@ impl LogsService for LauncherServerImplementation { } } +// Clippy is not wrong, but hopefully the situation with two listeners is only +// temporary. +#[allow(clippy::too_many_arguments)] pub async fn new( listener: TcpListener, + vsock_listener: VsockListener, system_image: std::path::PathBuf, container_bundle: std::path::PathBuf, application_config: Vec, - evidence_sender: Sender, - app_ready_notifier: Sender<()>, - shutdown: Receiver<()>, + evidence_sender: oneshot::Sender, + app_ready_notifier: oneshot::Sender<()>, + shutdown: watch::Receiver<()>, ) -> Result<(), anyhow::Error> { let server_impl = Arc::new(LauncherServerImplementation { system_image, @@ -275,12 +280,27 @@ pub async fn new( evidence_sender: Mutex::new(Some(evidence_sender)), app_ready_notifier: Mutex::new(Some(app_ready_notifier)), }); - Server::builder() + + let mut tcp_shutdown = shutdown.clone(); + let tcp_server = Server::builder() .add_service(LauncherServer::from_arc(server_impl.clone())) .add_service(HostlibKeyProvisioningServer::from_arc(server_impl.clone())) .add_service(MetricsServiceServer::from_arc(server_impl.clone())) - .add_service(LogsServiceServer::from_arc(server_impl)) - .serve_with_incoming_shutdown(TcpListenerStream::new(listener), shutdown.map(|_| ())) - .await + .add_service(LogsServiceServer::from_arc(server_impl.clone())) + .serve_with_incoming_shutdown( + TcpListenerStream::new(listener), + tcp_shutdown.changed().map(|_| ()), + ); + + let mut virtio_shutdown = shutdown.clone(); + let virtio_server = Server::builder() + .add_service(LauncherServer::from_arc(server_impl.clone())) + .serve_with_incoming_shutdown( + vsock_listener.incoming(), + virtio_shutdown.changed().map(|_| ()), + ); + + tokio::try_join!(tcp_server, virtio_server) + .map(|((), ())| ()) .map_err(|error| anyhow!("server error: {:?}", error)) } diff --git a/oak_containers_stage1/Cargo.toml b/oak_containers_stage1/Cargo.toml index 60a75d318a3..166599c4617 100644 --- a/oak_containers_stage1/Cargo.toml +++ b/oak_containers_stage1/Cargo.toml @@ -34,6 +34,7 @@ tokio = { version = "*", features = [ "process", "sync", ] } +tokio-vsock = { version = "*", features = ["tonic-conn"] } tonic = { workspace = true } tower = "*" x86_64 = "*" diff --git a/oak_containers_stage1/src/client.rs b/oak_containers_stage1/src/client.rs index ccd29719bb4..589f806de7b 100644 --- a/oak_containers_stage1/src/client.rs +++ b/oak_containers_stage1/src/client.rs @@ -25,7 +25,9 @@ mod proto { use anyhow::{Context, Result}; use proto::oak::containers::launcher_client::LauncherClient as GrpcLauncherClient; +use tokio_vsock::{VsockAddr, VsockStream}; use tonic::transport::{Channel, Uri}; +use tower::service_fn; pub struct LauncherClient { inner: GrpcLauncherClient, @@ -33,7 +35,23 @@ pub struct LauncherClient { impl LauncherClient { pub async fn new(addr: Uri) -> Result { - let inner = GrpcLauncherClient::::connect(addr).await?; + // vsock is unfortunately going to require special handling. + let inner = if addr.scheme_str() == Some("vsock") { + let vsock_addr = VsockAddr::new( + addr.host() + .unwrap_or(format!("{}", tokio_vsock::VMADDR_CID_HOST).as_str()) + .parse() + .context("invalid vsock CID")?, + addr.port_u16().context("invalid vsock port")?.into(), + ); + GrpcLauncherClient::new( + Channel::builder(addr) + .connect_with_connector(service_fn(move |_| VsockStream::connect(vsock_addr))) + .await?, + ) + } else { + GrpcLauncherClient::::connect(addr).await? + }; Ok(Self { inner }) } diff --git a/oak_containers_stage1/src/main.rs b/oak_containers_stage1/src/main.rs index 05b5d258b64..ad11dc62677 100644 --- a/oak_containers_stage1/src/main.rs +++ b/oak_containers_stage1/src/main.rs @@ -40,12 +40,13 @@ use nix::{ use oak_proto_rust::oak::attestation::v1::DiceData; use prost::Message; use tokio::process::Command; +use tonic::transport::Uri; use x86_64::PhysAddr; #[derive(Parser, Debug)] struct Args { #[arg(long, default_value = "http://10.0.2.100:8080")] - launcher_addr: String, + launcher_addr: Uri, #[arg(long, default_value = "/sbin/init")] init: String, @@ -90,7 +91,7 @@ async fn main() -> Result<(), Box> { chroot(".").context("failed to chroot to .")?; chdir("/").context("failed to chdir to /")?; - let mut client = LauncherClient::new(args.launcher_addr.parse()?) + let mut client = LauncherClient::new(args.launcher_addr) .await .context("error creating the launcher client")?;