From 4e8a48e107b26afaf137ab06fd839ad1a5a8bcdc Mon Sep 17 00:00:00 2001 From: leon3s Date: Fri, 5 Jan 2024 15:39:05 +0100 Subject: [PATCH] more mess --- bin/nanocld/specs/swagger.yaml | 10 +-- bin/nanocld/src/models/system.rs | 24 +++++- bin/nanocld/src/models/vm.rs | 7 ++ bin/nanocld/src/objects/cargo.rs | 66 ++++++++++++++-- bin/nanocld/src/objects/generic/create.rs | 8 +- bin/nanocld/src/objects/generic/delete.rs | 8 +- bin/nanocld/src/objects/generic/patch.rs | 8 +- bin/nanocld/src/objects/job.rs | 91 +++++++++++++++++++++++ bin/nanocld/src/objects/mod.rs | 2 + bin/nanocld/src/objects/resource.rs | 5 +- bin/nanocld/src/objects/vm.rs | 73 ++++++++++++++++++ bin/nanocld/src/services/job.rs | 15 ++-- bin/nanocld/src/services/resource.rs | 16 ++-- bin/nanocld/src/services/vm.rs | 14 +++- bin/nanocld/src/subsystem/event.rs | 3 +- bin/nanocld/src/subsystem/init.rs | 6 +- bin/nanocld/src/utils/cargo.rs | 75 +++---------------- bin/nanocld/src/utils/event_emitter.rs | 25 ------- bin/nanocld/src/utils/job.rs | 88 +++------------------- bin/nanocld/src/utils/mod.rs | 1 - bin/nanocld/src/utils/process.rs | 4 +- bin/nanocld/src/utils/resource.rs | 14 ++-- bin/nanocld/src/utils/vm.rs | 73 +----------------- bin/ncproxy/src/subsystem/event.rs | 4 +- bin/ncvpnkit/src/main.rs | 2 +- crates/nanocl_stubs/src/job.rs | 15 ++++ crates/nanocl_stubs/src/system.rs | 6 +- crates/nanocld_client/src/resource.rs | 4 +- 28 files changed, 346 insertions(+), 321 deletions(-) create mode 100644 bin/nanocld/src/objects/job.rs create mode 100644 bin/nanocld/src/objects/vm.rs delete mode 100644 bin/nanocld/src/utils/event_emitter.rs diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index 60b1c29ba..f470c23b8 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -1298,13 +1298,13 @@ paths: put: tags: - Resources - summary: Patch a resource (update its version and/or spec) and create a new history - description: Patch a resource (update its version and/or spec) and create a new history + summary: Create a new resource spec and add history entry + description: Create a new resource spec and add history entry operationId: put_resource parameters: - name: name in: path - description: The resource name to patch + description: Name of the resource required: true schema: type: string @@ -1316,13 +1316,13 @@ paths: required: true responses: '200': - description: The patched resource + description: Resource updated content: application/json: schema: $ref: '#/components/schemas/Resource' '404': - description: Resource is not existing + description: Resource does not exit content: application/json: schema: diff --git a/bin/nanocld/src/models/system.rs b/bin/nanocld/src/models/system.rs index e1e32566c..445812bb0 100644 --- a/bin/nanocld/src/models/system.rs +++ b/bin/nanocld/src/models/system.rs @@ -9,7 +9,7 @@ use futures_util::{SinkExt, StreamExt}; use nanocl_error::io::{IoResult, FromIo, IoError}; use nanocl_stubs::{ config::DaemonConfig, - system::{Event, EventPartial}, + system::{Event, EventPartial, NativeEventAction, EventActor, EventKind}, }; use crate::{version, utils, repositories::generic::*}; @@ -206,4 +206,26 @@ impl SystemState { pub fn subscribe_raw(&self) -> IoResult { self.event_manager.raw.subscribe() } + + pub fn emit_normal_native_action( + &self, + actor: &A, + action: NativeEventAction, + ) where + A: Into + Clone, + { + let actor = actor.clone().into(); + let event = EventPartial { + reporting_controller: "nanocl.io/core".to_owned(), + reporting_node: self.config.hostname.clone(), + kind: EventKind::Normal, + action: action.to_string(), + related: None, + reason: "state_sync".to_owned(), + note: None, + metadata: None, + actor: Some(actor), + }; + self.spawn_emit_event(event); + } } diff --git a/bin/nanocld/src/models/vm.rs b/bin/nanocld/src/models/vm.rs index da098f700..99750ffd1 100644 --- a/bin/nanocld/src/models/vm.rs +++ b/bin/nanocld/src/models/vm.rs @@ -1,4 +1,5 @@ use diesel::prelude::*; +use nanocl_stubs::vm_spec::VmSpecPartial; use crate::schema::vms; @@ -39,3 +40,9 @@ pub struct VmUpdateDb { /// The spec key reference pub spec_key: Option, } + +pub struct VmObjCreateIn { + pub namespace: String, + pub spec: VmSpecPartial, + pub version: String, +} diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index 840026afa..3317ca276 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -1,9 +1,16 @@ -use nanocl_error::http::HttpResult; -use nanocl_stubs::cargo::{Cargo, CargoDeleteQuery}; +use futures_util::{stream::FuturesUnordered, StreamExt}; +use bollard_next::container::RemoveContainerOptions; + +use nanocl_error::http::{HttpResult, HttpError}; +use nanocl_stubs::{ + cargo::{Cargo, CargoDeleteQuery}, + cargo_spec::ReplicationMode, +}; use crate::{ utils, - models::{CargoDb, SystemState, CargoObjCreateIn}, + repositories::generic::*, + models::{CargoDb, SystemState, CargoObjCreateIn, ProcessDb, SpecDb}, }; use super::generic::*; @@ -16,9 +23,32 @@ impl ObjCreate for CargoDb { obj: &Self::ObjCreateIn, state: &SystemState, ) -> HttpResult { - let cargo = - utils::cargo::create(&obj.namespace, &obj.spec, &obj.version, state) - .await?; + let cargo = CargoDb::create_from_spec( + &obj.namespace, + &obj.spec, + &obj.version, + &state.pool, + ) + .await?; + let number = if let Some(mode) = &cargo.spec.replication { + match mode { + ReplicationMode::Static(replication_static) => { + replication_static.number + } + ReplicationMode::Auto => 1, + ReplicationMode::Unique => 1, + ReplicationMode::UniqueByNode => 1, + _ => 1, + } + } else { + 1 + }; + if let Err(err) = + utils::cargo::create_instances(&cargo, number, state).await + { + CargoDb::del_by_pk(&cargo.spec.cargo_key, &state.pool).await?; + return Err(err); + } Ok(cargo) } } @@ -32,7 +62,29 @@ impl ObjDelByPk for CargoDb { opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult { - let cargo = utils::cargo::delete_by_key(key, opts.force, state).await?; + let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; + let processes = + ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; + processes + .into_iter() + .map(|process| async move { + utils::process::remove( + &process.key, + Some(RemoveContainerOptions { + force: opts.force.unwrap_or(false), + ..Default::default() + }), + state, + ) + .await + }) + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + CargoDb::del_by_pk(key, &state.pool).await?; + SpecDb::del_by_kind_key(key, &state.pool).await?; Ok(cargo) } } diff --git a/bin/nanocld/src/objects/generic/create.rs b/bin/nanocld/src/objects/generic/create.rs index 0015177ba..7add99db4 100644 --- a/bin/nanocld/src/objects/generic/create.rs +++ b/bin/nanocld/src/objects/generic/create.rs @@ -1,8 +1,6 @@ use nanocl_error::http::HttpResult; use nanocl_stubs::system::{EventActor, NativeEventAction}; -use crate::utils; - use crate::models::SystemState; /// A Create trait for all objects in Nanocl @@ -28,11 +26,7 @@ pub trait ObjCreate { Self::ObjCreateOut: Into + Clone, { let obj = Self::fn_create_obj(obj, state).await?; - utils::event_emitter::emit_normal_native_action( - &obj, - NativeEventAction::Create, - state, - ); + state.emit_normal_native_action(&obj, NativeEventAction::Create); Ok(obj) } } diff --git a/bin/nanocld/src/objects/generic/delete.rs b/bin/nanocld/src/objects/generic/delete.rs index bc3abe246..5c4ad9672 100644 --- a/bin/nanocld/src/objects/generic/delete.rs +++ b/bin/nanocld/src/objects/generic/delete.rs @@ -1,8 +1,6 @@ use nanocl_error::http::HttpResult; use nanocl_stubs::system::{EventActor, NativeEventAction}; -use crate::utils; - use crate::models::SystemState; pub trait ObjDelByPk { @@ -24,11 +22,7 @@ pub trait ObjDelByPk { Self::ObjDelOut: Into + Clone, { let obj = Self::fn_del_obj_by_pk(key, opts, state).await?; - utils::event_emitter::emit_normal_native_action( - &obj, - NativeEventAction::Delete, - state, - ); + state.emit_normal_native_action(&obj, NativeEventAction::Delete); Ok(obj) } } diff --git a/bin/nanocld/src/objects/generic/patch.rs b/bin/nanocld/src/objects/generic/patch.rs index b34da38a7..223f44806 100644 --- a/bin/nanocld/src/objects/generic/patch.rs +++ b/bin/nanocld/src/objects/generic/patch.rs @@ -1,8 +1,6 @@ use nanocl_error::http::HttpResult; use nanocl_stubs::system::{EventActor, NativeEventAction}; -use crate::utils; - use crate::models::SystemState; pub trait ObjPatchByPk { @@ -24,11 +22,7 @@ pub trait ObjPatchByPk { Self::ObjPatchOut: Into + Clone, { let obj = Self::fn_patch_obj_by_pk(pk, obj, state).await?; - utils::event_emitter::emit_normal_native_action( - &obj, - NativeEventAction::Patch, - state, - ); + state.emit_normal_native_action(&obj, NativeEventAction::Update); Ok(obj) } } diff --git a/bin/nanocld/src/objects/job.rs b/bin/nanocld/src/objects/job.rs new file mode 100644 index 000000000..22766d73a --- /dev/null +++ b/bin/nanocld/src/objects/job.rs @@ -0,0 +1,91 @@ +use bollard_next::container::RemoveContainerOptions; +use futures_util::{StreamExt, stream::FuturesUnordered}; + +use nanocl_error::http::{HttpResult, HttpError}; +use nanocl_stubs::job::{Job, JobPartial}; + +use crate::{ + utils, + repositories::generic::*, + models::{JobDb, ProcessDb}, +}; + +use super::generic::*; + +impl ObjCreate for JobDb { + type ObjCreateIn = JobPartial; + type ObjCreateOut = Job; + + async fn fn_create_obj( + obj: &Self::ObjCreateIn, + state: &crate::models::SystemState, + ) -> HttpResult { + let db_model = JobDb::try_from_partial(obj)?; + let job = JobDb::create_from(db_model, &state.pool) + .await? + .to_spec(obj); + job + .containers + .iter() + .map(|container| { + let job_name = job.name.clone(); + async move { + let mut container = container.clone(); + let mut labels = container.labels.clone().unwrap_or_default(); + labels.insert("io.nanocl.j".to_owned(), job_name.clone()); + container.labels = Some(labels); + let short_id = utils::key::generate_short_id(6); + let name = format!("{job_name}-{short_id}.j"); + utils::process::create(&name, "job", &job_name, container, state) + .await?; + Ok::<_, HttpError>(()) + } + }) + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + if let Some(schedule) = &job.schedule { + utils::job::add_cron_rule(&job, schedule, state).await?; + } + Ok(job) + } +} + +impl ObjDelByPk for JobDb { + type ObjDelOpts = (); + type ObjDelOut = Job; + + async fn fn_del_obj_by_pk( + pk: &str, + _opts: &Self::ObjDelOpts, + state: &crate::models::SystemState, + ) -> HttpResult { + let job = JobDb::read_by_pk(pk, &state.pool).await?.try_to_spec()?; + let processes = ProcessDb::read_by_kind_key(pk, &state.pool).await?; + processes + .into_iter() + .map(|process| async move { + utils::process::remove( + &process.key, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + state, + ) + .await + }) + .collect::>() + .collect::>>() + .await + .into_iter() + .collect::, _>>()?; + JobDb::del_by_pk(&job.name, &state.pool).await?; + if job.schedule.is_some() { + utils::job::remove_cron_rule(&job, state).await?; + } + Ok(job) + } +} diff --git a/bin/nanocld/src/objects/mod.rs b/bin/nanocld/src/objects/mod.rs index c1ca66d12..f707ff247 100644 --- a/bin/nanocld/src/objects/mod.rs +++ b/bin/nanocld/src/objects/mod.rs @@ -1,4 +1,6 @@ +mod job; mod cargo; +mod vm; mod secret; mod resource; diff --git a/bin/nanocld/src/objects/resource.rs b/bin/nanocld/src/objects/resource.rs index b05e5b7ad..e5531afb5 100644 --- a/bin/nanocld/src/objects/resource.rs +++ b/bin/nanocld/src/objects/resource.rs @@ -26,7 +26,7 @@ impl ObjCreate for ResourceDb { &obj.name ))); } - let obj = utils::resource::hook_create_resource(obj, &state.pool).await?; + let obj = utils::resource::hook_create(obj, &state.pool).await?; let resource = ResourceDb::create_from_spec(&obj, &state.pool).await?; Ok(resource) } @@ -42,8 +42,7 @@ impl ObjDelByPk for ResourceDb { state: &SystemState, ) -> HttpResult { let resource = ResourceDb::transform_read_by_pk(key, &state.pool).await?; - if let Err(err) = - utils::resource::hook_delete_resource(&resource, &state.pool).await + if let Err(err) = utils::resource::hook_delete(&resource, &state.pool).await { log::warn!("{err}"); } diff --git a/bin/nanocld/src/objects/vm.rs b/bin/nanocld/src/objects/vm.rs new file mode 100644 index 000000000..ced670658 --- /dev/null +++ b/bin/nanocld/src/objects/vm.rs @@ -0,0 +1,73 @@ +use nanocl_error::http::{HttpResult, HttpError}; +use nanocl_stubs::vm::Vm; + +use crate::{ + utils, + repositories::generic::*, + models::{VmDb, SystemState, VmObjCreateIn, VmImageDb, SpecDb}, +}; +use super::generic::*; + +impl ObjCreate for VmDb { + type ObjCreateIn = VmObjCreateIn; + type ObjCreateOut = Vm; + + async fn fn_create_obj( + obj: &Self::ObjCreateIn, + state: &SystemState, + ) -> HttpResult { + let name = &obj.spec.name; + let namespace = &obj.namespace; + let version = &obj.version; + log::debug!( + "Creating VM {name} in namespace {namespace} with version: {version}", + ); + let vm_key = utils::key::gen_key(namespace, name); + let mut vm = obj.spec.clone(); + if VmDb::read_by_pk(&vm_key, &state.pool).await.is_ok() { + return Err(HttpError::conflict(format!( + "VM with name {name} already exists in namespace {namespace}", + ))); + } + let image = VmImageDb::read_by_pk(&vm.disk.image, &state.pool).await?; + if image.kind.as_str() != "Base" { + return Err(HttpError::bad_request(format!("Image {} is not a base image please convert the snapshot into a base image first", &vm.disk.image))); + } + let snapname = format!("{}.{vm_key}", &image.name); + let size = vm.disk.size.unwrap_or(20); + log::debug!("Creating snapshot {snapname} with size {size}"); + let image = + utils::vm_image::create_snap(&snapname, size, &image, state).await?; + log::debug!("Snapshot {snapname} created"); + // Use the snapshot image + vm.disk.image = image.name.clone(); + vm.disk.size = Some(size); + let vm = + VmDb::create_from_spec(namespace, &vm, version, &state.pool).await?; + utils::vm::create_instance(&vm, &image, true, state).await?; + Ok(vm) + } +} + +impl ObjDelByPk for VmDb { + type ObjDelOpts = (); + type ObjDelOut = Vm; + + async fn fn_del_obj_by_pk( + key: &str, + _opts: &Self::ObjDelOpts, + state: &SystemState, + ) -> HttpResult { + let vm = VmDb::transform_read_by_pk(key, &state.pool).await?; + let options = bollard_next::container::RemoveContainerOptions { + force: true, + ..Default::default() + }; + let container_name = format!("{}.v", key); + utils::process::remove(&container_name, Some(options), state).await?; + VmDb::del_by_pk(key, &state.pool).await?; + SpecDb::del_by_kind_key(key, &state.pool).await?; + utils::vm_image::delete_by_name(&vm.spec.disk.image, &state.pool).await?; + Ok(vm) + } +} diff --git a/bin/nanocld/src/services/job.rs b/bin/nanocld/src/services/job.rs index 3b7ac0b97..3f0eb6f54 100644 --- a/bin/nanocld/src/services/job.rs +++ b/bin/nanocld/src/services/job.rs @@ -1,12 +1,15 @@ use ntex::web; -use nanocl_error::http::HttpResult; - use bollard_next::container::WaitContainerOptions; + +use nanocl_error::http::HttpResult; use nanocl_stubs::job::{JobPartial, JobWaitQuery}; -use crate::utils; -use crate::models::SystemState; +use crate::{ + utils, + objects::generic::*, + models::{SystemState, JobDb}, +}; /// List jobs #[cfg_attr(feature = "dev", utoipa::path( @@ -42,7 +45,7 @@ pub(crate) async fn create_job( _version: web::types::Path, payload: web::types::Json, ) -> HttpResult { - let job = utils::job::create(&payload, &state).await?; + let job = JobDb::create_obj(&payload, &state).await?; Ok(web::HttpResponse::Created().json(&job)) } @@ -64,7 +67,7 @@ pub(crate) async fn delete_job( state: web::types::State, path: web::types::Path<(String, String)>, ) -> HttpResult { - utils::job::delete_by_name(&path.1, &state).await?; + JobDb::del_obj_by_pk(&path.1, &(), &state).await?; Ok(web::HttpResponse::Accepted().finish()) } diff --git a/bin/nanocld/src/services/resource.rs b/bin/nanocld/src/services/resource.rs index a5e60ed82..cc03dc84e 100644 --- a/bin/nanocld/src/services/resource.rs +++ b/bin/nanocld/src/services/resource.rs @@ -103,21 +103,21 @@ pub(crate) async fn delete_resource( Ok(web::HttpResponse::Accepted().finish()) } -/// Patch a resource (update its version and/or spec) and create a new history +/// Create a new resource spec and add history entry #[cfg_attr(feature = "dev", utoipa::path( put, request_body = ResourceUpdate, tag = "Resources", path = "/resources/{name}", params( - ("name" = String, Path, description = "The resource name to patch") + ("name" = String, Path, description = "Name of the resource") ), responses( - (status = 200, description = "The patched resource", body = Resource), - (status = 404, description = "Resource is not existing", body = ApiError), + (status = 200, description = "Resource updated", body = Resource), + (status = 404, description = "Resource does not exit", body = ApiError), ), ))] -#[web::patch("/resources/{name}")] +#[web::put("/resources/{name}")] pub(crate) async fn put_resource( state: web::types::State, path: web::types::Path<(String, String)>, @@ -130,7 +130,7 @@ pub(crate) async fn put_resource( data: payload.data.clone(), metadata: payload.metadata.clone(), }; - let resource = utils::resource::patch(&new_resource, &state).await?; + let resource = utils::resource::put(&new_resource, &state).await?; Ok(web::HttpResponse::Ok().json(&resource)) } @@ -189,7 +189,7 @@ pub(crate) async fn revert_resource( data: history.data, metadata: history.metadata, }; - let resource = utils::resource::patch(&new_resource, &state).await?; + let resource = utils::resource::put(&new_resource, &state).await?; Ok(web::HttpResponse::Ok().json(&resource)) } @@ -379,7 +379,7 @@ mod tests { metadata: None, }; let mut res = client - .send_patch( + .send_put( &format!("{ENDPOINT}/{TEST_RESOURCE}"), Some(&new_resource), None::, diff --git a/bin/nanocld/src/services/vm.rs b/bin/nanocld/src/services/vm.rs index dd3c61809..9d6199d97 100644 --- a/bin/nanocld/src/services/vm.rs +++ b/bin/nanocld/src/services/vm.rs @@ -23,7 +23,8 @@ use nanocl_stubs::{ use crate::{ utils, - models::{SystemState, WsConState, SpecDb}, + objects::generic::*, + models::{SystemState, WsConState, SpecDb, VmDb, VmObjCreateIn}, }; /// List virtual machines @@ -96,7 +97,7 @@ pub(crate) async fn delete_vm( let name = path.1.to_owned(); let namespace = utils::key::resolve_nsp(&qs.namespace); let key = utils::key::gen_key(&namespace, &name); - utils::vm::delete_by_key(&key, true, &state).await?; + VmDb::del_obj_by_pk(&key, &(), &state).await?; Ok(web::HttpResponse::Ok().finish()) } @@ -121,8 +122,13 @@ pub(crate) async fn create_vm( qs: web::types::Query, ) -> HttpResult { let namespace = utils::key::resolve_nsp(&qs.namespace); - let item = utils::vm::create(&payload, &namespace, &path, &state).await?; - Ok(web::HttpResponse::Ok().json(&item)) + let obj = VmObjCreateIn { + namespace, + spec: payload.into_inner(), + version: path.into_inner(), + }; + let vm = VmDb::create_obj(&obj, &state).await?; + Ok(web::HttpResponse::Ok().json(&vm)) } /// List virtual machine histories diff --git a/bin/nanocld/src/subsystem/event.rs b/bin/nanocld/src/subsystem/event.rs index 1700c3b3c..0ef6b8d80 100644 --- a/bin/nanocld/src/subsystem/event.rs +++ b/bin/nanocld/src/subsystem/event.rs @@ -9,6 +9,7 @@ use nanocl_stubs::system::{Event, EventActorKind, NativeEventAction}; use crate::{ utils, + objects::generic::*, repositories::generic::*, models::{ SystemState, JobDb, ProcessDb, SystemEventReceiver, SystemEventKind, @@ -54,7 +55,7 @@ async fn job_ttl(e: Event, state: &SystemState) -> IoResult<()> { rt::spawn(async move { log::debug!("event::job_ttl: {} will be deleted in {ttl}s", job.name); ntex::time::sleep(std::time::Duration::from_secs(ttl as u64)).await; - let _ = utils::job::delete_by_name(&job.name, &state).await; + let _ = JobDb::del_obj_by_pk(&job.name, &(), &state).await; }); } Ok(()) diff --git a/bin/nanocld/src/subsystem/init.rs b/bin/nanocld/src/subsystem/init.rs index ddeaa5682..2d9e4bf51 100644 --- a/bin/nanocld/src/subsystem/init.rs +++ b/bin/nanocld/src/subsystem/init.rs @@ -178,10 +178,9 @@ mod tests { rt::spawn(async move { ntex::time::sleep(std::time::Duration::from_secs(1)).await; let actor = Resource::default(); - utils::event_emitter::emit_normal_native_action( + state_ptr.emit_normal_native_action( &actor, nanocl_stubs::system::NativeEventAction::Create, - &state_ptr, ); }); raw_sub.next().await; @@ -189,10 +188,9 @@ mod tests { rt::spawn(async move { ntex::time::sleep(std::time::Duration::from_secs(1)).await; let actor = Resource::default(); - utils::event_emitter::emit_normal_native_action( + state_ptr.emit_normal_native_action( &actor, nanocl_stubs::system::NativeEventAction::Create, - &state_ptr, ); }); let mut sub = state.subscribe().await.unwrap(); diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index a8f518bea..4bde66a3d 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -16,7 +16,7 @@ use nanocl_stubs::{ process::{Process, ProcessKind}, cargo::{ Cargo, CargoSummary, CargoInspect, CargoKillOptions, CargoScale, - CargoStats, CargoStatsQuery, + CargoStats, CargoStatsQuery, CargoDeleteQuery, }, cargo_spec::{CargoSpecPartial, CargoSpecUpdate, ReplicationMode, Config}, }; @@ -25,6 +25,7 @@ use crate::{ utils, repositories::generic::*, models::{SystemState, CargoDb, ProcessDb, NamespaceDb, SecretDb, SpecDb}, + objects::generic::ObjDelByPk, }; use super::stream::transform_stream; @@ -91,7 +92,7 @@ async fn execute_before( /// Example: cargo-key-1, cargo-key-2, cargo-key-3 /// If the number of instances is equal to 1, the container will be named with /// the cargo key. -async fn create_instances( +pub(crate) async fn create_instances( cargo: &Cargo, number: usize, state: &SystemState, @@ -296,34 +297,6 @@ async fn delete_instances( .collect::>() } -/// Create a cargo based on the given partial spec -/// And create his instances (containers). -pub(crate) async fn create( - namespace: &str, - spec: &CargoSpecPartial, - version: &str, - state: &SystemState, -) -> HttpResult { - let cargo = - CargoDb::create_from_spec(namespace, spec, version, &state.pool).await?; - let number = if let Some(mode) = &cargo.spec.replication { - match mode { - ReplicationMode::Static(replication_static) => replication_static.number, - ReplicationMode::Auto => 1, - ReplicationMode::Unique => 1, - ReplicationMode::UniqueByNode => 1, - _ => 1, - } - } else { - 1 - }; - if let Err(err) = create_instances(&cargo, number, state).await { - CargoDb::del_by_pk(&cargo.spec.cargo_key, &state.pool).await?; - return Err(err); - } - Ok(cargo) -} - /// Restart cargo instances (containers) by key pub(crate) async fn restart(key: &str, state: &SystemState) -> HttpResult<()> { let cargo = utils::cargo::inspect_by_key(key, state).await?; @@ -346,38 +319,6 @@ pub(crate) async fn restart(key: &str, state: &SystemState) -> HttpResult<()> { Ok(()) } -/// Delete a cargo by key with his given instances (containers). -pub(crate) async fn delete_by_key( - key: &str, - force: Option, - state: &SystemState, -) -> HttpResult { - let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; - let processes = - ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; - processes - .into_iter() - .map(|process| async move { - utils::process::remove( - &process.key, - Some(RemoveContainerOptions { - force: force.unwrap_or(false), - ..Default::default() - }), - state, - ) - .await - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - CargoDb::del_by_pk(key, &state.pool).await?; - SpecDb::del_by_kind_key(key, &state.pool).await?; - Ok(cargo) -} - /// A new history entry is added and the containers are updated /// with the new cargo specification pub(crate) async fn put( @@ -442,9 +383,6 @@ pub(crate) async fn put( .await?; } } - // state - // .event_emitter - // .spawn_emit_to_event(&cargo, NativeEventAction::Patched); Ok(cargo) } @@ -520,7 +458,12 @@ pub(crate) async fn delete_by_namespace( cargoes .into_iter() .map(|cargo| async move { - delete_by_key(&cargo.spec.cargo_key, None, state).await + CargoDb::del_obj_by_pk( + &cargo.spec.cargo_key, + &CargoDeleteQuery::default(), + state, + ) + .await }) .collect::>() .collect::>>() diff --git a/bin/nanocld/src/utils/event_emitter.rs b/bin/nanocld/src/utils/event_emitter.rs deleted file mode 100644 index 5c336c638..000000000 --- a/bin/nanocld/src/utils/event_emitter.rs +++ /dev/null @@ -1,25 +0,0 @@ -use nanocl_stubs::system::{NativeEventAction, EventActor, EventPartial, EventKind}; - -use crate::models::SystemState; - -pub fn emit_normal_native_action( - actor: &A, - action: NativeEventAction, - state: &SystemState, -) where - A: Into + Clone, -{ - let actor = actor.clone().into(); - let event = EventPartial { - reporting_controller: "nanocl.io/core".to_owned(), - reporting_node: state.config.hostname.clone(), - kind: EventKind::Normal, - action: action.to_string(), - related: None, - reason: "state_sync".to_owned(), - note: None, - metadata: None, - actor: Some(actor), - }; - state.spawn_emit_event(event); -} diff --git a/bin/nanocld/src/utils/job.rs b/bin/nanocld/src/utils/job.rs index 9328d0e96..f3fa18716 100644 --- a/bin/nanocld/src/utils/job.rs +++ b/bin/nanocld/src/utils/job.rs @@ -4,21 +4,17 @@ use futures_util::{ StreamExt, TryStreamExt, stream::{FuturesUnordered, select_all}, }; +use bollard_next::{ + service::ContainerWaitExitError, container::WaitContainerOptions, +}; use nanocl_error::{ io::{FromIo, IoError, IoResult}, http::{HttpError, HttpResult}, }; - -use bollard_next::{ - service::ContainerWaitExitError, - container::{RemoveContainerOptions, WaitContainerOptions}, -}; use nanocl_stubs::{ generic::GenericFilter, - job::{ - Job, JobPartial, JobInspect, JobWaitResponse, WaitCondition, JobSummary, - }, + job::{Job, JobInspect, JobWaitResponse, WaitCondition, JobSummary}, }; use crate::{ @@ -59,7 +55,7 @@ async fn exec_crontab() -> IoResult<()> { } /// Add a cron rule to the crontab to start a job at a given time -async fn add_cron_rule( +pub(crate) async fn add_cron_rule( item: &Job, schedule: &str, state: &SystemState, @@ -85,7 +81,10 @@ async fn add_cron_rule( } /// Remove a cron rule from the crontab for the given job -async fn remove_cron_rule(item: &Job, state: &SystemState) -> IoResult<()> { +pub(crate) async fn remove_cron_rule( + item: &Job, + state: &SystemState, +) -> IoResult<()> { let mut content = fs::read_to_string("/var/spool/cron/crontabs/root") .await .map_err(|err| err.map_err_context(|| "Cron job"))?; @@ -103,43 +102,6 @@ async fn remove_cron_rule(item: &Job, state: &SystemState) -> IoResult<()> { Ok(()) } -/// Create a job and with it's containers -pub(crate) async fn create( - item: &JobPartial, - state: &SystemState, -) -> HttpResult { - let db_model = JobDb::try_from_partial(item)?; - let job = JobDb::create_from(db_model, &state.pool) - .await? - .to_spec(item); - job - .containers - .iter() - .map(|container| { - let job_name = job.name.clone(); - async move { - let mut container = container.clone(); - let mut labels = container.labels.clone().unwrap_or_default(); - labels.insert("io.nanocl.j".to_owned(), job_name.clone()); - container.labels = Some(labels); - let short_id = utils::key::generate_short_id(6); - let name = format!("{job_name}-{short_id}.j"); - utils::process::create(&name, "job", &job_name, container, state) - .await?; - Ok::<_, HttpError>(()) - } - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - if let Some(schedule) = &job.schedule { - add_cron_rule(&job, schedule, state).await?; - } - Ok(job) -} - /// List all jobs pub(crate) async fn list(state: &SystemState) -> HttpResult> { let jobs = JobDb::read_by(&GenericFilter::default(), &state.pool).await?; @@ -172,38 +134,6 @@ pub(crate) async fn list(state: &SystemState) -> HttpResult> { Ok(job_summaries) } -/// Delete a job by name with his given instances (containers). -pub(crate) async fn delete_by_name( - name: &str, - state: &SystemState, -) -> HttpResult<()> { - let job = JobDb::read_by_pk(name, &state.pool).await?.try_to_spec()?; - let processes = ProcessDb::read_by_kind_key(name, &state.pool).await?; - processes - .into_iter() - .map(|process| async move { - utils::process::remove( - &process.key, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - state, - ) - .await - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::, _>>()?; - JobDb::del_by_pk(&job.name, &state.pool).await?; - if job.schedule.is_some() { - remove_cron_rule(&job, state).await?; - } - Ok(()) -} - /// Inspect a job by name and return a detailed view of the job pub(crate) async fn inspect_by_name( name: &str, diff --git a/bin/nanocld/src/utils/mod.rs b/bin/nanocld/src/utils/mod.rs index a8e9853fa..9e7428a17 100644 --- a/bin/nanocld/src/utils/mod.rs +++ b/bin/nanocld/src/utils/mod.rs @@ -17,7 +17,6 @@ pub(crate) mod metric; pub(crate) mod ctrl_client; pub(crate) mod process; pub(crate) mod server; -pub(crate) mod event_emitter; #[cfg(test)] pub mod tests { diff --git a/bin/nanocld/src/utils/process.rs b/bin/nanocld/src/utils/process.rs index ae830dcbf..ebc2b8987 100644 --- a/bin/nanocld/src/utils/process.rs +++ b/bin/nanocld/src/utils/process.rs @@ -27,11 +27,11 @@ async fn after( match kind { ProcessKind::Vm => { let vm = VmDb::transform_read_by_pk(kind_key, &state.pool).await?; - super::event_emitter::emit_normal_native_action(&vm, action, state); + state.emit_normal_native_action(&vm, action); } ProcessKind::Cargo => { let cargo = CargoDb::transform_read_by_pk(kind_key, &state.pool).await?; - super::event_emitter::emit_normal_native_action(&cargo, action, state); + state.emit_normal_native_action(&cargo, action); } ProcessKind::Job => { JobDb::update_pk( diff --git a/bin/nanocld/src/utils/resource.rs b/bin/nanocld/src/utils/resource.rs index a4abf3719..ddb687463 100644 --- a/bin/nanocld/src/utils/resource.rs +++ b/bin/nanocld/src/utils/resource.rs @@ -17,7 +17,7 @@ use super::ctrl_client::CtrlClient; /// If the resource is a Kind Kind, it will create a resource Kind with an associated version. /// To call a custom controller, the resource Kind must have a Url field in his config. /// Unless it must have a Schema field in his config that is a JSONSchema to validate the resource. -pub(crate) async fn hook_create_resource( +pub(crate) async fn hook_create( resource: &ResourcePartial, pool: &Pool, ) -> HttpResult { @@ -55,7 +55,7 @@ pub(crate) async fn hook_create_resource( /// This hook is called when a resource is deleted. /// It call a custom controller at a specific url. /// If the resource is a Kind Kind, it will delete the resource Kind with an associated version. -pub(crate) async fn hook_delete_resource( +pub(crate) async fn hook_delete( resource: &Resource, pool: &Pool, ) -> HttpResult<()> { @@ -75,16 +75,12 @@ pub(crate) async fn hook_delete_resource( /// This function patch a resource. /// It will call the hook_create_resource function to hook the resource. -pub(crate) async fn patch( +pub(crate) async fn put( resource: &ResourcePartial, state: &SystemState, ) -> HttpResult { - let resource = hook_create_resource(resource, &state.pool).await?; + let resource = hook_create(resource, &state.pool).await?; let res = ResourceDb::update_from_spec(&resource, &state.pool).await?; - super::event_emitter::emit_normal_native_action( - &res, - NativeEventAction::Patch, - state, - ); + state.emit_normal_native_action(&res, NativeEventAction::Update); Ok(res) } diff --git a/bin/nanocld/src/utils/vm.rs b/bin/nanocld/src/utils/vm.rs index ff8ef7f71..7df9a3153 100644 --- a/bin/nanocld/src/utils/vm.rs +++ b/bin/nanocld/src/utils/vm.rs @@ -5,7 +5,7 @@ use bollard_next::{ container::RemoveContainerOptions, }; -use nanocl_error::http::{HttpError, HttpResult}; +use nanocl_error::http::HttpResult; use nanocl_stubs::{ system::NativeEventAction, @@ -41,30 +41,6 @@ pub(crate) async fn inspect_by_key( }) } -/// Delete a VM by his key -pub(crate) async fn delete_by_key( - vm_key: &str, - force: bool, - state: &SystemState, -) -> HttpResult<()> { - let vm = VmDb::transform_read_by_pk(vm_key, &state.pool).await?; - let options = bollard_next::container::RemoveContainerOptions { - force, - ..Default::default() - }; - let container_name = format!("{}.v", vm_key); - utils::process::remove(&container_name, Some(options), state).await?; - VmDb::del_by_pk(vm_key, &state.pool).await?; - SpecDb::del_by_kind_key(vm_key, &state.pool).await?; - utils::vm_image::delete_by_name(&vm.spec.disk.image, &state.pool).await?; - super::event_emitter::emit_normal_native_action( - &vm, - NativeEventAction::Delete, - state, - ); - Ok(()) -} - /// List VMs by namespace pub(crate) async fn list_by_namespace( nsp: &str, @@ -193,47 +169,6 @@ pub(crate) async fn create_instance( Ok(()) } -/// Create a VM from a `VmSpecPartial` in the given namespace -pub(crate) async fn create( - vm: &VmSpecPartial, - namespace: &str, - version: &str, - state: &SystemState, -) -> HttpResult { - let name = &vm.name; - log::debug!( - "Creating VM {name} in namespace {namespace} with version: {version}", - ); - let vm_key = utils::key::gen_key(namespace, name); - let mut vm = vm.clone(); - if VmDb::read_by_pk(&vm_key, &state.pool).await.is_ok() { - return Err(HttpError::conflict(format!( - "VM with name {name} already exists in namespace {namespace}", - ))); - } - let image = VmImageDb::read_by_pk(&vm.disk.image, &state.pool).await?; - if image.kind.as_str() != "Base" { - return Err(HttpError::bad_request(format!("Image {} is not a base image please convert the snapshot into a base image first", &vm.disk.image))); - } - let snapname = format!("{}.{vm_key}", &image.name); - let size = vm.disk.size.unwrap_or(20); - log::debug!("Creating snapshot {snapname} with size {size}"); - let image = - utils::vm_image::create_snap(&snapname, size, &image, state).await?; - log::debug!("Snapshot {snapname} created"); - // Use the snapshot image - vm.disk.image = image.name.clone(); - vm.disk.size = Some(size); - let vm = VmDb::create_from_spec(namespace, &vm, version, &state.pool).await?; - create_instance(&vm, &image, true, state).await?; - super::event_emitter::emit_normal_native_action( - &vm, - NativeEventAction::Create, - state, - ); - Ok(vm) -} - /// Patch a VM specification from a `VmSpecUpdate` in the given namespace. /// This will merge the new specification with the old one. pub(crate) async fn patch( @@ -311,10 +246,6 @@ pub(crate) async fn put( create_instance(&vm, &image, false, state).await?; utils::process::start_by_kind(&ProcessKind::Vm, &vm.spec.vm_key, state) .await?; - super::event_emitter::emit_normal_native_action( - &vm, - NativeEventAction::Patch, - state, - ); + state.emit_normal_native_action(&vm, NativeEventAction::Update); Ok(vm) } diff --git a/bin/ncproxy/src/subsystem/event.rs b/bin/ncproxy/src/subsystem/event.rs index 8b3be20e8..45060ab44 100644 --- a/bin/ncproxy/src/subsystem/event.rs +++ b/bin/ncproxy/src/subsystem/event.rs @@ -94,7 +94,7 @@ async fn on_event(event: &Event, state: &SystemStateRef) -> IoResult<()> { log::trace!("event::on_event: {actor_kind} {action}"); match (actor_kind, action) { (EventActorKind::Cargo, NativeEventAction::Start) - | (EventActorKind::Cargo, NativeEventAction::Patch) => { + | (EventActorKind::Cargo, NativeEventAction::Update) => { let (name, namespace) = get_cargo_attributes(&actor.attributes)?; update_cargo_rule(&name, &namespace, state).await?; let _ = state.event_emitter.emit_reload().await; @@ -108,7 +108,7 @@ async fn on_event(event: &Event, state: &SystemStateRef) -> IoResult<()> { Ok(()) } (EventActorKind::Secret, NativeEventAction::Create) - | (EventActorKind::Secret, NativeEventAction::Patch) => { + | (EventActorKind::Secret, NativeEventAction::Update) => { let resources = utils::resource::list_by_secret( &actor.key.unwrap_or_default(), &state.client, diff --git a/bin/ncvpnkit/src/main.rs b/bin/ncvpnkit/src/main.rs index 64f2e5dc9..f16e696a4 100644 --- a/bin/ncvpnkit/src/main.rs +++ b/bin/ncvpnkit/src/main.rs @@ -114,7 +114,7 @@ async fn on_event( return Ok(()); } match action { - NativeEventAction::Create | NativeEventAction::Patch => { + NativeEventAction::Create | NativeEventAction::Update => { let key = actor.key.unwrap_or_default(); let resource = nanocl_client.inspect_resource(&key).await?; let r_proxy_rule = resource_to_proxy_rule(&resource)?; diff --git a/crates/nanocl_stubs/src/job.rs b/crates/nanocl_stubs/src/job.rs index a273b2bbe..d77fb4c75 100644 --- a/crates/nanocl_stubs/src/job.rs +++ b/crates/nanocl_stubs/src/job.rs @@ -5,6 +5,7 @@ use bollard_next::container::Config; use bollard_next::service::{ContainerWaitExitError, ContainerWaitResponse}; use crate::process::Process; +use crate::system::{EventActorKind, EventActor}; /// Job partial is used to create a new job #[derive(Debug, Default, Clone, PartialEq)] @@ -103,6 +104,20 @@ pub struct Job { pub containers: Vec, } +/// Convert a Job into an EventActor +impl From for EventActor { + fn from(job: Job) -> Self { + Self { + key: Some(job.name.clone()), + kind: EventActorKind::Job, + attributes: Some(serde_json::json!({ + "Name": job.name, + "Metadata": job.metadata, + })), + } + } +} + /// Summary of a job (used in list) #[derive(Debug)] #[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] diff --git a/crates/nanocl_stubs/src/system.rs b/crates/nanocl_stubs/src/system.rs index d636da2b7..aba7fd6c4 100644 --- a/crates/nanocl_stubs/src/system.rs +++ b/crates/nanocl_stubs/src/system.rs @@ -75,7 +75,7 @@ impl std::fmt::Display for EventActorKind { #[cfg_attr(feature = "serde", serde(rename_all = "lowercase"))] pub enum NativeEventAction { Create, - Patch, + Update, Start, Stop, Delete, @@ -89,7 +89,7 @@ impl FromStr for NativeEventAction { fn from_str(s: &str) -> Result { match s { "create" => Ok(NativeEventAction::Create), - "patch" => Ok(NativeEventAction::Patch), + "patch" => Ok(NativeEventAction::Update), "start" => Ok(NativeEventAction::Start), "stop" => Ok(NativeEventAction::Stop), "delete" => Ok(NativeEventAction::Delete), @@ -103,7 +103,7 @@ impl std::fmt::Display for NativeEventAction { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { NativeEventAction::Create => write!(f, "create"), - NativeEventAction::Patch => write!(f, "patch"), + NativeEventAction::Update => write!(f, "patch"), NativeEventAction::Start => write!(f, "start"), NativeEventAction::Stop => write!(f, "stop"), NativeEventAction::Delete => write!(f, "delete"), diff --git a/crates/nanocld_client/src/resource.rs b/crates/nanocld_client/src/resource.rs index a367e3fdb..88ad30182 100644 --- a/crates/nanocld_client/src/resource.rs +++ b/crates/nanocld_client/src/resource.rs @@ -84,7 +84,7 @@ impl NanocldClient { Self::res_json(res).await } - /// Patch an existing resource + /// Update the new resource spec and add an history entry /// /// ## Example /// @@ -100,7 +100,7 @@ impl NanocldClient { config: &ResourceUpdate, ) -> HttpClientResult { let res = self - .send_patch( + .send_put( &format!("{}/{key}", Self::RESOURCE_PATH), Some(config), None::,