Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
working status codes
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fisher <[email protected]>
  • Loading branch information
Matthew Fisher committed May 21, 2020
1 parent 86305c2 commit 48f3cc1
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 102 deletions.
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
module_store_path.push("modules");
let store = FileModuleStore::new(client, &module_store_path);

let provider = Provider::new(store, kubeconfig.clone());
let provider = Provider::new(store, &config, kubeconfig.clone()).await?;
let kubelet = Kubelet::new(provider, kubeconfig, config);
kubelet.start().await
}
51 changes: 30 additions & 21 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
//! kubelet.start().await.unwrap();
//! };
//! ```
use std::path::PathBuf;

use k8s_openapi::api::core::v1::Pod as KubePod;
use kube::api::DeleteParams;
use kube::Api;
use kube::Config as KubeConfig;
use kubelet::handle::{key_from_pod, pod_key};
use kubelet::config::Config as KubeletConfig;
use kubelet::handle::{key_from_pod, pod_key, PodHandle};
use kubelet::module_store::ModuleStore;
use kubelet::provider::ProviderError;
use kubelet::Pod;
Expand All @@ -42,26 +44,31 @@ use std::sync::Arc;
use tokio::sync::RwLock;

mod runtime;
use runtime::Runtime;
use runtime::{HandleStopper, LogHandleFactory, Runtime};

const TARGET_WASM32_WASI: &str = "wasm32-wasi";
const LOG_DIR_NAME: &str = "wasm3-logs";

/// Provider provides a Kubelet runtime implementation that executes WASM
/// binaries conforming to the WASI spec
pub struct Provider<S> {
pods: Arc<RwLock<HashMap<String, HashMap<String, Runtime>>>>,
handles: Arc<RwLock<HashMap<String, PodHandle<HandleStopper, LogHandleFactory>>>>,
store: S,
log_path: PathBuf,
kubeconfig: KubeConfig,
}

