From 004f46322dc112d92519280aa4dfebe84e1e6e6a Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 13:29:39 +0100 Subject: [PATCH 01/14] feature/nanocld: introduced process trait --- bin/nanocld/src/objects/cargo.rs | 16 +- bin/nanocld/src/objects/generic/mod.rs | 2 + bin/nanocld/src/objects/generic/process.rs | 180 ++++++++++++++++++ bin/nanocld/src/objects/job.rs | 24 ++- bin/nanocld/src/objects/vm.rs | 15 +- bin/nanocld/src/repositories/process.rs | 19 ++ bin/nanocld/src/services/process.rs | 33 +++- bin/nanocld/src/utils/cargo.rs | 24 +-- bin/nanocld/src/utils/namespace.rs | 5 +- bin/nanocld/src/utils/process.rs | 201 +-------------------- bin/nanocld/src/utils/vm.rs | 6 +- crates/nanocl_stubs/src/process.rs | 18 ++ 12 files changed, 302 insertions(+), 241 deletions(-) create mode 100644 bin/nanocld/src/objects/generic/process.rs diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index 430583d94..617f0e829 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -4,7 +4,7 @@ use bollard_next::{ container::{RemoveContainerOptions, Config}, }; -use nanocl_error::http::{HttpResult, HttpError}; +use nanocl_error::http::HttpResult; use nanocl_stubs::{ process::ProcessKind, cargo::{Cargo, CargoDeleteQuery, CargoInspect}, @@ -22,6 +22,12 @@ use crate::{ use super::generic::*; +impl ObjProcess for CargoDb { + fn get_kind() -> ProcessKind { + ProcessKind::Cargo + } +} + impl ObjCreate for CargoDb { type ObjCreateIn = CargoObjCreateIn; type ObjCreateOut = Cargo; @@ -75,7 +81,7 @@ impl ObjDelByPk for CargoDb { processes .into_iter() .map(|process| async move { - utils::process::remove( + CargoDb::del_process_by_pk( &process.key, Some(RemoveContainerOptions { force: opts.force.unwrap_or(false), @@ -86,10 +92,10 @@ impl ObjDelByPk for CargoDb { .await }) .collect::>() - .collect::>>() + .collect::>>() .await .into_iter() - .collect::, _>>()?; + .collect::>>()?; CargoDb::del_by_pk(pk, &state.pool).await?; SpecDb::del_by_kind_key(pk, &state.pool).await?; Ok(cargo) @@ -130,7 +136,7 @@ impl ObjPutByPk for CargoDb { Ok(instances) => instances, }; // start created containers - match utils::process::start_by_kind(&ProcessKind::Cargo, pk, state).await { + match CargoDb::start_process_by_kind_pk(pk, state).await { Err(err) => { log::error!( "Unable to start cargo instance {} : {err}", diff --git a/bin/nanocld/src/objects/generic/mod.rs b/bin/nanocld/src/objects/generic/mod.rs index f3a5de4f5..5acf003b0 100644 --- a/bin/nanocld/src/objects/generic/mod.rs +++ b/bin/nanocld/src/objects/generic/mod.rs @@ -3,9 +3,11 @@ mod delete; mod patch; mod put; mod inspect; +mod process; pub use create::*; pub use delete::*; pub use patch::*; pub use put::*; pub use inspect::*; +pub use process::*; diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs new file mode 100644 index 000000000..9ed4a9d4f --- /dev/null +++ b/bin/nanocld/src/objects/generic/process.rs @@ -0,0 +1,180 @@ +use bollard_next::container::{ + RemoveContainerOptions, StartContainerOptions, StopContainerOptions, Config, + CreateContainerOptions, InspectContainerOptions, +}; +use nanocl_error::{ + http::{HttpResult, HttpError}, + io::FromIo, +}; +use nanocl_stubs::{ + process::{ProcessKind, ProcessPartial, Process}, + system::NativeEventAction, +}; + +use crate::{ + repositories::generic::*, + models::{SystemState, ProcessDb, VmDb, CargoDb, JobDb, JobUpdateDb}, +}; + +/// Represent a object that is treated as a process +/// That you can start, restart, stop, logs, etc. +pub trait ObjProcess { + fn get_kind() -> ProcessKind; + + async fn _emit( + kind_key: &str, + action: NativeEventAction, + state: &SystemState, + ) -> HttpResult<()> { + match Self::get_kind() { + ProcessKind::Vm => { + let vm = VmDb::transform_read_by_pk(kind_key, &state.pool).await?; + state.emit_normal_native_action(&vm, action); + } + ProcessKind::Cargo => { + let cargo = + CargoDb::transform_read_by_pk(kind_key, &state.pool).await?; + state.emit_normal_native_action(&cargo, action); + } + ProcessKind::Job => { + JobDb::update_pk( + kind_key, + JobUpdateDb { + updated_at: Some(chrono::Utc::now().naive_utc()), + }, + &state.pool, + ) + .await?; + let job = JobDb::read_by_pk(kind_key, &state.pool) + .await? + .try_to_spec()?; + state.emit_normal_native_action(&job, action); + } + } + Ok(()) + } + + async fn create_process( + name: &str, + kind_key: &str, + item: Config, + state: &SystemState, + ) -> HttpResult { + let kind = Self::get_kind(); + let mut config = item.clone(); + let mut labels = item.labels.to_owned().unwrap_or_default(); + labels.insert("io.nanocl".to_owned(), "enabled".to_owned()); + labels.insert("io.nanocl.kind".to_owned(), kind.to_string()); + config.labels = Some(labels); + let res = state + .docker_api + .create_container( + Some(CreateContainerOptions { + name, + ..Default::default() + }), + config, + ) + .await?; + let inspect = state + .docker_api + .inspect_container(&res.id, None::) + .await?; + let created_at = inspect.created.clone().unwrap_or_default(); + let new_instance = ProcessPartial { + key: res.id, + name: name.to_owned(), + kind, + data: serde_json::to_value(&inspect) + .map_err(|err| err.map_err_context(|| "CreateProcess"))?, + node_key: state.config.hostname.clone(), + kind_key: kind_key.to_owned(), + created_at: Some( + chrono::NaiveDateTime::parse_from_str( + &created_at, + "%Y-%m-%dT%H:%M:%S%.fZ", + ) + .map_err(|err| { + HttpError::internal_server_error(format!( + "Unable to parse date {err}" + )) + })?, + ), + }; + let process = ProcessDb::create_from(&new_instance, &state.pool).await?; + Process::try_from(process) + .map_err(|err| HttpError::internal_server_error(err.to_string())) + } + + async fn start_process_by_kind_pk( + kind_pk: &str, + state: &SystemState, + ) -> HttpResult<()> { + let processes = ProcessDb::read_by_kind_key(kind_pk, &state.pool).await?; + log::debug!("start_process_by_kind_pk: {kind_pk}"); + for process in processes { + let process_state = process.data.state.unwrap_or_default(); + if process_state.running.unwrap_or_default() { + return Ok(()); + } + state + .docker_api + .start_container( + &process.data.id.unwrap_or_default(), + None::>, + ) + .await?; + } + Self::_emit(kind_pk, NativeEventAction::Create, state).await?; + Ok(()) + } + + async fn stop_process_by_kind_pk( + kind_pk: &str, + state: &SystemState, + ) -> HttpResult<()> { + let processes = ProcessDb::read_by_kind_key(kind_pk, &state.pool).await?; + log::debug!("stop_process_by_kind_pk: {kind_pk}"); + for process in processes { + let process_state = process.data.state.unwrap_or_default(); + if !process_state.running.unwrap_or_default() { + return Ok(()); + } + state + .docker_api + .stop_container( + &process.data.id.unwrap_or_default(), + None::, + ) + .await?; + } + Self::_emit(kind_pk, NativeEventAction::Stop, state).await?; + Ok(()) + } + + /// Delete a process by pk + async fn del_process_by_pk( + pk: &str, + opts: Option, + state: &SystemState, + ) -> HttpResult<()> { + match state.docker_api.remove_container(pk, opts).await { + Ok(_) => {} + Err(err) => match &err { + bollard_next::errors::Error::DockerResponseServerError { + status_code, + message: _, + } => { + if *status_code != 404 { + return Err(err.into()); + } + } + _ => { + return Err(err.into()); + } + }, + }; + ProcessDb::del_by_pk(pk, &state.pool).await?; + Ok(()) + } +} diff --git a/bin/nanocld/src/objects/job.rs b/bin/nanocld/src/objects/job.rs index 294c83f43..1afde8489 100644 --- a/bin/nanocld/src/objects/job.rs +++ b/bin/nanocld/src/objects/job.rs @@ -2,7 +2,10 @@ use bollard_next::container::RemoveContainerOptions; use futures_util::{StreamExt, stream::FuturesUnordered}; use nanocl_error::http::{HttpResult, HttpError}; -use nanocl_stubs::job::{Job, JobPartial, JobInspect}; +use nanocl_stubs::{ + job::{Job, JobPartial, JobInspect}, + process::ProcessKind, +}; use crate::{ utils, @@ -12,6 +15,12 @@ use crate::{ use super::generic::*; +impl ObjProcess for JobDb { + fn get_kind() -> ProcessKind { + ProcessKind::Job + } +} + impl ObjCreate for JobDb { type ObjCreateIn = JobPartial; type ObjCreateOut = Job; @@ -36,16 +45,15 @@ impl ObjCreate for JobDb { container.labels = Some(labels); let short_id = utils::key::generate_short_id(6); let name = format!("{job_name}-{short_id}.j"); - utils::process::create(&name, "job", &job_name, container, state) - .await?; + JobDb::create_process(&name, &job_name, container, state).await?; Ok::<_, HttpError>(()) } }) .collect::>() - .collect::>>() + .collect::>>() .await .into_iter() - .collect::, _>>()?; + .collect::>>()?; if let Some(schedule) = &job.schedule { utils::job::add_cron_rule(&job, schedule, state).await?; } @@ -67,7 +75,7 @@ impl ObjDelByPk for JobDb { processes .into_iter() .map(|process| async move { - utils::process::remove( + JobDb::del_process_by_pk( &process.key, Some(RemoveContainerOptions { force: true, @@ -78,10 +86,10 @@ impl ObjDelByPk for JobDb { .await }) .collect::>() - .collect::>>() + .collect::>>() .await .into_iter() - .collect::, _>>()?; + .collect::>>()?; JobDb::del_by_pk(&job.name, &state.pool).await?; if job.schedule.is_some() { utils::job::remove_cron_rule(&job, state).await?; diff --git a/bin/nanocld/src/objects/vm.rs b/bin/nanocld/src/objects/vm.rs index 550808794..ee6692ad7 100644 --- a/bin/nanocld/src/objects/vm.rs +++ b/bin/nanocld/src/objects/vm.rs @@ -17,6 +17,12 @@ use crate::{ }; use super::generic::*; +impl ObjProcess for VmDb { + fn get_kind() -> ProcessKind { + ProcessKind::Vm + } +} + impl ObjCreate for VmDb { type ObjCreateIn = VmObjCreateIn; type ObjCreateOut = Vm; @@ -73,7 +79,7 @@ impl ObjDelByPk for VmDb { ..Default::default() }; let container_name = format!("{}.v", pk); - utils::process::remove(&container_name, Some(options), state).await?; + VmDb::del_process_by_pk(&container_name, Some(options), state).await?; VmDb::del_by_pk(pk, &state.pool).await?; SpecDb::del_by_kind_key(pk, &state.pool).await?; utils::vm_image::delete_by_name(&vm.spec.disk.image, &state.pool).await?; @@ -92,8 +98,8 @@ impl ObjPutByPk for VmDb { ) -> HttpResult { let vm = VmDb::transform_read_by_pk(pk, &state.pool).await?; let container_name = format!("{}.v", &vm.spec.vm_key); - utils::process::stop_by_kind(&ProcessKind::Vm, pk, state).await?; - utils::process::remove( + VmDb::stop_process_by_kind_pk(pk, state).await?; + VmDb::del_process_by_pk( &container_name, None::, state, @@ -108,8 +114,7 @@ impl ObjPutByPk for VmDb { .await?; let image = VmImageDb::read_by_pk(&vm.spec.disk.image, &state.pool).await?; utils::vm::create_instance(&vm, &image, false, state).await?; - utils::process::start_by_kind(&ProcessKind::Vm, &vm.spec.vm_key, state) - .await?; + VmDb::start_process_by_kind_pk(&vm.spec.vm_key, state).await?; Ok(vm) } } diff --git a/bin/nanocld/src/repositories/process.rs b/bin/nanocld/src/repositories/process.rs index 5761d51a0..a2b6389af 100644 --- a/bin/nanocld/src/repositories/process.rs +++ b/bin/nanocld/src/repositories/process.rs @@ -118,3 +118,22 @@ impl ProcessDb { ProcessDb::transform_read_by(&filter, pool).await } } + +impl ProcessDb { + pub async fn list_by_namespace( + name: &str, + pool: &Pool, + ) -> IoResult> { + let filter = GenericFilter::new().r#where( + "data", + GenericClause::Contains(serde_json::json!({ + "Config": { + "Labels": { + "io.nanocl.n": name + } + } + })), + ); + ProcessDb::transform_read_by(&filter, pool).await + } +} diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index fe4da3a21..8854b0cd2 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -6,13 +6,14 @@ use nanocl_error::http::{HttpResult, HttpError}; use bollard_next::container::LogsOptions; use nanocl_stubs::{ generic::{GenericNspQuery, GenericFilter, GenericListQuery}, - process::{ProcessLogQuery, ProcessOutputLog}, + process::{ProcessLogQuery, ProcessOutputLog, ProcessKind}, }; use crate::{ utils, repositories::generic::*, - models::{SystemState, ProcessDb}, + models::{SystemState, ProcessDb, VmDb, JobDb}, + objects::generic::ObjProcess, }; /// List process (Vm, Job, Cargo) @@ -128,8 +129,18 @@ pub async fn start_process( ) -> HttpResult { let (_, kind, name) = path.into_inner(); let kind = utils::process::parse_kind(&kind)?; - let kind_key = utils::key::gen_kind_key(&kind, &name, &qs.namespace); - utils::process::start_by_kind(&kind, &kind_key, &state).await?; + let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); + match &kind { + ProcessKind::Vm => { + VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Job => { + JobDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Cargo => { + VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + } Ok(web::HttpResponse::Accepted().finish()) } @@ -156,8 +167,18 @@ pub async fn stop_process( ) -> HttpResult { let (_, kind, name) = path.into_inner(); let kind = utils::process::parse_kind(&kind)?; - let kind_key = utils::key::gen_kind_key(&kind, &name, &qs.namespace); - utils::process::stop_by_kind(&kind, &kind_key, &state).await?; + let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); + match &kind { + ProcessKind::Vm => { + VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Job => { + JobDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Cargo => { + VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + } + } Ok(web::HttpResponse::Accepted().finish()) } diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index 2b980c393..bc81ea03f 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -20,6 +20,7 @@ use crate::{ utils, repositories::generic::*, models::{SystemState, CargoDb, ProcessDb, NamespaceDb, SecretDb, SpecDb}, + objects::generic::ObjProcess, }; use super::stream::transform_stream; @@ -50,14 +51,8 @@ async fn execute_before(cargo: &Cargo, state: &SystemState) -> HttpResult<()> { "init-{}-{}.{}.c", cargo.spec.name, short_id, cargo.namespace_name ); - utils::process::create( - &name, - "cargo", - &cargo.spec.cargo_key, - before, - state, - ) - .await?; + CargoDb::create_process(&name, &cargo.spec.cargo_key, before, state) + .await?; state .docker_api .start_container(&name, None::>) @@ -209,15 +204,14 @@ pub async fn create_instances( }), ..container }; - let res = utils::process::create(&name, "cargo", &cargo.spec.cargo_key, new_process, state).await?; - Ok::<_, HttpError>(res) + CargoDb::create_process(&name, &cargo.spec.cargo_key, new_process, state).await } }) .collect::>() - .collect::>>() + .collect::>>() .await .into_iter() - .collect::, HttpError>>() + .collect::>>() } /// The instances (containers) are deleted but the cargo is not. @@ -229,7 +223,7 @@ pub async fn delete_instances( instances .iter() .map(|id| async { - utils::process::remove( + CargoDb::del_process_by_pk( id, Some(RemoveContainerOptions { force: true, @@ -240,10 +234,10 @@ pub async fn delete_instances( .await }) .collect::>() - .collect::>>() + .collect::>>() .await .into_iter() - .collect::>() + .collect::>() } /// Restart cargo instances (containers) by key diff --git a/bin/nanocld/src/utils/namespace.rs b/bin/nanocld/src/utils/namespace.rs index d2546af26..d10c8c7e5 100644 --- a/bin/nanocld/src/utils/namespace.rs +++ b/bin/nanocld/src/utils/namespace.rs @@ -4,9 +4,8 @@ use bollard_next::network::InspectNetworkOptions; use nanocl_stubs::{generic::GenericFilter, namespace::NamespaceSummary}; use crate::{ - utils, repositories::generic::*, - models::{CargoDb, NamespaceDb, SystemState}, + models::{CargoDb, NamespaceDb, SystemState, ProcessDb}, }; /// List all existing namespaces @@ -20,7 +19,7 @@ pub async fn list( let cargo_count = CargoDb::count_by_namespace(&item.name, &state.pool).await?; let processes = - utils::process::list_by_namespace(&item.name, state).await?; + ProcessDb::list_by_namespace(&item.name, &state.pool).await?; let network = state .docker_api .inspect_network(&item.name, None::>) diff --git a/bin/nanocld/src/utils/process.rs b/bin/nanocld/src/utils/process.rs index 2e9c63e43..6c95cd165 100644 --- a/bin/nanocld/src/utils/process.rs +++ b/bin/nanocld/src/utils/process.rs @@ -1,57 +1,11 @@ -use nanocl_error::{ - io::FromIo, - http::{HttpResult, HttpError}, -}; +use nanocl_error::http::{HttpResult, HttpError}; -use bollard_next::container::{ - StartContainerOptions, Config, CreateContainerOptions, - InspectContainerOptions, StopContainerOptions, RemoveContainerOptions, -}; - -use nanocl_stubs::{ - system::NativeEventAction, - process::{Process, ProcessKind, ProcessPartial}, - generic::{GenericFilter, GenericClause}, -}; - -use crate::{ - repositories::generic::*, - models::{SystemState, ProcessDb, JobDb, JobUpdateDb, VmDb, CargoDb}, -}; - -async fn after( - kind: &ProcessKind, - kind_key: &str, - action: NativeEventAction, - state: &SystemState, -) -> HttpResult<()> { - match kind { - ProcessKind::Vm => { - let vm = VmDb::transform_read_by_pk(kind_key, &state.pool).await?; - state.emit_normal_native_action(&vm, action); - } - ProcessKind::Cargo => { - let cargo = CargoDb::transform_read_by_pk(kind_key, &state.pool).await?; - state.emit_normal_native_action(&cargo, action); - } - ProcessKind::Job => { - JobDb::update_pk( - kind_key, - JobUpdateDb { - updated_at: Some(chrono::Utc::now().naive_utc()), - }, - &state.pool, - ) - .await?; - } - } - Ok(()) -} +use nanocl_stubs::process::{Process, ProcessKind}; pub fn parse_kind(kind: &str) -> HttpResult { - kind.to_owned().try_into().map_err(|err: std::io::Error| { - HttpError::internal_server_error(err.to_string()) - }) + kind + .parse() + .map_err(|err| HttpError::bad_request(format!("Invalid kind {kind} {err}"))) } /// Count the number of instances running failing or success @@ -90,148 +44,3 @@ pub fn count_status(instances: &[Process]) -> (usize, usize, usize, usize) { instance_running, ) } - -pub async fn create( - name: &str, - kind: &str, - kind_key: &str, - item: Config, - state: &SystemState, -) -> HttpResult { - let kind: ProcessKind = - kind.to_owned().try_into().map_err(|err: std::io::Error| { - HttpError::internal_server_error(err.to_string()) - })?; - let mut config = item.clone(); - let mut labels = item.labels.to_owned().unwrap_or_default(); - labels.insert("io.nanocl".to_owned(), "enabled".to_owned()); - labels.insert("io.nanocl.kind".to_owned(), kind.to_string()); - config.labels = Some(labels); - let res = state - .docker_api - .create_container( - Some(CreateContainerOptions { - name, - ..Default::default() - }), - config, - ) - .await?; - let inspect = state - .docker_api - .inspect_container(&res.id, None::) - .await?; - let created_at = inspect.created.clone().unwrap_or_default(); - let new_instance = ProcessPartial { - key: res.id, - name: name.to_owned(), - kind, - data: serde_json::to_value(&inspect) - .map_err(|err| err.map_err_context(|| "CreateProcess"))?, - node_key: state.config.hostname.clone(), - kind_key: kind_key.to_owned(), - created_at: Some( - chrono::NaiveDateTime::parse_from_str( - &created_at, - "%Y-%m-%dT%H:%M:%S%.fZ", - ) - .map_err(|err| { - HttpError::internal_server_error(format!("Unable to parse date {err}")) - })?, - ), - }; - let process = ProcessDb::create_from(&new_instance, &state.pool).await?; - Process::try_from(process) - .map_err(|err| HttpError::internal_server_error(err.to_string())) -} - -pub async fn remove( - key: &str, - opts: Option, - state: &SystemState, -) -> HttpResult<()> { - match state.docker_api.remove_container(key, opts).await { - Ok(_) => {} - Err(err) => match &err { - bollard_next::errors::Error::DockerResponseServerError { - status_code, - message: _, - } => { - if *status_code != 404 { - return Err(err.into()); - } - } - _ => { - return Err(err.into()); - } - }, - }; - ProcessDb::del_by_pk(key, &state.pool).await?; - Ok(()) -} - -pub async fn start_by_kind( - kind: &ProcessKind, - kind_key: &str, - state: &SystemState, -) -> HttpResult<()> { - let processes = ProcessDb::read_by_kind_key(kind_key, &state.pool).await?; - log::debug!("process::start_by_kind: {kind_key}"); - for process in processes { - let process_state = process.data.state.unwrap_or_default(); - if process_state.running.unwrap_or_default() { - return Ok(()); - } - state - .docker_api - .start_container( - &process.data.id.unwrap_or_default(), - None::>, - ) - .await?; - } - after(kind, kind_key, NativeEventAction::Start, state).await?; - Ok(()) -} - -pub async fn stop_by_kind( - kind: &ProcessKind, - kind_key: &str, - state: &SystemState, -) -> HttpResult<()> { - let processes = ProcessDb::read_by_kind_key(kind_key, &state.pool).await?; - log::debug!("process::stop_by_kind: {kind:#?} {kind_key}"); - for process in processes { - let process_state = process.data.state.unwrap_or_default(); - if !process_state.running.unwrap_or_default() { - return Ok(()); - } - state - .docker_api - .stop_container( - &process.data.id.unwrap_or_default(), - None::, - ) - .await?; - } - after(kind, kind_key, NativeEventAction::Stop, state).await?; - Ok(()) -} - -pub async fn list_by_namespace( - name: &str, - state: &SystemState, -) -> HttpResult> { - let filter = GenericFilter::new().r#where( - "data", - GenericClause::Contains(serde_json::json!({ - "Config": { - "Labels": { - "io.nanocl.n": name - } - } - })), - ); - let items = ProcessDb::transform_read_by(&filter, &state.pool).await?; - Ok(items) -} diff --git a/bin/nanocld/src/utils/vm.rs b/bin/nanocld/src/utils/vm.rs index 1a86d9ddb..6407da4f9 100644 --- a/bin/nanocld/src/utils/vm.rs +++ b/bin/nanocld/src/utils/vm.rs @@ -7,8 +7,8 @@ use nanocl_error::http::HttpResult; use nanocl_stubs::vm::Vm; use crate::{ - utils, - models::{VmImageDb, SystemState}, + objects::generic::*, + models::{VmImageDb, SystemState, VmDb}, }; /// Create a VM instance from a VM image @@ -110,6 +110,6 @@ pub async fn create_instance( ..Default::default() }; let name = format!("{}.v", &vm.spec.vm_key); - utils::process::create(&name, "vm", &vm.spec.vm_key, spec, state).await?; + VmDb::create_process(&name, &vm.spec.vm_key, spec, state).await?; Ok(()) } diff --git a/crates/nanocl_stubs/src/process.rs b/crates/nanocl_stubs/src/process.rs index 770b9b6c0..19154bac0 100644 --- a/crates/nanocl_stubs/src/process.rs +++ b/crates/nanocl_stubs/src/process.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + #[cfg(feature = "serde")] use serde::{Serialize, Deserialize}; @@ -15,6 +17,22 @@ pub enum ProcessKind { Cargo, } +impl FromStr for ProcessKind { + type Err = std::io::Error; + + fn from_str(s: &str) -> Result { + match s { + "vm" => Ok(Self::Vm), + "job" => Ok(Self::Job), + "cargo" => Ok(Self::Cargo), + _ => Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Invalid process kind {s}"), + )), + } + } +} + impl TryFrom for ProcessKind { type Error = std::io::Error; From d15bc75d0500e11f89152b2b8c7c58689a8e30a5 Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:01:40 +0100 Subject: [PATCH 02/14] fix start --- bin/nanocld/src/services/process.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 8854b0cd2..2e9eae212 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -132,13 +132,13 @@ pub async fn start_process( let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { - VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + VmDb::start_process_by_kind_pk(&kind_pk, &state).await?; } ProcessKind::Job => { - JobDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + JobDb::start_process_by_kind_pk(&kind_pk, &state).await?; } ProcessKind::Cargo => { - VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + VmDb::start_process_by_kind_pk(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) From d065851b913bdb6db5a7aad37ff5fb25555e30aa Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:09:59 +0100 Subject: [PATCH 03/14] fix start again --- bin/nanocld/src/services/process.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 2e9eae212..6ac45e0f8 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -12,7 +12,7 @@ use nanocl_stubs::{ use crate::{ utils, repositories::generic::*, - models::{SystemState, ProcessDb, VmDb, JobDb}, + models::{SystemState, ProcessDb, VmDb, JobDb, CargoDb}, objects::generic::ObjProcess, }; @@ -138,7 +138,7 @@ pub async fn start_process( JobDb::start_process_by_kind_pk(&kind_pk, &state).await?; } ProcessKind::Cargo => { - VmDb::start_process_by_kind_pk(&kind_pk, &state).await?; + CargoDb::start_process_by_kind_pk(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) From 6be61f581fa18aa4a62e41fa349c58a66d9453ea Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:10:15 +0100 Subject: [PATCH 04/14] fix stop --- bin/nanocld/src/services/process.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 6ac45e0f8..ef23a26e3 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -176,7 +176,7 @@ pub async fn stop_process( JobDb::stop_process_by_kind_pk(&kind_pk, &state).await?; } ProcessKind::Cargo => { - VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + CargoDb::stop_process_by_kind_pk(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) From 5a85c6199672b129ebd4baeb6eb62363a7df4627 Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:23:06 +0100 Subject: [PATCH 05/14] better error handling --- bin/nanocld/src/objects/generic/process.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs index 9ed4a9d4f..6706a4734 100644 --- a/bin/nanocld/src/objects/generic/process.rs +++ b/bin/nanocld/src/objects/generic/process.rs @@ -102,8 +102,7 @@ pub trait ObjProcess { ), }; let process = ProcessDb::create_from(&new_instance, &state.pool).await?; - Process::try_from(process) - .map_err(|err| HttpError::internal_server_error(err.to_string())) + Process::try_from(process).map_err(HttpError::from) } async fn start_process_by_kind_pk( From 4db4df1e90382488011f518796443ca94bcc1d1b Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:25:44 +0100 Subject: [PATCH 06/14] removed useless utils --- bin/nanocld/src/services/process.rs | 8 ++++---- bin/nanocld/src/utils/process.rs | 10 +--------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index ef23a26e3..68ea55705 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -67,7 +67,7 @@ async fn logs_process( qs: web::types::Query, ) -> HttpResult { let (_, kind, name) = path.into_inner(); - let kind = utils::process::parse_kind(&kind)?; + let kind = kind.parse().map_err(HttpError::bad_request)?; let kind_key = utils::key::gen_kind_key(&kind, &name, &qs.namespace); let processes = ProcessDb::read_by_kind_key(&kind_key, &state.pool).await?; log::debug!("process::logs_process: {kind_key}"); @@ -121,14 +121,14 @@ async fn logs_process( (status = 404, description = "Process does not exist"), ), ))] -#[web::post("/processes/{type}/{name}/start")] +#[web::post("/processes/{kind}/{name}/start")] pub async fn start_process( state: web::types::State, path: web::types::Path<(String, String, String)>, qs: web::types::Query, ) -> HttpResult { let (_, kind, name) = path.into_inner(); - let kind = utils::process::parse_kind(&kind)?; + let kind = kind.parse().map_err(HttpError::bad_request)?; let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { @@ -166,7 +166,7 @@ pub async fn stop_process( qs: web::types::Query, ) -> HttpResult { let (_, kind, name) = path.into_inner(); - let kind = utils::process::parse_kind(&kind)?; + let kind = kind.parse().map_err(HttpError::bad_request)?; let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { diff --git a/bin/nanocld/src/utils/process.rs b/bin/nanocld/src/utils/process.rs index 6c95cd165..cbfbcf719 100644 --- a/bin/nanocld/src/utils/process.rs +++ b/bin/nanocld/src/utils/process.rs @@ -1,12 +1,4 @@ -use nanocl_error::http::{HttpResult, HttpError}; - -use nanocl_stubs::process::{Process, ProcessKind}; - -pub fn parse_kind(kind: &str) -> HttpResult { - kind - .parse() - .map_err(|err| HttpError::bad_request(format!("Invalid kind {kind} {err}"))) -} +use nanocl_stubs::process::Process; /// Count the number of instances running failing or success pub fn count_status(instances: &[Process]) -> (usize, usize, usize, usize) { From d468b0c1a5b5df72a851500fb5cdc9d8c14bb7c5 Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 14:52:55 +0100 Subject: [PATCH 07/14] refactor/nanocld: mutualised restart --- bin/nanocl/src/commands/cargo.rs | 2 +- bin/nanocld/specs/swagger.yaml | 76 ++++++++++++---------- bin/nanocld/src/objects/generic/process.rs | 28 +++++++- bin/nanocld/src/services/cargo.rs | 29 +-------- bin/nanocld/src/services/openapi.rs | 2 +- bin/nanocld/src/services/process.rs | 49 ++++++++++++-- bin/nanocld/src/utils/cargo.rs | 22 ------- bin/ncdns/src/utils.rs | 4 +- crates/nanocld_client/src/cargo.rs | 25 ------- crates/nanocld_client/src/process.rs | 26 ++++++++ 10 files changed, 144 insertions(+), 119 deletions(-) diff --git a/bin/nanocl/src/commands/cargo.rs b/bin/nanocl/src/commands/cargo.rs index 500f10ec1..18cb52929 100644 --- a/bin/nanocl/src/commands/cargo.rs +++ b/bin/nanocl/src/commands/cargo.rs @@ -127,7 +127,7 @@ async fn exec_cargo_restart( let client = &cli_conf.client; for name in &opts.names { client - .restart_cargo(name, args.namespace.as_deref()) + .restart_process("cargo", name, args.namespace.as_deref()) .await?; } Ok(()) diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index 9caf9c14a..ed224930a 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -473,32 +473,6 @@ paths: description: Cargo killed '404': description: Cargo does not exist - /cargoes/{name}/restart: - post: - tags: - - Cargoes - summary: Restart a cargo - description: Restart a cargo - operationId: restart_cargo - parameters: - - name: name - in: path - description: Name of the cargo - required: true - schema: - type: string - - name: namespace - in: query - description: Namespace where the cargo belongs - required: false - schema: - type: string - nullable: true - responses: - '202': - description: Cargo restarted - '404': - description: Cargo does not exist /cargoes/{name}/stats: get: tags: @@ -935,8 +909,8 @@ paths: get: tags: - Processes - summary: Get logs of a process - description: Get logs of a process + summary: Get logs of processes for given kind and name + description: Get logs of processes for given kind and name operationId: logs_process parameters: - name: kind @@ -1002,12 +976,46 @@ paths: description: Process instances logs '404': description: Process don't exist + /processes/{kind}/{name}/restart: + post: + tags: + - Processes + summary: Restart processes of given kind and name + description: Restart processes of given kind and name + operationId: restart_process + parameters: + - name: kind + in: path + description: Kind of the process + required: true + schema: + type: string + example: cargo + - name: name + in: path + description: Name of the process + required: true + schema: + type: string + example: deploy-example + - name: namespace + in: query + description: Namespace where the process belongs is needed + required: false + schema: + type: string + nullable: true + responses: + '202': + description: Process restarted + '404': + description: Process does not exist /processes/{kind}/{name}/start: post: tags: - Processes - summary: Start a process - description: Start a process + summary: Start processes of given kind and name + description: Start processes of given kind and name operationId: start_process parameters: - name: kind @@ -1040,8 +1048,8 @@ paths: post: tags: - Processes - summary: Stop a cargo - description: Stop a cargo + summary: Stop a processes of given kind and name + description: Stop a processes of given kind and name operationId: stop_process parameters: - name: kind @@ -1067,9 +1075,9 @@ paths: nullable: true responses: '202': - description: Cargo stopped + description: Process stopped '404': - description: Cargo does not exist + description: Process does not exist /resource/kinds: get: tags: diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs index 6706a4734..bed69567c 100644 --- a/bin/nanocld/src/objects/generic/process.rs +++ b/bin/nanocld/src/objects/generic/process.rs @@ -1,14 +1,15 @@ +use futures_util::{StreamExt, stream::FuturesUnordered}; use bollard_next::container::{ RemoveContainerOptions, StartContainerOptions, StopContainerOptions, Config, CreateContainerOptions, InspectContainerOptions, }; use nanocl_error::{ - http::{HttpResult, HttpError}, io::FromIo, + http::{HttpResult, HttpError}, }; use nanocl_stubs::{ - process::{ProcessKind, ProcessPartial, Process}, system::NativeEventAction, + process::{ProcessKind, ProcessPartial, Process}, }; use crate::{ @@ -151,6 +152,29 @@ pub trait ObjProcess { Ok(()) } + async fn restart_process_by_kind_pk( + pk: &str, + state: &SystemState, + ) -> HttpResult<()> { + let processes = ProcessDb::read_by_kind_key(pk, &state.pool).await?; + processes + .into_iter() + .map(|process| async move { + state + .docker_api + .restart_container(&process.key, None) + .await + .map_err(HttpError::from) + }) + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::>>()?; + Self::_emit(pk, NativeEventAction::Restart, state).await?; + Ok(()) + } + /// Delete a process by pk async fn del_process_by_pk( pk: &str, diff --git a/bin/nanocld/src/services/cargo.rs b/bin/nanocld/src/services/cargo.rs index 1b37c263c..d5cbf1491 100644 --- a/bin/nanocld/src/services/cargo.rs +++ b/bin/nanocld/src/services/cargo.rs @@ -123,32 +123,6 @@ pub async fn delete_cargo( Ok(web::HttpResponse::Accepted().finish()) } -/// Restart a cargo -#[cfg_attr(feature = "dev", utoipa::path( - post, - tag = "Cargoes", - path = "/cargoes/{name}/restart", - params( - ("name" = String, Path, description = "Name of the cargo"), - ("namespace" = Option, Query, description = "Namespace where the cargo belongs"), - ), - responses( - (status = 202, description = "Cargo restarted"), - (status = 404, description = "Cargo does not exist"), - ), -))] -#[web::post("/cargoes/{name}/restart")] -pub async fn restart_cargo( - state: web::types::State, - path: web::types::Path<(String, String)>, - qs: web::types::Query, -) -> HttpResult { - let namespace = utils::key::resolve_nsp(&qs.namespace); - let key = utils::key::gen_key(&namespace, &path.1); - utils::cargo::restart(&key, &state).await?; - Ok(web::HttpResponse::Accepted().finish()) -} - /// Create a new cargo spec and add history entry #[cfg_attr(feature = "dev", utoipa::path( put, @@ -340,7 +314,6 @@ pub async fn stats_cargo( pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(create_cargo); config.service(delete_cargo); - config.service(restart_cargo); config.service(kill_cargo); config.service(patch_cargo); config.service(put_cargo); @@ -459,7 +432,7 @@ mod tests { test_status_code!(res.status(), http::StatusCode::OK, "basic cargo stats"); let res = client .send_post( - &format!("{ENDPOINT}/{main_test_cargo}/restart"), + &format!("/processes/cargo/{main_test_cargo}/restart"), None::, None::, ) diff --git a/bin/nanocld/src/services/openapi.rs b/bin/nanocld/src/services/openapi.rs index d13764b28..58d90983e 100644 --- a/bin/nanocld/src/services/openapi.rs +++ b/bin/nanocld/src/services/openapi.rs @@ -250,7 +250,6 @@ impl Modify for VersionModifier { cargo::inspect_cargo, cargo::create_cargo, cargo::delete_cargo, - cargo::restart_cargo, cargo::put_cargo, cargo::patch_cargo, cargo::kill_cargo, @@ -304,6 +303,7 @@ impl Modify for VersionModifier { process::start_process, process::stop_process, process::list_process, + process::restart_process, // Event event::list_event, ), diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 68ea55705..1450f44e8 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -40,7 +40,7 @@ pub async fn list_process( Ok(web::HttpResponse::Ok().json(&processes)) } -/// Get logs of a process +/// Get logs of processes for given kind and name #[cfg_attr(feature = "dev", utoipa::path( get, tag = "Processes", @@ -106,7 +106,7 @@ async fn logs_process( ) } -/// Start a process +/// Start processes of given kind and name #[cfg_attr(feature = "dev", utoipa::path( post, tag = "Processes", @@ -144,7 +144,45 @@ pub async fn start_process( Ok(web::HttpResponse::Accepted().finish()) } -/// Stop a cargo +/// Restart processes of given kind and name +#[cfg_attr(feature = "dev", utoipa::path( + post, + tag = "Processes", + path = "/processes/{kind}/{name}/restart", + params( + ("kind" = String, Path, description = "Kind of the process", example = "cargo"), + ("name" = String, Path, description = "Name of the process", example = "deploy-example"), + ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), + ), + responses( + (status = 202, description = "Process restarted"), + (status = 404, description = "Process does not exist"), + ), +))] +#[web::post("/processes/{kind}/{name}/restart")] +pub async fn restart_process( + state: web::types::State, + path: web::types::Path<(String, String, String)>, + qs: web::types::Query, +) -> HttpResult { + let (_, kind, name) = path.into_inner(); + let kind = kind.parse().map_err(HttpError::bad_request)?; + let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); + match &kind { + ProcessKind::Vm => { + VmDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Job => { + JobDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + } + ProcessKind::Cargo => { + CargoDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + } + } + Ok(web::HttpResponse::Accepted().finish()) +} + +/// Stop a processes of given kind and name #[cfg_attr(feature = "dev", utoipa::path( post, tag = "Processes", @@ -155,8 +193,8 @@ pub async fn start_process( ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), ), responses( - (status = 202, description = "Cargo stopped"), - (status = 404, description = "Cargo does not exist"), + (status = 202, description = "Process stopped"), + (status = 404, description = "Process does not exist"), ), ))] #[web::post("/processes/{kind}/{name}/stop")] @@ -185,6 +223,7 @@ pub async fn stop_process( pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(list_process); config.service(logs_process); + config.service(restart_process); config.service(start_process); config.service(stop_process); } diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index bc81ea03f..d69d3a6ca 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -240,28 +240,6 @@ pub async fn delete_instances( .collect::>() } -/// Restart cargo instances (containers) by key -pub async fn restart(key: &str, state: &SystemState) -> HttpResult<()> { - let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; - let processes = - ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; - processes - .into_iter() - .map(|process| async move { - state - .docker_api - .restart_container(&process.key, None) - .await - .map_err(HttpError::from) - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::>>()?; - Ok(()) -} - /// List the cargoes for the given query pub async fn list( query: &GenericListNspQuery, diff --git a/bin/ncdns/src/utils.rs b/bin/ncdns/src/utils.rs index f5f4943b6..191086956 100644 --- a/bin/ncdns/src/utils.rs +++ b/bin/ncdns/src/utils.rs @@ -60,7 +60,9 @@ async fn get_network_addr( /// Reload the dns service /// TODO: use a better way to reload the service, we may have to move from dnsmasq to something else pub(crate) async fn reload_service(client: &NanocldClient) -> IoResult<()> { - client.restart_cargo("ndns", Some("system")).await?; + client + .restart_process("cargo", "ndns", Some("system")) + .await?; Ok(()) } diff --git a/crates/nanocld_client/src/cargo.rs b/crates/nanocld_client/src/cargo.rs index e87c40c87..5c798f7f1 100644 --- a/crates/nanocld_client/src/cargo.rs +++ b/crates/nanocld_client/src/cargo.rs @@ -95,31 +95,6 @@ impl NanocldClient { Self::res_json(res).await } - /// Restart a cargo by it's name and namespace - /// - /// ## Example - /// - /// ```no_run,ignore - /// use nanocld_client::NanocldClient; - /// - /// let client = NanocldClient::connect_to("http://localhost:8585", None); - /// let res = client.restart_cargo("my-cargo", None).await; - /// ``` - pub async fn restart_cargo( - &self, - name: &str, - namespace: Option<&str>, - ) -> HttpClientResult<()> { - self - .send_post( - &format!("{}/{name}/restart", Self::CARGO_PATH), - None::, - Some(GenericNspQuery::new(namespace)), - ) - .await?; - Ok(()) - } - /// List all cargoes in a namespace /// /// ## Example diff --git a/crates/nanocld_client/src/process.rs b/crates/nanocld_client/src/process.rs index 78aaba0bc..47cc35438 100644 --- a/crates/nanocld_client/src/process.rs +++ b/crates/nanocld_client/src/process.rs @@ -76,6 +76,32 @@ impl NanocldClient { Ok(()) } + /// Restart a process by it's kind and name and namespace + /// + /// ## Example + /// + /// ```no_run,ignore + /// use nanocld_client::NanocldClient; + /// + /// let client = NanocldClient::connect_to("http://localhost:8585", None); + /// let res = client.start_process("cargo", "my-cargo", None).await; + /// ``` + pub async fn restart_process( + &self, + kind: &str, + name: &str, + namespace: Option<&str>, + ) -> HttpClientResult<()> { + self + .send_post( + &format!("{}/{kind}/{name}/restart", Self::PROCESS_PATH), + None::, + Some(GenericNspQuery::new(namespace)), + ) + .await?; + Ok(()) + } + /// Stop a process by it's kind and name and namespace /// /// ## Example From f0ba2ff17bcf17647222f5b43f361d84939b94ae Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 15:16:29 +0100 Subject: [PATCH 08/14] refactor/nanocld: mutualised kill --- bin/nanocld/specs/swagger.yaml | 72 ++++++++++++---------- bin/nanocld/src/objects/generic/process.rs | 26 ++++++++ bin/nanocld/src/services/cargo.rs | 33 +--------- bin/nanocld/src/services/openapi.rs | 2 +- bin/nanocld/src/services/process.rs | 42 +++++++++++++ bin/nanocld/src/utils/cargo.rs | 21 +------ crates/nanocld_client/src/cargo.rs | 30 +-------- crates/nanocld_client/src/process.rs | 30 ++++++++- 8 files changed, 143 insertions(+), 113 deletions(-) diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index ed224930a..fe3434a52 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -441,38 +441,6 @@ paths: application/json: schema: $ref: '#/components/schemas/CargoInspect' - /cargoes/{name}/kill: - post: - tags: - - Cargoes - summary: Send a signal to a cargo this will kill the cargo if the signal is SIGKILL - description: Send a signal to a cargo this will kill the cargo if the signal is SIGKILL - operationId: kill_cargo - parameters: - - name: name - in: path - description: Name of the cargo - required: true - schema: - type: string - - name: namespace - in: query - description: Namespace where the cargo belongs - required: false - schema: - type: string - nullable: true - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/CargoKillOptions' - required: true - responses: - '200': - description: Cargo killed - '404': - description: Cargo does not exist /cargoes/{name}/stats: get: tags: @@ -1078,6 +1046,46 @@ paths: description: Process stopped '404': description: Process does not exist + /processes/{name}/kill: + post: + tags: + - Processes + summary: Send a signal to processes of given kind and name + description: Send a signal to processes of given kind and name + operationId: kill_process + parameters: + - name: kind + in: path + description: Kind of the process + required: true + schema: + type: string + example: cargo + - name: name + in: path + description: Name of the process + required: true + schema: + type: string + example: deploy-example + - name: namespace + in: query + description: Namespace where the process belongs is needed + required: false + schema: + type: string + nullable: true + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CargoKillOptions' + required: true + responses: + '200': + description: Cargo killed + '404': + description: Cargo does not exist /resource/kinds: get: tags: diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs index bed69567c..c5a5fd622 100644 --- a/bin/nanocld/src/objects/generic/process.rs +++ b/bin/nanocld/src/objects/generic/process.rs @@ -10,6 +10,7 @@ use nanocl_error::{ use nanocl_stubs::{ system::NativeEventAction, process::{ProcessKind, ProcessPartial, Process}, + cargo::CargoKillOptions, }; use crate::{ @@ -175,6 +176,31 @@ pub trait ObjProcess { Ok(()) } + async fn kill_process_by_kind_pk( + pk: &str, + opts: &CargoKillOptions, + state: &SystemState, + ) -> HttpResult<()> { + let processes = ProcessDb::read_by_kind_key(pk, &state.pool).await?; + processes + .into_iter() + .map(|process| async move { + let id = process.data.id.clone().unwrap_or_default(); + let options = opts.clone().into(); + state + .docker_api + .kill_container(&id, Some(options)) + .await + .map_err(HttpError::from) + }) + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::>>()?; + Ok(()) + } + /// Delete a process by pk async fn del_process_by_pk( pk: &str, diff --git a/bin/nanocld/src/services/cargo.rs b/bin/nanocld/src/services/cargo.rs index d5cbf1491..b66bb1802 100644 --- a/bin/nanocld/src/services/cargo.rs +++ b/bin/nanocld/src/services/cargo.rs @@ -4,7 +4,7 @@ use nanocl_error::{http::HttpResult, io::IoResult}; use nanocl_stubs::{ generic::{GenericNspQuery, GenericListNspQuery}, - cargo::{CargoDeleteQuery, CargoKillOptions, CargoStatsQuery}, + cargo::{CargoDeleteQuery, CargoStatsQuery}, cargo_spec::{CargoSpecPartial, CargoSpecUpdate}, }; @@ -187,34 +187,6 @@ pub async fn patch_cargo( Ok(web::HttpResponse::Ok().json(&cargo)) } -/// Send a signal to a cargo this will kill the cargo if the signal is SIGKILL -#[cfg_attr(feature = "dev", utoipa::path( - post, - tag = "Cargoes", - request_body = CargoKillOptions, - path = "/cargoes/{name}/kill", - params( - ("name" = String, Path, description = "Name of the cargo"), - ("namespace" = Option, Query, description = "Namespace where the cargo belongs"), - ), - responses( - (status = 200, description = "Cargo killed"), - (status = 404, description = "Cargo does not exist"), - ), -))] -#[web::post("/cargoes/{name}/kill")] -pub async fn kill_cargo( - state: web::types::State, - path: web::types::Path<(String, String)>, - payload: web::types::Json, - qs: web::types::Query, -) -> HttpResult { - let namespace = utils::key::resolve_nsp(&qs.namespace); - let key = utils::key::gen_key(&namespace, &path.1); - utils::cargo::kill_by_key(&key, &payload, &state).await?; - Ok(web::HttpResponse::Ok().into()) -} - /// List cargo histories #[cfg_attr(feature = "dev", utoipa::path( get, @@ -314,7 +286,6 @@ pub async fn stats_cargo( pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(create_cargo); config.service(delete_cargo); - config.service(kill_cargo); config.service(patch_cargo); config.service(put_cargo); config.service(list_cargo); @@ -415,7 +386,7 @@ mod tests { ); let res = client .send_post( - &format!("{ENDPOINT}/{main_test_cargo}/kill"), + &format!("/processes/cargo/{main_test_cargo}/kill"), Some(&CargoKillOptions { signal: "SIGINT".to_owned(), }), diff --git a/bin/nanocld/src/services/openapi.rs b/bin/nanocld/src/services/openapi.rs index 58d90983e..893582853 100644 --- a/bin/nanocld/src/services/openapi.rs +++ b/bin/nanocld/src/services/openapi.rs @@ -252,7 +252,6 @@ impl Modify for VersionModifier { cargo::delete_cargo, cargo::put_cargo, cargo::patch_cargo, - cargo::kill_cargo, cargo::list_cargo_history, cargo::revert_cargo, cargo::stats_cargo, @@ -304,6 +303,7 @@ impl Modify for VersionModifier { process::stop_process, process::list_process, process::restart_process, + process::kill_process, // Event event::list_event, ), diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 1450f44e8..aa445d2a8 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -7,6 +7,7 @@ use bollard_next::container::LogsOptions; use nanocl_stubs::{ generic::{GenericNspQuery, GenericFilter, GenericListQuery}, process::{ProcessLogQuery, ProcessOutputLog, ProcessKind}, + cargo::CargoKillOptions, }; use crate::{ @@ -220,12 +221,53 @@ pub async fn stop_process( Ok(web::HttpResponse::Accepted().finish()) } +/// Send a signal to processes of given kind and name +#[cfg_attr(feature = "dev", utoipa::path( + post, + tag = "Processes", + request_body = CargoKillOptions, + path = "/processes/{name}/kill", + params( + ("kind" = String, Path, description = "Kind of the process", example = "cargo"), + ("name" = String, Path, description = "Name of the process", example = "deploy-example"), + ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), + ), + responses( + (status = 200, description = "Cargo killed"), + (status = 404, description = "Cargo does not exist"), + ), +))] +#[web::post("/processes/kind/{name}/kill")] +pub async fn kill_process( + state: web::types::State, + path: web::types::Path<(String, String, String)>, + payload: web::types::Json, + qs: web::types::Query, +) -> HttpResult { + let (_, kind, name) = path.into_inner(); + let kind = kind.parse().map_err(HttpError::bad_request)?; + let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); + match &kind { + ProcessKind::Vm => { + VmDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + } + ProcessKind::Job => { + JobDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + } + ProcessKind::Cargo => { + CargoDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + } + } + Ok(web::HttpResponse::Ok().into()) +} + pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(list_process); config.service(logs_process); config.service(restart_process); config.service(start_process); config.service(stop_process); + config.service(kill_process); } #[cfg(test)] diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index d69d3a6ca..275070223 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -13,7 +13,7 @@ use bollard_next::{ use nanocl_stubs::{ process::Process, generic::{GenericListNspQuery, GenericClause, GenericFilter}, - cargo::{Cargo, CargoSummary, CargoKillOptions, CargoStats, CargoStatsQuery}, + cargo::{Cargo, CargoSummary, CargoStats, CargoStatsQuery}, }; use crate::{ @@ -282,25 +282,6 @@ pub async fn list( Ok(cargo_summaries) } -/// Send a signal to a cargo instance the cargo name can be used if the cargo has only one instance -/// The signal is send to one instance only -pub async fn kill_by_key( - key: &str, - options: &CargoKillOptions, - state: &SystemState, -) -> HttpResult<()> { - let instances = ProcessDb::read_by_kind_key(key, &state.pool).await?; - if instances.is_empty() { - return Err(HttpError::not_found(format!( - "Cargo instance not found: {key}" - ))); - } - let id = instances[0].data.id.clone().unwrap_or_default(); - let options = options.clone().into(); - state.docker_api.kill_container(&id, Some(options)).await?; - Ok(()) -} - /// Get the stats of a cargo instance /// The cargo name can be used if the cargo has only one instance pub fn get_stats( diff --git a/crates/nanocld_client/src/cargo.rs b/crates/nanocld_client/src/cargo.rs index 5c798f7f1..ad87f27ae 100644 --- a/crates/nanocld_client/src/cargo.rs +++ b/crates/nanocld_client/src/cargo.rs @@ -6,8 +6,8 @@ use nanocl_error::http_client::HttpClientResult; use bollard_next::service::ContainerSummary; use nanocl_stubs::generic::GenericNspQuery; use nanocl_stubs::cargo::{ - Cargo, CargoSummary, CargoInspect, CargoKillOptions, CargoDeleteQuery, - CargoStatsQuery, CargoStats, + Cargo, CargoSummary, CargoInspect, CargoDeleteQuery, CargoStatsQuery, + CargoStats, }; use nanocl_stubs::cargo_spec::{CargoSpecUpdate, CargoSpecPartial, CargoSpec}; @@ -237,32 +237,6 @@ impl NanocldClient { Ok(Self::res_stream(res).await) } - /// Kill a cargo by it's name - /// - /// ## Example - /// - /// ```no_run,ignore - /// use nanocld_client::NanocldClient; - /// - /// let client = NanocldClient::connect_to("http://localhost:8585", None); - /// let res = client.kill_cargo("my-cargo", None, None).await; - /// ``` - pub async fn kill_cargo( - &self, - name: &str, - query: Option<&CargoKillOptions>, - namespace: Option<&str>, - ) -> HttpClientResult<()> { - self - .send_post( - &format!("{}/{name}/kill", Self::CARGO_PATH), - query, - Some(GenericNspQuery::new(namespace)), - ) - .await?; - Ok(()) - } - /// List all the instances of a cargo by it's name and namespace /// /// ## Example diff --git a/crates/nanocld_client/src/process.rs b/crates/nanocld_client/src/process.rs index 47cc35438..f38fdb753 100644 --- a/crates/nanocld_client/src/process.rs +++ b/crates/nanocld_client/src/process.rs @@ -1,4 +1,5 @@ use nanocl_error::io::IoError; +use nanocl_stubs::cargo::CargoKillOptions; use ntex::channel::mpsc::Receiver; use nanocl_error::http::HttpResult; @@ -84,7 +85,7 @@ impl NanocldClient { /// use nanocld_client::NanocldClient; /// /// let client = NanocldClient::connect_to("http://localhost:8585", None); - /// let res = client.start_process("cargo", "my-cargo", None).await; + /// let res = client.restart_process("cargo", "my-cargo", None).await; /// ``` pub async fn restart_process( &self, @@ -127,6 +128,33 @@ impl NanocldClient { .await?; Ok(()) } + + /// Kill processes by it's kind and name and namespace + /// + /// ## Example + /// + /// ```no_run,ignore + /// use nanocld_client::NanocldClient; + /// + /// let client = NanocldClient::connect_to("http://localhost:8585", None); + /// let res = client.kill_process("cargo", "my-cargo", None, None).await; + /// ``` + pub async fn kill_cargo( + &self, + kind: &str, + name: &str, + query: Option<&CargoKillOptions>, + namespace: Option<&str>, + ) -> HttpClientResult<()> { + self + .send_post( + &format!("{}/{kind}/{name}/kill", Self::PROCESS_PATH), + query, + Some(GenericNspQuery::new(namespace)), + ) + .await?; + Ok(()) + } } #[cfg(test)] From e7e1fcc2696e418f46db57f94df75d88c91f20e1 Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 15:44:01 +0100 Subject: [PATCH 09/14] refactor/nanocld: mutualised wait --- bin/nanocl/src/commands/job.rs | 11 +-- bin/nanocl/src/models/job.rs | 2 +- bin/nanocld/specs/swagger.yaml | 55 ++++++-------- bin/nanocld/src/services/job.rs | 51 +++---------- bin/nanocld/src/services/openapi.rs | 2 +- bin/nanocld/src/services/process.rs | 96 +++++++++++++++++++++---- bin/nanocld/src/utils/job.rs | 56 +-------------- crates/nanocl_stubs/src/job.rs | 93 ------------------------ crates/nanocl_stubs/src/process.rs | 104 ++++++++++++++++++++++++++- crates/nanocld_client/src/job.rs | 30 +------- crates/nanocld_client/src/process.rs | 27 ++++++- 11 files changed, 258 insertions(+), 269 deletions(-) diff --git a/bin/nanocl/src/commands/job.rs b/bin/nanocl/src/commands/job.rs index 7b7c8ed37..ac963e05c 100644 --- a/bin/nanocl/src/commands/job.rs +++ b/bin/nanocl/src/commands/job.rs @@ -1,8 +1,7 @@ use futures::StreamExt; use nanocl_error::io::{IoResult, FromIo, IoError}; -use nanocld_client::stubs::job::JobWaitQuery; -use nanocld_client::stubs::process::ProcessLogQuery; +use nanocld_client::stubs::process::{ProcessLogQuery, ProcessWaitQuery}; use crate::utils; use crate::config::CliConfig; @@ -86,10 +85,12 @@ async fn exec_job_wait( ) -> IoResult<()> { let client = &cli_conf.client; let mut stream = client - .wait_job( + .wait_process( + "job", &opts.name, - Some(&JobWaitQuery { + Some(&ProcessWaitQuery { condition: opts.condition.clone(), + namespace: None, }), ) .await?; @@ -102,7 +103,7 @@ async fn exec_job_wait( if resp.status_code != 0 { eprintln!( "Job container {}-{} ended with error code {}", - opts.name, resp.container_name, resp.status_code, + opts.name, resp.process_name, resp.status_code, ); has_error = true; } diff --git a/bin/nanocl/src/models/job.rs b/bin/nanocl/src/models/job.rs index 383b9213d..21143c9ea 100644 --- a/bin/nanocl/src/models/job.rs +++ b/bin/nanocl/src/models/job.rs @@ -2,7 +2,7 @@ use tabled::Tabled; use chrono::TimeZone; use clap::{Parser, Subcommand}; -use nanocld_client::stubs::job::{WaitCondition, JobSummary}; +use nanocld_client::stubs::{job::JobSummary, process::WaitCondition}; use super::{DisplayFormat, GenericListOpts}; diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index fe3434a52..54dfe56d0 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -659,25 +659,6 @@ paths: application/json: schema: $ref: '#/components/schemas/JobInspect' - /jobs/{name}/wait: - get: - tags: - - Jobs - summary: Wait for a job to finish - description: Wait for a job to finish - operationId: wait_job - parameters: - - name: name - in: path - description: Name of the job instance usually `name` or `name-number` - required: true - schema: - type: string - responses: - '200': - description: Job wait - '404': - description: Job does not exist /metrics: get: tags: @@ -942,8 +923,6 @@ paths: responses: '200': description: Process instances logs - '404': - description: Process don't exist /processes/{kind}/{name}/restart: post: tags: @@ -975,9 +954,7 @@ paths: nullable: true responses: '202': - description: Process restarted - '404': - description: Process does not exist + description: Process instances restarted /processes/{kind}/{name}/start: post: tags: @@ -1009,9 +986,7 @@ paths: nullable: true responses: '202': - description: Process started - '404': - description: Process does not exist + description: Process instances started /processes/{kind}/{name}/stop: post: tags: @@ -1043,9 +1018,26 @@ paths: nullable: true responses: '202': - description: Process stopped + description: Process instances stopped + /processes/{kind}/{name}/wait: + get: + tags: + - Processes + summary: Wait for a job to finish + description: Wait for a job to finish + operationId: wait_process + parameters: + - name: name + in: path + description: Name of the job instance usually `name` or `name-number` + required: true + schema: + type: string + responses: + '200': + description: Job wait '404': - description: Process does not exist + description: Job does not exist /processes/{name}/kill: post: tags: @@ -1083,9 +1075,7 @@ paths: required: true responses: '200': - description: Cargo killed - '404': - description: Cargo does not exist + description: Process instances killed /resource/kinds: get: tags: @@ -5394,6 +5384,7 @@ components: nullable: true ProcessKind: type: string + description: Kind of process (Vm, Job, Cargo) enum: - vm - job diff --git a/bin/nanocld/src/services/job.rs b/bin/nanocld/src/services/job.rs index ab41d203a..c15362fc4 100644 --- a/bin/nanocld/src/services/job.rs +++ b/bin/nanocld/src/services/job.rs @@ -1,9 +1,7 @@ use ntex::web; -use bollard_next::container::WaitContainerOptions; - use nanocl_error::http::HttpResult; -use nanocl_stubs::job::{JobPartial, JobWaitQuery}; +use nanocl_stubs::job::JobPartial; use crate::{ utils, @@ -92,53 +90,21 @@ pub async fn inspect_job( Ok(web::HttpResponse::Ok().json(&job)) } -/// Wait for a job to finish -#[cfg_attr(feature = "dev", utoipa::path( - get, - tag = "Jobs", - path = "/jobs/{name}/wait", - params( - ("name" = String, Path, description = "Name of the job instance usually `name` or `name-number`"), - ), - responses( - (status = 200, description = "Job wait", content_type = "application/vdn.nanocl.raw-stream"), - (status = 404, description = "Job does not exist"), - ), -))] -#[web::get("/jobs/{name}/wait")] -pub async fn wait_job( - state: web::types::State, - path: web::types::Path<(String, String)>, - qs: web::types::Query, -) -> HttpResult { - let stream = utils::job::wait( - &path.1, - WaitContainerOptions { - condition: qs.condition.clone().unwrap_or_default(), - }, - &state, - ) - .await?; - Ok( - web::HttpResponse::Ok() - .content_type("application/vdn.nanocl.raw-stream") - .streaming(stream), - ) -} - pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(list_job); config.service(create_job); config.service(delete_job); config.service(inspect_job); - config.service(wait_job); } #[cfg(test)] mod tests { use ntex::http; use futures_util::{StreamExt, TryStreamExt}; - use nanocl_stubs::job::{Job, JobWaitResponse, JobSummary}; + use nanocl_stubs::{ + job::{Job, JobSummary}, + process::ProcessWaitResponse, + }; use crate::utils::tests::*; @@ -183,7 +149,10 @@ mod tests { let job = res.json::().await.unwrap(); let job_endpoint = format!("{ENDPOINT}/{}", &job.name); let wait_res = client - .send_get(&format!("{job_endpoint}/wait"), None::) + .send_get( + &format!("/processes/job/{}/wait", &job.name), + None::, + ) .await; test_status_code!( wait_res.status(), @@ -220,7 +189,7 @@ mod tests { let mut stream = wait_res.into_stream(); while let Some(Ok(wait_response)) = stream.next().await { let response = - serde_json::from_slice::(&wait_response).unwrap(); + serde_json::from_slice::(&wait_response).unwrap(); assert_eq!(response.status_code, 0); } let res = client diff --git a/bin/nanocld/src/services/openapi.rs b/bin/nanocld/src/services/openapi.rs index 893582853..353c49e08 100644 --- a/bin/nanocld/src/services/openapi.rs +++ b/bin/nanocld/src/services/openapi.rs @@ -244,7 +244,6 @@ impl Modify for VersionModifier { job::delete_job, job::inspect_job, job::create_job, - job::wait_job, // Cargo cargo::list_cargo, cargo::inspect_cargo, @@ -304,6 +303,7 @@ impl Modify for VersionModifier { process::list_process, process::restart_process, process::kill_process, + process::wait_process, // Event event::list_event, ), diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index aa445d2a8..f243d8dcd 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -3,10 +3,16 @@ use futures_util::{StreamExt, TryStreamExt, stream::select_all}; use nanocl_error::http::{HttpResult, HttpError}; -use bollard_next::container::LogsOptions; +use bollard_next::{ + container::{LogsOptions, WaitContainerOptions}, + service::ContainerWaitExitError, +}; use nanocl_stubs::{ generic::{GenericNspQuery, GenericFilter, GenericListQuery}, - process::{ProcessLogQuery, ProcessOutputLog, ProcessKind}, + process::{ + ProcessLogQuery, ProcessOutputLog, ProcessKind, ProcessWaitQuery, + ProcessWaitResponse, + }, cargo::CargoKillOptions, }; @@ -58,7 +64,6 @@ pub async fn list_process( ), responses( (status = 200, description = "Process instances logs", content_type = "application/vdn.nanocl.raw-stream"), - (status = 404, description = "Process don't exist"), ), ))] #[web::get("/processes/{kind}/{name}/logs")] @@ -118,8 +123,7 @@ async fn logs_process( ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), ), responses( - (status = 202, description = "Process started"), - (status = 404, description = "Process does not exist"), + (status = 202, description = "Process instances started"), ), ))] #[web::post("/processes/{kind}/{name}/start")] @@ -156,8 +160,7 @@ pub async fn start_process( ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), ), responses( - (status = 202, description = "Process restarted"), - (status = 404, description = "Process does not exist"), + (status = 202, description = "Process instances restarted"), ), ))] #[web::post("/processes/{kind}/{name}/restart")] @@ -194,8 +197,7 @@ pub async fn restart_process( ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), ), responses( - (status = 202, description = "Process stopped"), - (status = 404, description = "Process does not exist"), + (status = 202, description = "Process instances stopped"), ), ))] #[web::post("/processes/{kind}/{name}/stop")] @@ -226,18 +228,17 @@ pub async fn stop_process( post, tag = "Processes", request_body = CargoKillOptions, - path = "/processes/{name}/kill", + path = "/processes/{kind}/{name}/kill", params( ("kind" = String, Path, description = "Kind of the process", example = "cargo"), ("name" = String, Path, description = "Name of the process", example = "deploy-example"), ("namespace" = Option, Query, description = "Namespace where the process belongs is needed"), ), responses( - (status = 200, description = "Cargo killed"), - (status = 404, description = "Cargo does not exist"), + (status = 200, description = "Process instances killed"), ), ))] -#[web::post("/processes/kind/{name}/kill")] +#[web::post("/processes/{kind}/{name}/kill")] pub async fn kill_process( state: web::types::State, path: web::types::Path<(String, String, String)>, @@ -261,6 +262,74 @@ pub async fn kill_process( Ok(web::HttpResponse::Ok().into()) } +/// Wait for a job to finish +#[cfg_attr(feature = "dev", utoipa::path( + get, + tag = "Processes", + path = "/processes/{kind}/{name}/wait", + params( + ("name" = String, Path, description = "Name of the job instance usually `name` or `name-number`"), + ), + responses( + (status = 200, description = "Job wait", content_type = "application/vdn.nanocl.raw-stream"), + (status = 404, description = "Job does not exist"), + ), +))] +#[web::get("/processes/{kind}/{name}/wait")] +pub async fn wait_process( + state: web::types::State, + path: web::types::Path<(String, String, String)>, + qs: web::types::Query, +) -> HttpResult { + let (_, kind, name) = path.into_inner(); + let kind = kind.parse().map_err(HttpError::bad_request)?; + let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); + let opts = WaitContainerOptions { + condition: qs.condition.clone().unwrap_or_default(), + }; + let processes = ProcessDb::read_by_kind_key(&kind_pk, &state.pool).await?; + let mut streams = Vec::new(); + for process in processes { + let options = Some(opts.clone()); + let stream = state.docker_api.wait_container(&process.key, options).map( + move |wait_result| match wait_result { + Err(err) => { + if let bollard_next::errors::Error::DockerContainerWaitError { + error, + code, + } = &err + { + return Ok(ProcessWaitResponse { + process_name: process.name.clone(), + status_code: *code, + error: Some(ContainerWaitExitError { + message: Some(error.to_owned()), + }), + }); + } + Err(err) + } + Ok(wait_response) => { + Ok(ProcessWaitResponse::from_container_wait_response( + wait_response, + process.name.clone(), + )) + } + }, + ); + streams.push(stream); + } + let stream = select_all(streams).into_stream(); + Ok( + web::HttpResponse::Ok() + .content_type("application/vdn.nanocl.raw-stream") + .streaming(utils::stream::transform_stream::< + ProcessWaitResponse, + ProcessWaitResponse, + >(stream)), + ) +} + pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(list_process); config.service(logs_process); @@ -268,6 +337,7 @@ pub fn ntex_config(config: &mut web::ServiceConfig) { config.service(start_process); config.service(stop_process); config.service(kill_process); + config.service(wait_process); } #[cfg(test)] diff --git a/bin/nanocld/src/utils/job.rs b/bin/nanocld/src/utils/job.rs index fe47f7c88..acea56e85 100644 --- a/bin/nanocld/src/utils/job.rs +++ b/bin/nanocld/src/utils/job.rs @@ -1,12 +1,6 @@ -use ntex::{web, util::Bytes}; +use ntex::web; use tokio::{fs, io::AsyncWriteExt}; -use futures_util::{ - StreamExt, TryStreamExt, - stream::{FuturesUnordered, select_all}, -}; -use bollard_next::{ - service::ContainerWaitExitError, container::WaitContainerOptions, -}; +use futures_util::{StreamExt, stream::FuturesUnordered}; use nanocl_error::{ io::{FromIo, IoError, IoResult}, @@ -14,7 +8,7 @@ use nanocl_error::{ }; use nanocl_stubs::{ generic::GenericFilter, - job::{Job, JobWaitResponse, WaitCondition, JobSummary}, + job::{Job, JobSummary}, }; use crate::{ @@ -23,8 +17,6 @@ use crate::{ models::{SystemState, ProcessDb, JobDb}, }; -use super::stream::transform_stream; - /// Format the cron job command to start a job at a given time fn format_cron_job_command(job: &Job, state: &SystemState) -> String { let host = state @@ -130,45 +122,3 @@ pub async fn list(state: &SystemState) -> HttpResult> { .collect::>>()?; Ok(job_summaries) } - -/// Wait a job to finish -pub async fn wait( - name: &str, - wait_options: WaitContainerOptions, - state: &SystemState, -) -> HttpResult>> { - let job = JobDb::read_by_pk(name, &state.pool).await?.try_to_spec()?; - let docker_api = state.docker_api.clone(); - let processes = ProcessDb::read_by_kind_key(&job.name, &state.pool).await?; - let mut streams = Vec::new(); - for process in processes { - let options = Some(wait_options.clone()); - let stream = docker_api.wait_container(&process.key, options).map( - move |wait_result| match wait_result { - Err(err) => { - if let bollard_next::errors::Error::DockerContainerWaitError { - error, - code, - } = &err - { - return Ok(JobWaitResponse { - container_name: process.name.clone(), - status_code: *code, - error: Some(ContainerWaitExitError { - message: Some(error.to_owned()), - }), - }); - } - Err(err) - } - Ok(wait_response) => Ok(JobWaitResponse::from_container_wait_response( - wait_response, - process.name.clone(), - )), - }, - ); - streams.push(stream); - } - let stream = select_all(streams).into_stream(); - Ok(transform_stream::(stream)) -} diff --git a/crates/nanocl_stubs/src/job.rs b/crates/nanocl_stubs/src/job.rs index d77fb4c75..4d28c5961 100644 --- a/crates/nanocl_stubs/src/job.rs +++ b/crates/nanocl_stubs/src/job.rs @@ -1,8 +1,6 @@ -use std::io; use serde::{Serialize, Deserialize}; use bollard_next::container::Config; -use bollard_next::service::{ContainerWaitExitError, ContainerWaitResponse}; use crate::process::Process; use crate::system::{EventActorKind, EventActor}; @@ -165,94 +163,3 @@ impl From for JobPartial { job.spec.into() } } - -/// Used to wait for a job to reach a certain state -#[derive(Debug, Clone, Default)] -#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case"))] -pub enum WaitCondition { - NotRunning, - #[default] - NextExit, - Removed, -} - -/// Implement Display for WaitCondition -impl std::fmt::Display for WaitCondition { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WaitCondition::NextExit => write!(f, "next-exit"), - WaitCondition::NotRunning => write!(f, "not-running"), - WaitCondition::Removed => write!(f, "removed"), - } - } -} - -/// Convert a WaitCondition into a String -impl From for std::string::String { - fn from(value: WaitCondition) -> Self { - match value { - WaitCondition::NextExit => "next-exit", - WaitCondition::NotRunning => "not-running", - WaitCondition::Removed => "removed", - } - .to_owned() - } -} - -/// Implement FromStr for WaitCondition -impl std::str::FromStr for WaitCondition { - type Err = io::Error; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "next-exit" => Ok(WaitCondition::NextExit), - "not-running" => Ok(WaitCondition::NotRunning), - "removed" => Ok(WaitCondition::Removed), - _ => Err(io::Error::new( - io::ErrorKind::InvalidData, - "Invalid wait condition", - )), - } - } -} - -/// Query for the job wait endpoint -#[derive(Debug, Clone, Default)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "serde", serde(rename_all = "PascalCase"))] -pub struct JobWaitQuery { - // Wait condition - pub condition: Option, -} - -/// Stream of wait response of a job -#[derive(Debug)] -#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "serde", serde(rename_all = "PascalCase"))] -pub struct JobWaitResponse { - /// Container id - pub container_name: String, - /// Exit code of the container - pub status_code: i64, - /// Wait error - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -impl JobWaitResponse { - pub fn from_container_wait_response( - response: ContainerWaitResponse, - container_name: String, - ) -> JobWaitResponse { - JobWaitResponse { - container_name, - status_code: response.status_code, - error: response.error, - } - } -} diff --git a/crates/nanocl_stubs/src/process.rs b/crates/nanocl_stubs/src/process.rs index 19154bac0..8aff614ec 100644 --- a/crates/nanocl_stubs/src/process.rs +++ b/crates/nanocl_stubs/src/process.rs @@ -3,9 +3,12 @@ use std::str::FromStr; #[cfg(feature = "serde")] use serde::{Serialize, Deserialize}; -use bollard_next::service::ContainerInspectResponse; +use bollard_next::service::{ + ContainerInspectResponse, ContainerWaitExitError, ContainerWaitResponse, +}; use bollard_next::container::{LogOutput, LogsOptions}; +/// Kind of process (Vm, Job, Cargo) #[derive(Clone, PartialEq, Debug)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] @@ -17,6 +20,7 @@ pub enum ProcessKind { Cargo, } +/// Implement FromStr for ProcessKind for .parse() method impl FromStr for ProcessKind { type Err = std::io::Error; @@ -33,6 +37,7 @@ impl FromStr for ProcessKind { } } +/// Try to convert a string into a ProcessKind impl TryFrom for ProcessKind { type Error = std::io::Error; @@ -49,6 +54,7 @@ impl TryFrom for ProcessKind { } } +/// Implement to_string for ProcessKind impl ToString for ProcessKind { fn to_string(&self) -> String { match self { @@ -139,6 +145,7 @@ pub struct OutputLog { pub data: String, } +/// Convert a LogOutput into an OutputLog impl From for OutputLog { fn from(output: LogOutput) -> Self { match output { @@ -196,6 +203,7 @@ pub struct ProcessLogQuery { } impl ProcessLogQuery { + /// Set namespace of a ProcessLogQuery pub fn of_namespace(nsp: &str) -> ProcessLogQuery { ProcessLogQuery { namespace: Some(nsp.to_owned()), @@ -210,6 +218,7 @@ impl ProcessLogQuery { } } +/// Convert a ProcessLogQuery into a LogsOptions impl From for LogsOptions { fn from(query: ProcessLogQuery) -> LogsOptions { LogsOptions:: { @@ -223,3 +232,96 @@ impl From for LogsOptions { } } } + +/// Used to wait for a process to reach a certain state +#[derive(Debug, Clone, Default)] +#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case"))] +pub enum WaitCondition { + NotRunning, + #[default] + NextExit, + Removed, +} + +/// Implement Display for WaitCondition +impl std::fmt::Display for WaitCondition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WaitCondition::NextExit => write!(f, "next-exit"), + WaitCondition::NotRunning => write!(f, "not-running"), + WaitCondition::Removed => write!(f, "removed"), + } + } +} + +/// Convert a WaitCondition into a String +impl From for std::string::String { + fn from(value: WaitCondition) -> Self { + match value { + WaitCondition::NextExit => "next-exit", + WaitCondition::NotRunning => "not-running", + WaitCondition::Removed => "removed", + } + .to_owned() + } +} + +/// Implement FromStr for WaitCondition +impl std::str::FromStr for WaitCondition { + type Err = std::io::Error; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "next-exit" => Ok(WaitCondition::NextExit), + "not-running" => Ok(WaitCondition::NotRunning), + "removed" => Ok(WaitCondition::Removed), + _ => Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid wait condition", + )), + } + } +} + +/// Query for the process wait endpoint +#[derive(Debug, Clone, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "PascalCase"))] +pub struct ProcessWaitQuery { + // Wait condition + pub condition: Option, + /// Namespace where belong the process + pub namespace: Option, +} + +/// Stream of wait response of a process +#[derive(Debug)] +#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "serde", serde(rename_all = "PascalCase"))] +pub struct ProcessWaitResponse { + /// Process name + pub process_name: String, + /// Exit code of the container + pub status_code: i64, + /// Wait error + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +impl ProcessWaitResponse { + pub fn from_container_wait_response( + response: ContainerWaitResponse, + container_name: String, + ) -> ProcessWaitResponse { + ProcessWaitResponse { + process_name: container_name, + status_code: response.status_code, + error: response.error, + } + } +} diff --git a/crates/nanocld_client/src/job.rs b/crates/nanocld_client/src/job.rs index c3c5f16e4..0e06624fd 100644 --- a/crates/nanocld_client/src/job.rs +++ b/crates/nanocld_client/src/job.rs @@ -1,11 +1,6 @@ -use ntex::channel::mpsc::Receiver; - -use nanocl_error::http::HttpResult; use nanocl_error::http_client::HttpClientResult; -use nanocl_stubs::job::{ - Job, JobWaitQuery, JobWaitResponse, JobPartial, JobInspect, JobSummary, -}; +use nanocl_stubs::job::{Job, JobPartial, JobInspect, JobSummary}; use super::http_client::NanocldClient; @@ -74,27 +69,6 @@ impl NanocldClient { Self::res_json(res).await } - /// A stream is returned, data are sent when job reach status - /// - /// ## Example - /// - /// ```no_run,ignore - /// use nanocld_client::NanocldClient; - /// - /// let client = NanocldClient::connect_to("http://localhost:8585", None); - /// let stream = client.wait_job("my_job", None).await.unwrap(); - /// ``` - pub async fn wait_job( - &self, - name: &str, - query: Option<&JobWaitQuery>, - ) -> HttpClientResult>> { - let res = self - .send_get(&format!("{}/{name}/wait", Self::JOB_PATH), query) - .await?; - Ok(Self::res_stream(res).await) - } - /// Delete a job by it's name /// /// ## Example @@ -145,7 +119,7 @@ mod tests { .await .unwrap(); assert_eq!(job.name, "my_test_job"); - let mut stream = client.wait_job(&job.name, None).await.unwrap(); + let mut stream = client.wait_process("job", &job.name, None).await.unwrap(); client.start_process("job", &job.name, None).await.unwrap(); while let Some(Ok(_)) = stream.next().await {} let job = client.inspect_job(&job.name).await.unwrap(); diff --git a/crates/nanocld_client/src/process.rs b/crates/nanocld_client/src/process.rs index f38fdb753..a103b153c 100644 --- a/crates/nanocld_client/src/process.rs +++ b/crates/nanocld_client/src/process.rs @@ -6,7 +6,10 @@ use nanocl_error::http::HttpResult; use nanocl_error::http_client::{HttpClientResult, HttpClientError}; use nanocl_stubs::generic::{GenericNspQuery, GenericFilter, GenericListQuery}; -use nanocl_stubs::process::{Process, ProcessLogQuery, ProcessOutputLog}; +use nanocl_stubs::process::{ + Process, ProcessLogQuery, ProcessOutputLog, ProcessWaitQuery, + ProcessWaitResponse, +}; use super::NanocldClient; @@ -155,6 +158,28 @@ impl NanocldClient { .await?; Ok(()) } + + /// A stream is returned, data are sent when processes reach status + /// + /// ## Example + /// + /// ```no_run,ignore + /// use nanocld_client::NanocldClient; + /// + /// let client = NanocldClient::connect_to("http://localhost:8585", None); + /// let stream = client.wait_process("job", "my_job", None).await.unwrap(); + /// ``` + pub async fn wait_process( + &self, + kind: &str, + name: &str, + query: Option<&ProcessWaitQuery>, + ) -> HttpClientResult>> { + let res = self + .send_get(&format!("{}/{kind}/{name}/wait", Self::PROCESS_PATH), query) + .await?; + Ok(Self::res_stream(res).await) + } } #[cfg(test)] From a630b4fc3aabfdc414f5c82280acdc063ad8472e Mon Sep 17 00:00:00 2001 From: leon3s Date: Mon, 8 Jan 2024 16:14:14 +0100 Subject: [PATCH 10/14] fix tests --- bin/nanocld/specs/swagger.yaml | 76 ++++++++++++++++----------------- bin/nanocld/src/services/job.rs | 2 +- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index 54dfe56d0..c0f614d6c 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -854,6 +854,44 @@ paths: type: array items: $ref: '#/components/schemas/Process' + /processes/{kind}/{name}/kill: + post: + tags: + - Processes + summary: Send a signal to processes of given kind and name + description: Send a signal to processes of given kind and name + operationId: kill_process + parameters: + - name: kind + in: path + description: Kind of the process + required: true + schema: + type: string + example: cargo + - name: name + in: path + description: Name of the process + required: true + schema: + type: string + example: deploy-example + - name: namespace + in: query + description: Namespace where the process belongs is needed + required: false + schema: + type: string + nullable: true + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CargoKillOptions' + required: true + responses: + '200': + description: Process instances killed /processes/{kind}/{name}/logs: get: tags: @@ -1038,44 +1076,6 @@ paths: description: Job wait '404': description: Job does not exist - /processes/{name}/kill: - post: - tags: - - Processes - summary: Send a signal to processes of given kind and name - description: Send a signal to processes of given kind and name - operationId: kill_process - parameters: - - name: kind - in: path - description: Kind of the process - required: true - schema: - type: string - example: cargo - - name: name - in: path - description: Name of the process - required: true - schema: - type: string - example: deploy-example - - name: namespace - in: query - description: Namespace where the process belongs is needed - required: false - schema: - type: string - nullable: true - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/CargoKillOptions' - required: true - responses: - '200': - description: Process instances killed /resource/kinds: get: tags: diff --git a/bin/nanocld/src/services/job.rs b/bin/nanocld/src/services/job.rs index c15362fc4..8c1dcce6d 100644 --- a/bin/nanocld/src/services/job.rs +++ b/bin/nanocld/src/services/job.rs @@ -163,7 +163,7 @@ mod tests { let _ = res.json::>().await.unwrap(); let res = client .send_get( - &format!("{job_endpoint}/wait"), + &format!("/processes/job/{}/wait", &job.name), Some(&serde_json::json!({ "Condition": "yoloh" })), From 7fd9daa4f0fdb931afe9b8b0401f4a1ce691ab23 Mon Sep 17 00:00:00 2001 From: leon3s Date: Tue, 9 Jan 2024 12:21:04 +0100 Subject: [PATCH 11/14] move cargo stats utils --- Cargo.lock | 57 +++++++++++++------------------ bin/nanocld/src/services/cargo.rs | 8 +++-- bin/nanocld/src/utils/cargo.rs | 20 ++--------- 3 files changed, 32 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40093a38e..86ede8f69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,9 +335,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9" [[package]] name = "bit-set" @@ -507,9 +507,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.13" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", "clap_derive", @@ -517,9 +517,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.12" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -624,11 +624,10 @@ dependencies = [ [[package]] name = "crossbeam" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eb9105919ca8e40d437fc9cbb8f1975d916f1bd28afe795a48aae32a2cc8920" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" dependencies = [ - "cfg-if", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -638,54 +637,46 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" +checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-deque" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.17" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-queue" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc6598521bb5a83d491e8c1fe51db7296019d2ca3cb93cc6c2a20369a4d78a2" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-utils" -version = "0.8.18" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1201,9 +1192,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "js-sys", @@ -1627,9 +1618,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "libredox" @@ -3805,9 +3796,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-bag" -version = "1.4.3" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ce5bb364b23e66b528d03168df78b38c0f7b6fe17386928f29d5ab2e7cb2f7" +checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" [[package]] name = "vcpkg" diff --git a/bin/nanocld/src/services/cargo.rs b/bin/nanocld/src/services/cargo.rs index b66bb1802..3df7eea24 100644 --- a/bin/nanocld/src/services/cargo.rs +++ b/bin/nanocld/src/services/cargo.rs @@ -1,10 +1,11 @@ +use bollard_next::container::Stats; use ntex::web; use nanocl_error::{http::HttpResult, io::IoResult}; use nanocl_stubs::{ generic::{GenericNspQuery, GenericListNspQuery}, - cargo::{CargoDeleteQuery, CargoStatsQuery}, + cargo::{CargoStats, CargoDeleteQuery, CargoStatsQuery}, cargo_spec::{CargoSpecPartial, CargoSpecUpdate}, }; @@ -275,7 +276,10 @@ pub async fn stats_cargo( ) -> HttpResult { let namespace = utils::key::resolve_nsp(&qs.namespace); let key = utils::key::gen_key(&namespace, &path.1); - let stream = utils::cargo::get_stats(&key, &qs, &state.docker_api)?; + let stream = state + .docker_api + .stats(&format!("{key}.c"), Some(qs.clone().into())); + let stream = utils::stream::transform_stream::(stream); Ok( web::HttpResponse::Ok() .content_type("application/vdn.nanocl.raw-stream") diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index 275070223..c377ed1a1 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -1,4 +1,3 @@ -use ntex::util::Bytes; use futures::StreamExt; use futures_util::stream::FuturesUnordered; @@ -7,13 +6,13 @@ use nanocl_error::http::{HttpError, HttpResult}; use bollard_next::{ service::{HostConfig, RestartPolicy, RestartPolicyNameEnum}, container::{ - Stats, StartContainerOptions, WaitContainerOptions, RemoveContainerOptions, + StartContainerOptions, WaitContainerOptions, RemoveContainerOptions, }, }; use nanocl_stubs::{ process::Process, generic::{GenericListNspQuery, GenericClause, GenericFilter}, - cargo::{Cargo, CargoSummary, CargoStats, CargoStatsQuery}, + cargo::{Cargo, CargoSummary}, }; use crate::{ @@ -23,8 +22,6 @@ use crate::{ objects::generic::ObjProcess, }; -use super::stream::transform_stream; - /// Container to execute before the cargo instances async fn execute_before(cargo: &Cargo, state: &SystemState) -> HttpResult<()> { match cargo.spec.init_container.clone() { @@ -281,16 +278,3 @@ pub async fn list( } Ok(cargo_summaries) } - -/// Get the stats of a cargo instance -/// The cargo name can be used if the cargo has only one instance -pub fn get_stats( - name: &str, - query: &CargoStatsQuery, - docker_api: &bollard_next::Docker, -) -> HttpResult>> { - let stream = - docker_api.stats(&format!("{name}.c"), Some(query.clone().into())); - let stream = transform_stream::(stream); - Ok(stream) -} From 1fa270f89b55d32f29f8d2a88f95f6f66cead049 Mon Sep 17 00:00:00 2001 From: leon3s Date: Tue, 9 Jan 2024 12:37:36 +0100 Subject: [PATCH 12/14] rename func --- bin/nanocld/src/objects/cargo.rs | 2 +- bin/nanocld/src/objects/generic/process.rs | 8 ++++---- bin/nanocld/src/objects/vm.rs | 4 ++-- bin/nanocld/src/services/process.rs | 24 +++++++++++----------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index 617f0e829..f242c550c 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -136,7 +136,7 @@ impl ObjPutByPk for CargoDb { Ok(instances) => instances, }; // start created containers - match CargoDb::start_process_by_kind_pk(pk, state).await { + match CargoDb::start_process_by_kind_key(pk, state).await { Err(err) => { log::error!( "Unable to start cargo instance {} : {err}", diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs index c5a5fd622..71d1ce6ae 100644 --- a/bin/nanocld/src/objects/generic/process.rs +++ b/bin/nanocld/src/objects/generic/process.rs @@ -107,7 +107,7 @@ pub trait ObjProcess { Process::try_from(process).map_err(HttpError::from) } - async fn start_process_by_kind_pk( + async fn start_process_by_kind_key( kind_pk: &str, state: &SystemState, ) -> HttpResult<()> { @@ -130,7 +130,7 @@ pub trait ObjProcess { Ok(()) } - async fn stop_process_by_kind_pk( + async fn stop_process_by_kind_key( kind_pk: &str, state: &SystemState, ) -> HttpResult<()> { @@ -153,7 +153,7 @@ pub trait ObjProcess { Ok(()) } - async fn restart_process_by_kind_pk( + async fn restart_process_by_kind_key( pk: &str, state: &SystemState, ) -> HttpResult<()> { @@ -176,7 +176,7 @@ pub trait ObjProcess { Ok(()) } - async fn kill_process_by_kind_pk( + async fn kill_process_by_kind_key( pk: &str, opts: &CargoKillOptions, state: &SystemState, diff --git a/bin/nanocld/src/objects/vm.rs b/bin/nanocld/src/objects/vm.rs index ee6692ad7..c0b9051af 100644 --- a/bin/nanocld/src/objects/vm.rs +++ b/bin/nanocld/src/objects/vm.rs @@ -98,7 +98,7 @@ impl ObjPutByPk for VmDb { ) -> HttpResult { let vm = VmDb::transform_read_by_pk(pk, &state.pool).await?; let container_name = format!("{}.v", &vm.spec.vm_key); - VmDb::stop_process_by_kind_pk(pk, state).await?; + VmDb::stop_process_by_kind_key(pk, state).await?; VmDb::del_process_by_pk( &container_name, None::, @@ -114,7 +114,7 @@ impl ObjPutByPk for VmDb { .await?; let image = VmImageDb::read_by_pk(&vm.spec.disk.image, &state.pool).await?; utils::vm::create_instance(&vm, &image, false, state).await?; - VmDb::start_process_by_kind_pk(&vm.spec.vm_key, state).await?; + VmDb::start_process_by_kind_key(&vm.spec.vm_key, state).await?; Ok(vm) } } diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index f243d8dcd..3ef6c09aa 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -137,13 +137,13 @@ pub async fn start_process( let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { - VmDb::start_process_by_kind_pk(&kind_pk, &state).await?; + VmDb::start_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Job => { - JobDb::start_process_by_kind_pk(&kind_pk, &state).await?; + JobDb::start_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Cargo => { - CargoDb::start_process_by_kind_pk(&kind_pk, &state).await?; + CargoDb::start_process_by_kind_key(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) @@ -174,13 +174,13 @@ pub async fn restart_process( let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { - VmDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + VmDb::restart_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Job => { - JobDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + JobDb::restart_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Cargo => { - CargoDb::restart_process_by_kind_pk(&kind_pk, &state).await?; + CargoDb::restart_process_by_kind_key(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) @@ -211,13 +211,13 @@ pub async fn stop_process( let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { - VmDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + VmDb::stop_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Job => { - JobDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + JobDb::stop_process_by_kind_key(&kind_pk, &state).await?; } ProcessKind::Cargo => { - CargoDb::stop_process_by_kind_pk(&kind_pk, &state).await?; + CargoDb::stop_process_by_kind_key(&kind_pk, &state).await?; } } Ok(web::HttpResponse::Accepted().finish()) @@ -250,13 +250,13 @@ pub async fn kill_process( let kind_pk = utils::key::gen_kind_key(&kind, &name, &qs.namespace); match &kind { ProcessKind::Vm => { - VmDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + VmDb::kill_process_by_kind_key(&kind_pk, &payload, &state).await?; } ProcessKind::Job => { - JobDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + JobDb::kill_process_by_kind_key(&kind_pk, &payload, &state).await?; } ProcessKind::Cargo => { - CargoDb::kill_process_by_kind_pk(&kind_pk, &payload, &state).await?; + CargoDb::kill_process_by_kind_key(&kind_pk, &payload, &state).await?; } } Ok(web::HttpResponse::Ok().into()) From afd4a4798f4dc0422132e9c28e61b20764bafe7e Mon Sep 17 00:00:00 2001 From: leon3s Date: Tue, 9 Jan 2024 13:04:20 +0100 Subject: [PATCH 13/14] fix init container error reporting --- bin/nanocld/src/models/cargo.rs | 3 ++ bin/nanocld/src/models/vm.rs | 3 ++ bin/nanocld/src/objects/cargo.rs | 11 ++++- bin/nanocld/src/repositories/cargo.rs | 40 +++++++++++++-- bin/nanocld/src/services/cargo.rs | 4 +- bin/nanocld/src/utils/cargo.rs | 70 +++++++++------------------ bin/nanocld/src/utils/process.rs | 3 +- 7 files changed, 77 insertions(+), 57 deletions(-) diff --git a/bin/nanocld/src/models/cargo.rs b/bin/nanocld/src/models/cargo.rs index e66e3459f..75f92c505 100644 --- a/bin/nanocld/src/models/cargo.rs +++ b/bin/nanocld/src/models/cargo.rs @@ -40,17 +40,20 @@ pub struct CargoUpdateDb { pub spec_key: Option, } +/// Arguments to create a new cargo obj pub struct CargoObjCreateIn { pub namespace: String, pub spec: CargoSpecPartial, pub version: String, } +/// Arguments to create a new cargo history entry pub struct CargoObjPutIn { pub spec: CargoSpecPartial, pub version: String, } +/// Arguments to update a cargo history entry pub struct CargoObjPatchIn { pub spec: CargoSpecUpdate, pub version: String, diff --git a/bin/nanocld/src/models/vm.rs b/bin/nanocld/src/models/vm.rs index 71c123df9..a8f541505 100644 --- a/bin/nanocld/src/models/vm.rs +++ b/bin/nanocld/src/models/vm.rs @@ -41,17 +41,20 @@ pub struct VmUpdateDb { pub spec_key: Option, } +/// Arguments to create a new vm obj pub struct VmObjCreateIn { pub namespace: String, pub spec: VmSpecPartial, pub version: String, } +/// Arguments to create a new vm history entry pub struct VmObjPutIn { pub spec: VmSpecPartial, pub version: String, } +/// Arguments to update a vm history entry pub struct VmObjPatchIn { pub spec: VmSpecUpdate, pub version: String, diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index f242c550c..aba5ead87 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -130,9 +130,13 @@ impl ObjPutByPk for CargoDb { }; let processes = ProcessDb::read_by_kind_key(pk, &state.pool).await?; // Create instance with the new spec + let mut error = None; let new_instances = match utils::cargo::create_instances(&cargo, number, state).await { - Err(_) => Vec::default(), + Err(err) => { + error = Some(err); + Vec::default() + } Ok(instances) => instances, }; // start created containers @@ -160,7 +164,10 @@ impl ObjPutByPk for CargoDb { .await?; } } - Ok(cargo) + match error { + Some(err) => Err(err), + None => Ok(cargo), + } } } diff --git a/bin/nanocld/src/repositories/cargo.rs b/bin/nanocld/src/repositories/cargo.rs index c48699055..ac30f8ebb 100644 --- a/bin/nanocld/src/repositories/cargo.rs +++ b/bin/nanocld/src/repositories/cargo.rs @@ -5,19 +5,21 @@ use diesel::prelude::*; use futures_util::{stream::FuturesUnordered, StreamExt}; use nanocl_error::{ io::{IoError, IoResult}, - http::HttpResult, + http::{HttpResult, HttpError}, }; use nanocl_stubs::{ - generic::{GenericFilter, GenericClause}, - cargo::{Cargo, CargoDeleteQuery}, + generic::{GenericFilter, GenericClause, GenericListNspQuery}, + cargo::{Cargo, CargoDeleteQuery, CargoSummary}, cargo_spec::{CargoSpecPartial, CargoSpec}, }; use crate::{ gen_multiple, gen_where4string, utils, objects::generic::*, - models::{Pool, CargoDb, SpecDb, CargoUpdateDb, SystemState, NamespaceDb}, + models::{ + Pool, CargoDb, SpecDb, CargoUpdateDb, SystemState, NamespaceDb, ProcessDb, + }, schema::cargoes, }; @@ -205,4 +207,34 @@ impl CargoDb { .collect::>>()?; Ok(()) } + + /// List the cargoes for the given query + pub async fn list( + query: &GenericListNspQuery, + state: &SystemState, + ) -> HttpResult> { + let namespace = utils::key::resolve_nsp(&query.namespace); + let filter = GenericFilter::try_from(query.clone()) + .map_err(HttpError::bad_request)? + .r#where("namespace_name", GenericClause::Eq(namespace.clone())); + NamespaceDb::read_by_pk(&namespace, &state.pool).await?; + let cargoes = CargoDb::transform_read_by(&filter, &state.pool).await?; + let mut cargo_summaries = Vec::new(); + for cargo in cargoes { + let spec = SpecDb::read_by_pk(&cargo.spec.key, &state.pool) + .await? + .try_to_cargo_spec()?; + let processes = + ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; + let (_, _, _, running) = utils::process::count_status(&processes); + cargo_summaries.push(CargoSummary { + created_at: cargo.created_at, + namespace_name: cargo.namespace_name, + instance_total: processes.len(), + instance_running: running, + spec: spec.clone(), + }); + } + Ok(cargo_summaries) + } } diff --git a/bin/nanocld/src/services/cargo.rs b/bin/nanocld/src/services/cargo.rs index 3df7eea24..5e1c3f7ff 100644 --- a/bin/nanocld/src/services/cargo.rs +++ b/bin/nanocld/src/services/cargo.rs @@ -1,5 +1,5 @@ -use bollard_next::container::Stats; use ntex::web; +use bollard_next::container::Stats; use nanocl_error::{http::HttpResult, io::IoResult}; @@ -37,7 +37,7 @@ pub async fn list_cargo( state: web::types::State, qs: web::types::Query, ) -> HttpResult { - let cargoes = utils::cargo::list(&qs, &state).await?; + let cargoes = CargoDb::list(&qs, &state).await?; Ok(web::HttpResponse::Ok().json(&cargoes)) } diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index c377ed1a1..4ccd87c54 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -9,16 +9,12 @@ use bollard_next::{ StartContainerOptions, WaitContainerOptions, RemoveContainerOptions, }, }; -use nanocl_stubs::{ - process::Process, - generic::{GenericListNspQuery, GenericClause, GenericFilter}, - cargo::{Cargo, CargoSummary}, -}; +use nanocl_stubs::{cargo::Cargo, process::Process}; use crate::{ utils, repositories::generic::*, - models::{SystemState, CargoDb, ProcessDb, NamespaceDb, SecretDb, SpecDb}, + models::{SystemState, CargoDb, SecretDb}, objects::generic::ObjProcess, }; @@ -68,12 +64,32 @@ async fn execute_before(cargo: &Cargo, state: &SystemState) -> HttpResult<()> { Some(error) => error.message.unwrap_or("Unknown error".into()), None => "Unknown error".into(), }; + state + .docker_api + .remove_container( + &name, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await?; return Err(HttpError::internal_server_error(format!( "Error while waiting for before container: {error}" ))); } } Err(err) => { + state + .docker_api + .remove_container( + &name, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await?; return Err(HttpError::internal_server_error(format!( "Error while waiting for before container: {err}" ))); @@ -236,45 +252,3 @@ pub async fn delete_instances( .into_iter() .collect::>() } - -/// List the cargoes for the given query -pub async fn list( - query: &GenericListNspQuery, - state: &SystemState, -) -> HttpResult> { - let namespace = utils::key::resolve_nsp(&query.namespace); - let filter = GenericFilter::try_from(query.clone()) - .map_err(|err| { - HttpError::bad_request(format!("Invalid query string: {}", err)) - })? - .r#where("namespace_name", GenericClause::Eq(namespace.clone())); - // ensure namespace exists - NamespaceDb::read_by_pk(&namespace, &state.pool).await?; - let cargoes = CargoDb::transform_read_by(&filter, &state.pool).await?; - let mut cargo_summaries = Vec::new(); - for cargo in cargoes { - let spec = SpecDb::read_by_pk(&cargo.spec.key, &state.pool) - .await? - .try_to_cargo_spec()?; - let instances = - ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; - let mut running_instances = 0; - for instance in &instances { - let state = instance.data.state.clone().unwrap_or_default(); - if state.restarting.unwrap_or_default() { - continue; - } - if state.running.unwrap_or_default() { - running_instances += 1; - } - } - cargo_summaries.push(CargoSummary { - created_at: cargo.created_at, - namespace_name: cargo.namespace_name, - instance_total: instances.len(), - instance_running: running_instances, - spec: spec.clone(), - }); - } - Ok(cargo_summaries) -} diff --git a/bin/nanocld/src/utils/process.rs b/bin/nanocld/src/utils/process.rs index cbfbcf719..7a9f6e99c 100644 --- a/bin/nanocld/src/utils/process.rs +++ b/bin/nanocld/src/utils/process.rs @@ -1,6 +1,7 @@ use nanocl_stubs::process::Process; -/// Count the number of instances running failing or success +/// Count the status for the given instances +/// Return a tuple with the total, failed, success and running instances pub fn count_status(instances: &[Process]) -> (usize, usize, usize, usize) { let mut instance_failed = 0; let mut instance_success = 0; From 989a3fa59edfc9fc8f36ed3f681815d2b638d7da Mon Sep 17 00:00:00 2001 From: leon3s Date: Tue, 9 Jan 2024 13:10:40 +0100 Subject: [PATCH 14/14] remove delete for debug --- bin/nanocld/src/utils/cargo.rs | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index 4ccd87c54..21c3d4734 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -34,6 +34,7 @@ async fn execute_before(cargo: &Cargo, state: &SystemState) -> HttpResult<()> { let mut labels = before.labels.to_owned().unwrap_or_default(); labels.insert("io.nanocl.c".to_owned(), cargo.spec.cargo_key.to_owned()); labels.insert("io.nanocl.n".to_owned(), cargo.namespace_name.to_owned()); + labels.insert("io.nanocl.init-c".to_owned(), "true".to_owned()); labels.insert( "com.docker.compose.project".into(), format!("nanocl_{}", cargo.namespace_name), @@ -64,32 +65,12 @@ async fn execute_before(cargo: &Cargo, state: &SystemState) -> HttpResult<()> { Some(error) => error.message.unwrap_or("Unknown error".into()), None => "Unknown error".into(), }; - state - .docker_api - .remove_container( - &name, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await?; return Err(HttpError::internal_server_error(format!( "Error while waiting for before container: {error}" ))); } } Err(err) => { - state - .docker_api - .remove_container( - &name, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await?; return Err(HttpError::internal_server_error(format!( "Error while waiting for before container: {err}" )));