impl<S: ModuleStore + Send + Sync> Provider<S> {
/// Create a new wasi provider from a module store and a kubelet config
pub fn new(store: S, kubeconfig: KubeConfig) -> Self {
Self {
pods: Default::default(),
pub async fn new(store: S, config: &KubeletConfig, kubeconfig: KubeConfig) -> anyhow::Result<Self> {
let log_path = config.data_dir.join(LOG_DIR_NAME);
tokio::fs::create_dir_all(&log_path).await?;
Ok(Self {
handles: Default::default(),
store,
log_path,
kubeconfig,
}
})
}
}

Expand All @@ -77,6 +84,7 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {

let pod_name = pod.name();
let mut containers = HashMap::new();
let client = kube::Client::new(self.kubeconfig.clone());

let mut modules = self.store.fetch_pod_modules(&pod).await?;
info!("Starting containers for pod {:?}", pod_name);
Expand All @@ -86,11 +94,11 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
.expect("FATAL ERROR: module map not properly populated");

// TODO: expose this as a feature flag (--stack-size)
let mut runtime = Runtime::new(module_data, (1024 * 60) as u32);
let mut runtime = Runtime::new(module_data, (1024 * 60) as u32, self.log_path.clone()).await?;

debug!("Starting container {} on thread", container.name);
runtime.start()?;
containers.insert(container.name.clone(), runtime);
let handle = runtime.start().await?;
containers.insert(container.name.clone(), handle);
}
info!(
"All containers started for pod {:?}. Updating status",
Expand All @@ -100,8 +108,11 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
// Wrap this in a block so the write lock goes out of scope when we are done
{
// Grab the entry while we are creating things
let mut pods = self.pods.write().await;
pods.insert(key_from_pod(&pod), containers);
let mut handles = self.handles.write().await;
handles.insert(
key_from_pod(&pod),
PodHandle::new(containers, pod, client, None)?,
);
}

Ok(())
Expand All @@ -122,12 +133,10 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
);
trace!("Modified pod spec: {:#?}", pod.as_kube_pod());
if let Some(_timestamp) = pod.deletion_timestamp() {
let mut pods = self.pods.write().await;
match pods.get_mut(&key_from_pod(&pod)) {
let mut handles = self.handles.write().await;
match handles.get_mut(&key_from_pod(&pod)) {
Some(h) => {
for (_name, runtime) in h {
runtime.stop()?;
}
h.stop().await?;
// Follow up with a delete when everything is stopped
let dp = DeleteParams {
grace_period_seconds: Some(0),
Expand Down Expand Up @@ -162,8 +171,8 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
}

async fn delete(&self, pod: Pod) -> anyhow::Result<()> {
let mut pods = self.pods.write().await;
match pods.remove(&key_from_pod(&pod)) {
let mut handles = self.handles.write().await;
match handles.remove(&key_from_pod(&pod)) {
Some(_) => debug!(
"Pod {} in namespace {} removed",
pod.name(),
Expand All @@ -185,8 +194,8 @@ impl<S: ModuleStore + Send + Sync> kubelet::Provider for Provider<S> {
_container_name: String,
_sender: kubelet::LogSender,
) -> anyhow::Result<()> {
let mut pods = self.pods.write().await;
let _containers = pods
let mut handles = self.handles.write().await;
let _containers = handles
.get_mut(&pod_key(&namespace, &pod_name))
.ok_or_else(|| ProviderError::PodNotFound {
pod_name: pod_name.clone(),
Expand Down
171 changes: 91 additions & 80 deletions src/provider/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,104 +1,115 @@
use std::error;
use std::fmt;
use std::path::Path;
use std::sync::Arc;

use wasm3::Environment;
use tempfile::NamedTempFile;
use tokio::sync::watch::{self, Sender};
use tokio::task::JoinHandle;
use wasm3::{Environment, Module};
use kubelet::handle::{RuntimeHandle, Stop};
use kubelet::status::ContainerStatus;

type Result<T> = std::result::Result<T, RuntimeError>;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RuntimeError {
kind: RuntimeErrorKind,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum RuntimeErrorKind {
AlreadyStarted,
CannotCreateRuntime,
CannotLinkWASI,
CannotLoadModule,
NoEntrypoint,
RunFailure,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum RuntimeStatus {
Running,
Stopped,
}

impl RuntimeError {
fn new(kind: RuntimeErrorKind) -> Self {
Self { kind: kind }
}

fn __description(&self) -> &str {
match self.kind {
RuntimeErrorKind::AlreadyStarted => "runtime already started",
RuntimeErrorKind::CannotCreateRuntime => "cannot create runtime",
RuntimeErrorKind::CannotLinkWASI => "cannot link module to the WASI runtime",
RuntimeErrorKind::CannotLoadModule => "cannot load module",
RuntimeErrorKind::NoEntrypoint => "no entrypoint function called '_start' found",
RuntimeErrorKind::RunFailure => "failure during function call",
}
}
pub struct HandleStopper {
handle: JoinHandle<anyhow::Result<()>>,
}

impl fmt::Display for RuntimeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.__description())
#[async_trait::async_trait]
impl Stop for HandleStopper {
async fn stop(&mut self) -> anyhow::Result<()> {
// no nothing
Ok(())
}
}

impl error::Error for RuntimeError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
// source is not tracked
None
async fn wait(&mut self) -> anyhow::Result<()> {
(&mut self.handle).await??;
Ok(())
}
}

/// A runtime context for running a wasm module with wasm3
pub struct Runtime {
module_bytes: Vec<u8>,
stack_size: u32,
current_status: RuntimeStatus,
output: Arc<NamedTempFile>,
}

impl Runtime {
pub fn new(module_bytes: Vec<u8>, stack_size: u32) -> Self {
Self {
pub async fn new<L: AsRef<Path> + Send + Sync + 'static>(module_bytes: Vec<u8>, stack_size: u32, log_dir: L) -> anyhow::Result<Self> {
let temp = tokio::task::spawn_blocking(move || -> anyhow::Result<NamedTempFile> {
Ok(NamedTempFile::new_in(log_dir)?)
})
.await??;

Ok(Self {
module_bytes: module_bytes,
stack_size: stack_size,
current_status: RuntimeStatus::Stopped,
}
output: Arc::new(temp),
})
}

pub fn start(&mut self) -> Result<()> {
if self.current_status == RuntimeStatus::Running {
return Err(RuntimeError::new(RuntimeErrorKind::AlreadyStarted));
}
let env = Environment::new()
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotCreateRuntime))?;
let rt = env
.create_runtime(self.stack_size)
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotCreateRuntime))?;
let mut module = rt
.parse_and_load_module(&self.module_bytes)
.map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLoadModule))?;
module.link_wasi().map_err(|_| RuntimeError::new(RuntimeErrorKind::CannotLinkWASI))?;
let func = module
.find_function::<(), ()>("_start")
.map_err(|_| RuntimeError::new(RuntimeErrorKind::NoEntrypoint))?;
self.current_status = RuntimeStatus::Running;
// FIXME: run this in the background
// for now, we block until the function is complete, then call .stop()
func.call()
.map_err(|_| RuntimeError::new(RuntimeErrorKind::RunFailure))?;
self.stop()
pub async fn start(&mut self) -> anyhow::Result<RuntimeHandle<HandleStopper, LogHandleFactory>> {
let temp = self.output.clone();
let output_write = tokio::task::spawn_blocking(move || -> anyhow::Result<std::fs::File> {
Ok(temp.reopen()?)
})
.await??;

let (status_sender, status_recv) = watch::channel(ContainerStatus::Waiting {
timestamp: chrono::Utc::now(),
message: "No status has been received from the process".into(),
});
let handle = spawn_wasm3(self.module_bytes.clone(), self.stack_size, status_sender, output_write).await?;


let log_handle_factory = LogHandleFactory {
temp: self.output.clone(),
};

Ok(RuntimeHandle::new(
HandleStopper{handle},
log_handle_factory,
status_recv,
))
}
}

pub fn stop(&mut self) -> Result<()> {
// it is OK for the runtime to stop an already stopped module. Effectively a no-op
self.current_status = RuntimeStatus::Stopped;
Ok(())
/// Holds our tempfile handle.
pub struct LogHandleFactory {
temp: Arc<NamedTempFile>,
}

impl kubelet::handle::LogHandleFactory<tokio::fs::File> for LogHandleFactory {
/// Creates `tokio::fs::File` on demand for log reading.
fn new_handle(&self) -> tokio::fs::File {
tokio::fs::File::from_std(self.temp.reopen().unwrap())
}
}

// Spawns a running wasmtime instance with the given context and status
// channel. Due to the Instance type not being Send safe, all of the logic
// needs to be done within the spawned task
async fn spawn_wasm3(
module_bytes: Vec<u8>,
stack_size: u32,
status_sender: Sender<ContainerStatus>,
_output_write: std::fs::File, //TODO: hook this up such that log output will be written to the file
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let handle = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
let env = Environment::new().expect("cannot create environment");
let rt = env.create_runtime(stack_size).expect("cannot create runtime");
let module = Module::parse(&env, &module_bytes).expect("cannot parse module");
let mut module = rt.load_module(module).expect("cannot load module");
module.link_wasi().expect("cannot link WASI");
let func = module.find_function::<(), ()>("_start").expect("cannot find function '_start' in module");
func.call().expect("cannot call '_start' in module");
status_sender
.broadcast(ContainerStatus::Terminated {
failed: false,
message: "Module run completed".into(),
timestamp: chrono::Utc::now(),
})
.expect("status should be able to send");
Ok(())
});

Ok(handle)
}

0 comments on commit 48f3cc1

Please sign in to comment.