From 4d29fd53b0329a8252891d267df9834452120c3a Mon Sep 17 00:00:00 2001 From: leon3s Date: Fri, 5 Jan 2024 16:32:50 +0100 Subject: [PATCH] continue mess --- bin/nanocld/specs/swagger.yaml | 49 ----- bin/nanocld/src/models/cargo.rs | 12 +- bin/nanocld/src/models/vm.rs | 12 +- bin/nanocld/src/objects/cargo.rs | 207 +++++++++++++++++- bin/nanocld/src/objects/generic/delete.rs | 6 +- bin/nanocld/src/objects/generic/mod.rs | 2 + bin/nanocld/src/objects/generic/put.rs | 28 +++ bin/nanocld/src/objects/resource.rs | 16 ++ bin/nanocld/src/objects/secret.rs | 10 +- bin/nanocld/src/objects/vm.rs | 113 +++++++++- bin/nanocld/src/services/cargo.rs | 142 ++----------- bin/nanocld/src/services/openapi.rs | 4 +- bin/nanocld/src/services/resource.rs | 7 +- bin/nanocld/src/services/vm.rs | 10 +- bin/nanocld/src/utils/cargo.rs | 247 +--------------------- bin/nanocld/src/utils/resource.rs | 15 +- bin/nanocld/src/utils/vm.rs | 93 +------- crates/nanocl_stubs/src/cargo.rs | 14 -- 18 files changed, 426 insertions(+), 561 deletions(-) create mode 100644 bin/nanocld/src/objects/generic/put.rs diff --git a/bin/nanocld/specs/swagger.yaml b/bin/nanocld/specs/swagger.yaml index f470c23b8..de8ab6861 100644 --- a/bin/nanocld/specs/swagger.yaml +++ b/bin/nanocld/specs/swagger.yaml @@ -499,46 +499,6 @@ paths: description: Cargo restarted '404': description: Cargo does not exist - /cargoes/{name}/scale: - patch: - tags: - - Cargoes - summary: Scale or Downscale number of instances - description: Scale or Downscale number of instances - operationId: scale_cargo - parameters: - - name: name - in: path - description: Name of the cargo - required: true - schema: - type: string - - name: namespace - in: query - description: Namespace where the cargo belongs - required: false - schema: - type: string - nullable: true - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/CargoScale' - required: true - responses: - '200': - description: Cargo scaled - content: - application/json: - schema: - $ref: '#/components/schemas/Cargo' - '404': - description: Cargo does not exist - content: - application/json: - schema: - $ref: '#/components/schemas/ApiError' /cargoes/{name}/stats: get: tags: @@ -2168,15 +2128,6 @@ components: signal: type: string description: 'Signal to send to the container default: SIGKILL' - CargoScale: - type: object - description: Payload for the cargo scale endpoint - required: - - Replicas - properties: - Replicas: - type: integer - description: Number of replicas to scale up or down can be negative value CargoSpec: type: object description: |- diff --git a/bin/nanocld/src/models/cargo.rs b/bin/nanocld/src/models/cargo.rs index 893fb35c5..e66e3459f 100644 --- a/bin/nanocld/src/models/cargo.rs +++ b/bin/nanocld/src/models/cargo.rs @@ -1,5 +1,5 @@ use diesel::prelude::*; -use nanocl_stubs::cargo_spec::CargoSpecPartial; +use nanocl_stubs::cargo_spec::{CargoSpecPartial, CargoSpecUpdate}; use crate::schema::cargoes; @@ -45,3 +45,13 @@ pub struct CargoObjCreateIn { pub spec: CargoSpecPartial, pub version: String, } + +pub struct CargoObjPutIn { + pub spec: CargoSpecPartial, + pub version: String, +} + +pub struct CargoObjPatchIn { + pub spec: CargoSpecUpdate, + pub version: String, +} diff --git a/bin/nanocld/src/models/vm.rs b/bin/nanocld/src/models/vm.rs index 99750ffd1..71c123df9 100644 --- a/bin/nanocld/src/models/vm.rs +++ b/bin/nanocld/src/models/vm.rs @@ -1,5 +1,5 @@ use diesel::prelude::*; -use nanocl_stubs::vm_spec::VmSpecPartial; +use nanocl_stubs::vm_spec::{VmSpecPartial, VmSpecUpdate}; use crate::schema::vms; @@ -46,3 +46,13 @@ pub struct VmObjCreateIn { pub spec: VmSpecPartial, pub version: String, } + +pub struct VmObjPutIn { + pub spec: VmSpecPartial, + pub version: String, +} + +pub struct VmObjPatchIn { + pub spec: VmSpecUpdate, + pub version: String, +} diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index 3317ca276..077899438 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -1,16 +1,23 @@ use futures_util::{stream::FuturesUnordered, StreamExt}; -use bollard_next::container::RemoveContainerOptions; +use bollard_next::{ + container::{RemoveContainerOptions, Config}, + service::HostConfig, +}; use nanocl_error::http::{HttpResult, HttpError}; use nanocl_stubs::{ cargo::{Cargo, CargoDeleteQuery}, - cargo_spec::ReplicationMode, + cargo_spec::{ReplicationMode, CargoSpecPartial}, + process::ProcessKind, }; use crate::{ utils, repositories::generic::*, - models::{CargoDb, SystemState, CargoObjCreateIn, ProcessDb, SpecDb}, + models::{ + CargoDb, SystemState, CargoObjCreateIn, ProcessDb, SpecDb, CargoObjPutIn, + CargoObjPatchIn, + }, }; use super::generic::*; @@ -58,11 +65,11 @@ impl ObjDelByPk for CargoDb { type ObjDelOpts = CargoDeleteQuery; async fn fn_del_obj_by_pk( - key: &str, + pk: &str, opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult { - let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; + let cargo = CargoDb::transform_read_by_pk(pk, &state.pool).await?; let processes = ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?; processes @@ -83,8 +90,194 @@ impl ObjDelByPk for CargoDb { .await .into_iter() .collect::, _>>()?; - CargoDb::del_by_pk(key, &state.pool).await?; - SpecDb::del_by_kind_key(key, &state.pool).await?; + CargoDb::del_by_pk(pk, &state.pool).await?; + SpecDb::del_by_kind_key(pk, &state.pool).await?; Ok(cargo) } } + +impl ObjPutByPk for CargoDb { + type ObjPutIn = CargoObjPutIn; + type ObjPutOut = Cargo; + + async fn fn_put_obj_by_pk( + pk: &str, + obj: &Self::ObjPutIn, + state: &SystemState, + ) -> HttpResult { + let cargo = + CargoDb::update_from_spec(pk, &obj.spec, &obj.version, &state.pool) + .await?; + // Get the number of instance to create + let number = if let Some(mode) = &cargo.spec.replication { + match mode { + ReplicationMode::Static(replication_static) => { + replication_static.number + } + ReplicationMode::Auto => 1, + ReplicationMode::Unique => 1, + ReplicationMode::UniqueByNode => 1, + _ => 1, + } + } else { + 1 + }; + let processes = ProcessDb::read_by_kind_key(pk, &state.pool).await?; + utils::cargo::restore_instances_backup(&processes, state).await?; + // Create instance with the new spec + let new_instances = + match utils::cargo::create_instances(&cargo, number, state).await { + // If the creation of the new instance failed, we rename the old containers + Err(err) => { + log::warn!("Unable to create cargo instance: {}", err); + log::warn!("Rollback to previous instance"); + utils::cargo::rename_instances_original(&processes, state).await?; + Vec::default() + } + Ok(instances) => instances, + }; + // start created containers + match utils::process::start_by_kind(&ProcessKind::Cargo, pk, state).await { + Err(err) => { + log::error!( + "Unable to start cargo instance {} : {err}", + cargo.spec.cargo_key + ); + utils::cargo::delete_instances( + &new_instances + .iter() + .map(|i| i.key.clone()) + .collect::>(), + state, + ) + .await?; + utils::cargo::rename_instances_original(&processes, state).await?; + } + Ok(_) => { + // Delete old containers + utils::cargo::delete_instances( + &processes.iter().map(|c| c.key.clone()).collect::>(), + state, + ) + .await?; + } + } + Ok(cargo) + } +} + +impl ObjPatchByPk for CargoDb { + type ObjPatchIn = CargoObjPatchIn; + type ObjPatchOut = Cargo; + + async fn fn_patch_obj_by_pk( + pk: &str, + obj: &Self::ObjPatchIn, + state: &SystemState, + ) -> HttpResult { + let payload = &obj.spec; + let version = &obj.version; + let cargo = CargoDb::transform_read_by_pk(pk, &state.pool).await?; + let container = if let Some(container) = payload.container.clone() { + // merge env and ensure no duplicate key + let new_env = container.env.unwrap_or_default(); + let mut env_vars: Vec = + cargo.spec.container.env.unwrap_or_default(); + // Merge environment variables from new_env into the merged array + for env_var in new_env { + let parts: Vec<&str> = env_var.split('=').collect(); + if parts.len() < 2 { + continue; + } + let name = parts[0].to_owned(); + let value = parts[1..].join("="); + if let Some(pos) = env_vars + .iter() + .position(|x| x.starts_with(&format!("{name}="))) + { + let old_value = env_vars[pos].to_owned(); + log::trace!( + "env var: {name} old_value: {old_value} new_value: {value}" + ); + if old_value != value && !value.is_empty() { + // Update the value if it has changed + env_vars[pos] = format!("{}={}", name, value); + } else if value.is_empty() { + // Remove the variable if the value is empty + env_vars.remove(pos); + } + } else { + // Add new environment variables + env_vars.push(env_var); + } + } + // merge volumes and ensure no duplication + let new_volumes = container + .host_config + .clone() + .unwrap_or_default() + .binds + .unwrap_or_default(); + let mut volumes: Vec = cargo + .spec + .container + .host_config + .clone() + .unwrap_or_default() + .binds + .unwrap_or_default(); + for volume in new_volumes { + if !volumes.contains(&volume) { + volumes.push(volume); + } + } + let image = if let Some(image) = container.image.clone() { + Some(image) + } else { + cargo.spec.container.image + }; + let cmd = if let Some(cmd) = container.cmd { + Some(cmd) + } else { + cargo.spec.container.cmd + }; + Config { + cmd, + image, + env: Some(env_vars), + host_config: Some(HostConfig { + binds: Some(volumes), + ..cargo.spec.container.host_config.unwrap_or_default() + }), + ..cargo.spec.container + } + } else { + cargo.spec.container + }; + let spec = CargoSpecPartial { + name: cargo.spec.name.clone(), + container, + init_container: if payload.init_container.is_some() { + payload.init_container.clone() + } else { + cargo.spec.init_container + }, + replication: payload.replication.clone(), + secrets: if payload.secrets.is_some() { + payload.secrets.clone() + } else { + cargo.spec.secrets + }, + metadata: if payload.metadata.is_some() { + payload.metadata.clone() + } else { + cargo.spec.metadata + }, + }; + let obj = &CargoObjPutIn { + spec, + version: version.to_owned(), + }; + CargoDb::fn_put_obj_by_pk(pk, obj, state).await + } +} diff --git a/bin/nanocld/src/objects/generic/delete.rs b/bin/nanocld/src/objects/generic/delete.rs index 5c4ad9672..890d1ce59 100644 --- a/bin/nanocld/src/objects/generic/delete.rs +++ b/bin/nanocld/src/objects/generic/delete.rs @@ -8,20 +8,20 @@ pub trait ObjDelByPk { type ObjDelOpts; async fn fn_del_obj_by_pk( - key: &str, + pk: &str, opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult; async fn del_obj_by_pk( - key: &str, + pk: &str, opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult where Self::ObjDelOut: Into + Clone, { - let obj = Self::fn_del_obj_by_pk(key, opts, state).await?; + let obj = Self::fn_del_obj_by_pk(pk, opts, state).await?; state.emit_normal_native_action(&obj, NativeEventAction::Delete); Ok(obj) } diff --git a/bin/nanocld/src/objects/generic/mod.rs b/bin/nanocld/src/objects/generic/mod.rs index 6af577bf6..3c2ef7780 100644 --- a/bin/nanocld/src/objects/generic/mod.rs +++ b/bin/nanocld/src/objects/generic/mod.rs @@ -1,7 +1,9 @@ mod create; mod delete; mod patch; +mod put; pub use create::*; pub use delete::*; pub use patch::*; +pub use put::*; diff --git a/bin/nanocld/src/objects/generic/put.rs b/bin/nanocld/src/objects/generic/put.rs new file mode 100644 index 000000000..f3b497051 --- /dev/null +++ b/bin/nanocld/src/objects/generic/put.rs @@ -0,0 +1,28 @@ +use nanocl_error::http::HttpResult; +use nanocl_stubs::system::{EventActor, NativeEventAction}; + +use crate::models::SystemState; + +pub trait ObjPutByPk { + type ObjPutIn; + type ObjPutOut; + + async fn fn_put_obj_by_pk( + pk: &str, + obj: &Self::ObjPutIn, + state: &SystemState, + ) -> HttpResult; + + async fn put_obj_by_pk( + pk: &str, + obj: &Self::ObjPutIn, + state: &SystemState, + ) -> HttpResult + where + Self::ObjPutOut: Into + Clone, + { + let obj = Self::fn_put_obj_by_pk(pk, obj, state).await?; + state.emit_normal_native_action(&obj, NativeEventAction::Update); + Ok(obj) + } +} diff --git a/bin/nanocld/src/objects/resource.rs b/bin/nanocld/src/objects/resource.rs index e5531afb5..2d473acb3 100644 --- a/bin/nanocld/src/objects/resource.rs +++ b/bin/nanocld/src/objects/resource.rs @@ -51,3 +51,19 @@ impl ObjDelByPk for ResourceDb { Ok(resource) } } + +impl ObjPutByPk for ResourceDb { + type ObjPutIn = ResourcePartial; + type ObjPutOut = Resource; + + async fn fn_put_obj_by_pk( + pk: &str, + obj: &Self::ObjPutIn, + state: &SystemState, + ) -> HttpResult { + ResourceDb::read_by_pk(pk, &state.pool).await?; + let resource = utils::resource::hook_create(obj, &state.pool).await?; + let resource = ResourceDb::update_from_spec(&resource, &state.pool).await?; + Ok(resource) + } +} diff --git a/bin/nanocld/src/objects/secret.rs b/bin/nanocld/src/objects/secret.rs index 7d8bdab92..1cb8ff0ea 100644 --- a/bin/nanocld/src/objects/secret.rs +++ b/bin/nanocld/src/objects/secret.rs @@ -27,12 +27,12 @@ impl ObjDelByPk for SecretDb { type ObjDelOpts = (); async fn fn_del_obj_by_pk( - key: &str, + pk: &str, _opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult { - let secret = SecretDb::transform_read_by_pk(key, &state.pool).await?; - SecretDb::del_by_pk(key, &state.pool).await?; + let secret = SecretDb::transform_read_by_pk(pk, &state.pool).await?; + SecretDb::del_by_pk(pk, &state.pool).await?; Ok(secret) } } @@ -42,11 +42,11 @@ impl ObjPatchByPk for SecretDb { type ObjPatchOut = Secret; async fn fn_patch_obj_by_pk( - key: &str, + pk: &str, obj: &Self::ObjPatchIn, state: &SystemState, ) -> HttpResult { - let secret = SecretDb::update_pk(key, obj, &state.pool) + let secret = SecretDb::update_pk(pk, obj, &state.pool) .await? .try_into()?; Ok(secret) diff --git a/bin/nanocld/src/objects/vm.rs b/bin/nanocld/src/objects/vm.rs index ced670658..086ed1642 100644 --- a/bin/nanocld/src/objects/vm.rs +++ b/bin/nanocld/src/objects/vm.rs @@ -1,10 +1,15 @@ +use bollard_next::container::RemoveContainerOptions; + use nanocl_error::http::{HttpResult, HttpError}; -use nanocl_stubs::vm::Vm; +use nanocl_stubs::{vm::Vm, process::ProcessKind, vm_spec::VmSpecPartial}; use crate::{ utils, repositories::generic::*, - models::{VmDb, SystemState, VmObjCreateIn, VmImageDb, SpecDb}, + models::{ + VmDb, SystemState, VmObjCreateIn, VmImageDb, SpecDb, VmObjPutIn, + VmObjPatchIn, + }, }; use super::generic::*; @@ -54,20 +59,114 @@ impl ObjDelByPk for VmDb { type ObjDelOut = Vm; async fn fn_del_obj_by_pk( - key: &str, + pk: &str, _opts: &Self::ObjDelOpts, state: &SystemState, ) -> HttpResult { - let vm = VmDb::transform_read_by_pk(key, &state.pool).await?; + let vm = VmDb::transform_read_by_pk(pk, &state.pool).await?; let options = bollard_next::container::RemoveContainerOptions { force: true, ..Default::default() }; - let container_name = format!("{}.v", key); + let container_name = format!("{}.v", pk); utils::process::remove(&container_name, Some(options), state).await?; - VmDb::del_by_pk(key, &state.pool).await?; - SpecDb::del_by_kind_key(key, &state.pool).await?; + VmDb::del_by_pk(pk, &state.pool).await?; + SpecDb::del_by_kind_key(pk, &state.pool).await?; utils::vm_image::delete_by_name(&vm.spec.disk.image, &state.pool).await?; Ok(vm) } } + +impl ObjPutByPk for VmDb { + type ObjPutIn = VmObjPutIn; + type ObjPutOut = Vm; + + async fn fn_put_obj_by_pk( + pk: &str, + obj: &Self::ObjPutIn, + state: &SystemState, + ) -> HttpResult { + let vm = VmDb::transform_read_by_pk(pk, &state.pool).await?; + let container_name = format!("{}.v", &vm.spec.vm_key); + utils::process::stop_by_kind(&ProcessKind::Vm, pk, state).await?; + utils::process::remove( + &container_name, + None::, + state, + ) + .await?; + let vm = VmDb::update_from_spec( + &vm.spec.vm_key, + &obj.spec, + &obj.version, + &state.pool, + ) + .await?; + let image = VmImageDb::read_by_pk(&vm.spec.disk.image, &state.pool).await?; + utils::vm::create_instance(&vm, &image, false, state).await?; + utils::process::start_by_kind(&ProcessKind::Vm, &vm.spec.vm_key, state) + .await?; + Ok(vm) + } +} + +impl ObjPatchByPk for VmDb { + type ObjPatchIn = VmObjPatchIn; + type ObjPatchOut = Vm; + + async fn fn_patch_obj_by_pk( + pk: &str, + obj: &Self::ObjPatchIn, + state: &SystemState, + ) -> HttpResult { + let spec = &obj.spec; + let version = &obj.version; + let vm = VmDb::transform_read_by_pk(pk, &state.pool).await?; + let old_spec = SpecDb::read_by_pk(&vm.spec.key, &state.pool) + .await? + .try_to_vm_spec()?; + let vm_partial = VmSpecPartial { + name: spec.name.to_owned().unwrap_or(vm.spec.name.clone()), + disk: old_spec.disk, + host_config: Some( + spec.host_config.to_owned().unwrap_or(old_spec.host_config), + ), + hostname: if spec.hostname.is_some() { + spec.hostname.clone() + } else { + old_spec.hostname + }, + user: if spec.user.is_some() { + spec.user.clone() + } else { + old_spec.user + }, + password: if spec.password.is_some() { + spec.password.clone() + } else { + old_spec.password + }, + ssh_key: if spec.ssh_key.is_some() { + spec.ssh_key.clone() + } else { + old_spec.ssh_key + }, + mac_address: old_spec.mac_address, + labels: if spec.labels.is_some() { + spec.labels.clone() + } else { + old_spec.labels + }, + metadata: if spec.metadata.is_some() { + spec.metadata.clone() + } else { + old_spec.metadata + }, + }; + let obj = &VmObjPutIn { + spec: vm_partial, + version: version.to_owned(), + }; + VmDb::fn_put_obj_by_pk(pk, obj, state).await + } +} diff --git a/bin/nanocld/src/services/cargo.rs b/bin/nanocld/src/services/cargo.rs index fb091f012..c299db0a3 100644 --- a/bin/nanocld/src/services/cargo.rs +++ b/bin/nanocld/src/services/cargo.rs @@ -4,7 +4,7 @@ use nanocl_error::{http::HttpResult, io::IoResult}; use nanocl_stubs::{ generic::{GenericNspQuery, GenericListNspQuery}, - cargo::{CargoDeleteQuery, CargoKillOptions, CargoStatsQuery, CargoScale}, + cargo::{CargoDeleteQuery, CargoKillOptions, CargoStatsQuery}, cargo_spec::{CargoSpecPartial, CargoSpecUpdate}, }; @@ -12,7 +12,10 @@ use crate::{ utils, objects::generic::*, repositories::generic::*, - models::{SystemState, SpecDb, CargoObjCreateIn, CargoDb}, + models::{ + SystemState, SpecDb, CargoObjCreateIn, CargoDb, CargoObjPutIn, + CargoObjPatchIn, + }, }; /// List cargoes @@ -170,7 +173,11 @@ pub(crate) async fn put_cargo( ) -> HttpResult { let namespace = utils::key::resolve_nsp(&qs.namespace); let key = utils::key::gen_key(&namespace, &path.1); - let cargo = utils::cargo::put(&key, &payload, &path.0, &state).await?; + let obj = &CargoObjPutIn { + spec: payload.into_inner(), + version: path.0.clone(), + }; + let cargo = CargoDb::put_obj_by_pk(&key, obj, &state).await?; Ok(web::HttpResponse::Ok().json(&cargo)) } @@ -198,7 +205,11 @@ pub(crate) async fn patch_cargo( ) -> HttpResult { let namespace = utils::key::resolve_nsp(&qs.namespace); let key = utils::key::gen_key(&namespace, &path.1); - let cargo = utils::cargo::patch(&key, &payload, &path.0, &state).await?; + let obj = &CargoObjPatchIn { + spec: payload.into_inner(), + version: path.0.clone(), + }; + let cargo = CargoDb::patch_obj_by_pk(&key, obj, &state).await?; Ok(web::HttpResponse::Ok().json(&cargo)) } @@ -286,9 +297,11 @@ pub(crate) async fn revert_cargo( let spec = SpecDb::read_by_pk(&path.2, &state.pool) .await? .try_to_cargo_spec()?; - let cargo = - utils::cargo::put(&cargo_key, &spec.clone().into(), &path.0, &state) - .await?; + let obj = &CargoObjPutIn { + spec: spec.into(), + version: path.0.clone(), + }; + let cargo = CargoDb::put_obj_by_pk(&cargo_key, obj, &state).await?; Ok(web::HttpResponse::Ok().json(&cargo)) } @@ -324,34 +337,6 @@ pub(crate) async fn stats_cargo( ) } -/// Scale or Downscale number of instances -#[cfg_attr(feature = "dev", utoipa::path( - patch, - tag = "Cargoes", - request_body = CargoScale, - path = "/cargoes/{name}/scale", - params( - ("name" = String, Path, description = "Name of the cargo"), - ("namespace" = Option, Query, description = "Namespace where the cargo belongs"), - ), - responses( - (status = 200, description = "Cargo scaled", body = Cargo), - (status = 404, description = "Cargo does not exist", body = ApiError), - ), -))] -#[web::patch("/cargoes/{name}/scale")] -pub(crate) async fn scale_cargo( - state: web::types::State, - path: web::types::Path<(String, String)>, - qs: web::types::Query, - payload: web::types::Json, -) -> HttpResult { - let namespace = utils::key::resolve_nsp(&qs.namespace); - let key = utils::key::gen_key(&namespace, &path.1); - utils::cargo::scale(&key, &payload, &state).await?; - Ok(web::HttpResponse::Ok().into()) -} - pub(crate) fn ntex_config(config: &mut web::ServiceConfig) { config.service(create_cargo); config.service(delete_cargo); @@ -363,7 +348,6 @@ pub(crate) fn ntex_config(config: &mut web::ServiceConfig) { config.service(inspect_cargo); config.service(list_cargo_history); config.service(revert_cargo); - config.service(scale_cargo); config.service(stats_cargo); } @@ -373,8 +357,7 @@ mod tests { use nanocl_stubs::cargo_spec::{CargoSpec, CargoSpecPartial}; use nanocl_stubs::cargo::{ - Cargo, CargoSummary, CargoInspect, CargoDeleteQuery, CargoScale, - CargoKillOptions, + Cargo, CargoSummary, CargoInspect, CargoDeleteQuery, CargoKillOptions, }; use crate::utils::tests::*; @@ -566,87 +549,4 @@ mod tests { ); } } - - #[ntex::test] - async fn scale() { - const CARGO_NAME: &str = "api-test-scale"; - let client = gen_default_test_client().await; - let res = client - .send_post( - ENDPOINT, - Some(&CargoSpecPartial { - name: CARGO_NAME.to_owned(), - container: bollard_next::container::Config { - image: Some( - "ghcr.io/next-hat/nanocl-get-started:latest".to_owned(), - ), - ..Default::default() - }, - ..Default::default() - }), - None::, - ) - .await; - test_status_code!( - res.status(), - http::StatusCode::CREATED, - "scale cargo create" - ); - let res = client - .send_post( - &format!("/processes/cargo/{CARGO_NAME}/start"), - None::, - None::, - ) - .await; - test_status_code!( - res.status(), - http::StatusCode::ACCEPTED, - "scale cargo start" - ); - let res = client - .send_patch( - &format!("{ENDPOINT}/{CARGO_NAME}/scale"), - Some(&CargoScale { replicas: 2 }), - None::, - ) - .await; - test_status_code!( - res.status(), - http::StatusCode::OK, - "scale cargo scale up" - ); - let res = client - .send_patch( - &format!("{ENDPOINT}/{CARGO_NAME}/scale"), - Some(&CargoScale { replicas: -1 }), - None::, - ) - .await; - test_status_code!( - res.status(), - http::StatusCode::OK, - "scale cargo scale down" - ); - let res = client - .send_post( - &format!("/processes/cargo/{CARGO_NAME}/stop"), - None::, - None::, - ) - .await; - test_status_code!( - res.status(), - http::StatusCode::ACCEPTED, - "scale cargo stop" - ); - let res = client - .send_delete(&format!("{ENDPOINT}/{CARGO_NAME}"), None::) - .await; - test_status_code!( - res.status(), - http::StatusCode::ACCEPTED, - "scale cargo delete" - ); - } } diff --git a/bin/nanocld/src/services/openapi.rs b/bin/nanocld/src/services/openapi.rs index 13ec01be2..ce3a25096 100644 --- a/bin/nanocld/src/services/openapi.rs +++ b/bin/nanocld/src/services/openapi.rs @@ -50,7 +50,7 @@ use nanocl_stubs::namespace::{ use nanocl_stubs::job::{Job, JobPartial, JobInspect, JobSummary}; use nanocl_stubs::cargo::{ Cargo, CargoInspect, CargoSummary, CargoKillOptions, CreateExecOptions, - CargoScale, CargoStats, + CargoStats, }; use nanocl_stubs::cargo_spec::{ CargoSpec, CargoSpecPartial, CargoSpecUpdate, ReplicationMode, @@ -256,7 +256,6 @@ impl Modify for VersionModifier { cargo::kill_cargo, cargo::list_cargo_history, cargo::revert_cargo, - cargo::scale_cargo, cargo::stats_cargo, // Exec exec::create_exec_command, @@ -371,7 +370,6 @@ impl Modify for VersionModifier { CargoSpecPartial, CargoSpecUpdate, ReplicationStatic, - CargoScale, CargoStats, PidsStats, NetworkStats, diff --git a/bin/nanocld/src/services/resource.rs b/bin/nanocld/src/services/resource.rs index cc03dc84e..18c5dfdbd 100644 --- a/bin/nanocld/src/services/resource.rs +++ b/bin/nanocld/src/services/resource.rs @@ -11,7 +11,6 @@ use nanocl_stubs::{ }; use crate::{ - utils, objects::generic::*, repositories::generic::*, models::{SystemState, SpecDb, ResourceDb}, @@ -130,7 +129,8 @@ pub(crate) async fn put_resource( data: payload.data.clone(), metadata: payload.metadata.clone(), }; - let resource = utils::resource::put(&new_resource, &state).await?; + let resource = + ResourceDb::put_obj_by_pk(&path.1, &new_resource, &state).await?; Ok(web::HttpResponse::Ok().json(&resource)) } @@ -189,7 +189,8 @@ pub(crate) async fn revert_resource( data: history.data, metadata: history.metadata, }; - let resource = utils::resource::put(&new_resource, &state).await?; + let resource = + ResourceDb::put_obj_by_pk(&path.1, &new_resource, &state).await?; Ok(web::HttpResponse::Ok().json(&resource)) } diff --git a/bin/nanocld/src/services/vm.rs b/bin/nanocld/src/services/vm.rs index 9d6199d97..98592cb02 100644 --- a/bin/nanocld/src/services/vm.rs +++ b/bin/nanocld/src/services/vm.rs @@ -24,7 +24,9 @@ use nanocl_stubs::{ use crate::{ utils, objects::generic::*, - models::{SystemState, WsConState, SpecDb, VmDb, VmObjCreateIn}, + models::{ + SystemState, WsConState, SpecDb, VmDb, VmObjCreateIn, VmObjPatchIn, + }, }; /// List virtual machines @@ -185,7 +187,11 @@ pub(crate) async fn patch_vm( let namespace = utils::key::resolve_nsp(&qs.namespace); let key = utils::key::gen_key(&namespace, &path.1); let version = path.0.clone(); - let vm = utils::vm::patch(&key, &payload, &version, &state).await?; + let obj = &VmObjPatchIn { + spec: payload.into_inner(), + version: version.clone(), + }; + let vm = VmDb::patch_obj_by_pk(&key, obj, &state).await?; Ok(web::HttpResponse::Ok().json(&vm)) } diff --git a/bin/nanocld/src/utils/cargo.rs b/bin/nanocld/src/utils/cargo.rs index 4bde66a3d..2053d841f 100644 --- a/bin/nanocld/src/utils/cargo.rs +++ b/bin/nanocld/src/utils/cargo.rs @@ -12,20 +12,19 @@ use bollard_next::{ }, }; use nanocl_stubs::{ + process::Process, generic::{GenericListNspQuery, GenericClause, GenericFilter}, - process::{Process, ProcessKind}, cargo::{ - Cargo, CargoSummary, CargoInspect, CargoKillOptions, CargoScale, - CargoStats, CargoStatsQuery, CargoDeleteQuery, + Cargo, CargoSummary, CargoInspect, CargoKillOptions, CargoStats, + CargoStatsQuery, CargoDeleteQuery, }, - cargo_spec::{CargoSpecPartial, CargoSpecUpdate, ReplicationMode, Config}, }; use crate::{ utils, + objects::generic::*, repositories::generic::*, models::{SystemState, CargoDb, ProcessDb, NamespaceDb, SecretDb, SpecDb}, - objects::generic::ObjDelByPk, }; use super::stream::transform_stream; @@ -212,7 +211,7 @@ pub(crate) async fn create_instances( /// Restore the instances backup. The instances are restored in parallel. /// It's happenning if when a cargo fail to updates. -async fn restore_instances_backup( +pub(crate) async fn restore_instances_backup( instances: &[Process], state: &SystemState, ) -> HttpResult<()> { @@ -243,7 +242,7 @@ async fn restore_instances_backup( /// Rename the containers of the given cargo by adding `-backup` to the name /// of the container to mark them as backup. /// In case of failure, the backup containers are restored. -async fn rename_instances_original( +pub(crate) async fn rename_instances_original( instances: &[Process], state: &SystemState, ) -> HttpResult<()> { @@ -273,7 +272,7 @@ async fn rename_instances_original( /// The instances (containers) are deleted but the cargo is not. /// The cargo is not deleted because it can be used to restore the containers. -async fn delete_instances( +pub(crate) async fn delete_instances( instances: &[String], state: &SystemState, ) -> HttpResult<()> { @@ -319,73 +318,6 @@ pub(crate) async fn restart(key: &str, state: &SystemState) -> HttpResult<()> { Ok(()) } -/// A new history entry is added and the containers are updated -/// with the new cargo specification -pub(crate) async fn put( - cargo_key: &str, - cargo_partial: &CargoSpecPartial, - version: &str, - state: &SystemState, -) -> HttpResult { - let cargo = - CargoDb::update_from_spec(cargo_key, cargo_partial, version, &state.pool) - .await?; - // Get the number of instance to create - let number = if let Some(mode) = &cargo.spec.replication { - match mode { - ReplicationMode::Static(replication_static) => replication_static.number, - ReplicationMode::Auto => 1, - ReplicationMode::Unique => 1, - ReplicationMode::UniqueByNode => 1, - _ => 1, - } - } else { - 1 - }; - let processes = ProcessDb::read_by_kind_key(cargo_key, &state.pool).await?; - restore_instances_backup(&processes, state).await?; - // Create instance with the new spec - let new_instances = match create_instances(&cargo, number, state).await { - // If the creation of the new instance failed, we rename the old containers - Err(err) => { - log::warn!("Unable to create cargo instance: {}", err); - log::warn!("Rollback to previous instance"); - rename_instances_original(&processes, state).await?; - Vec::default() - } - Ok(instances) => instances, - }; - // start created containers - match utils::process::start_by_kind(&ProcessKind::Cargo, cargo_key, state) - .await - { - Err(err) => { - log::error!( - "Unable to start cargo instance {} : {err}", - cargo.spec.cargo_key - ); - delete_instances( - &new_instances - .iter() - .map(|i| i.key.clone()) - .collect::>(), - state, - ) - .await?; - rename_instances_original(&processes, state).await?; - } - Ok(_) => { - // Delete old containers - delete_instances( - &processes.iter().map(|c| c.key.clone()).collect::>(), - state, - ) - .await?; - } - } - Ok(cargo) -} - /// List the cargoes for the given query pub(crate) async fn list( query: &GenericListNspQuery, @@ -492,113 +424,6 @@ pub(crate) async fn kill_by_key( Ok(()) } -/// Merge the given cargo spec with the existing one -pub async fn patch( - key: &str, - payload: &CargoSpecUpdate, - version: &str, - state: &SystemState, -) -> HttpResult { - let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; - let container = if let Some(container) = payload.container.clone() { - // merge env and ensure no duplicate key - let new_env = container.env.unwrap_or_default(); - let mut env_vars: Vec = - cargo.spec.container.env.unwrap_or_default(); - // Merge environment variables from new_env into the merged array - for env_var in new_env { - let parts: Vec<&str> = env_var.split('=').collect(); - if parts.len() < 2 { - continue; - } - let name = parts[0].to_owned(); - let value = parts[1..].join("="); - if let Some(pos) = env_vars - .iter() - .position(|x| x.starts_with(&format!("{name}="))) - { - let old_value = env_vars[pos].to_owned(); - log::trace!( - "env var: {name} old_value: {old_value} new_value: {value}" - ); - if old_value != value && !value.is_empty() { - // Update the value if it has changed - env_vars[pos] = format!("{}={}", name, value); - } else if value.is_empty() { - // Remove the variable if the value is empty - env_vars.remove(pos); - } - } else { - // Add new environment variables - env_vars.push(env_var); - } - } - // merge volumes and ensure no duplication - let new_volumes = container - .host_config - .clone() - .unwrap_or_default() - .binds - .unwrap_or_default(); - let mut volumes: Vec = cargo - .spec - .container - .host_config - .clone() - .unwrap_or_default() - .binds - .unwrap_or_default(); - for volume in new_volumes { - if !volumes.contains(&volume) { - volumes.push(volume); - } - } - let image = if let Some(image) = container.image.clone() { - Some(image) - } else { - cargo.spec.container.image - }; - let cmd = if let Some(cmd) = container.cmd { - Some(cmd) - } else { - cargo.spec.container.cmd - }; - Config { - cmd, - image, - env: Some(env_vars), - host_config: Some(HostConfig { - binds: Some(volumes), - ..cargo.spec.container.host_config.unwrap_or_default() - }), - ..cargo.spec.container - } - } else { - cargo.spec.container - }; - let spec = CargoSpecPartial { - name: cargo.spec.name.clone(), - container, - init_container: if payload.init_container.is_some() { - payload.init_container.clone() - } else { - cargo.spec.init_container - }, - replication: payload.replication.clone(), - secrets: if payload.secrets.is_some() { - payload.secrets.clone() - } else { - cargo.spec.secrets - }, - metadata: if payload.metadata.is_some() { - payload.metadata.clone() - } else { - cargo.spec.metadata - }, - }; - utils::cargo::put(key, &spec, version, state).await -} - /// Get the stats of a cargo instance /// The cargo name can be used if the cargo has only one instance pub(crate) fn get_stats( @@ -611,61 +436,3 @@ pub(crate) fn get_stats( let stream = transform_stream::(stream); Ok(stream) } - -/// Scale a cargo instance up or down to the given number of instances (containers, replicas) -pub async fn scale( - key: &str, - options: &CargoScale, - state: &SystemState, -) -> HttpResult<()> { - let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?; - let instances = ProcessDb::read_by_kind_key(key, &state.pool).await?; - let is_equal = usize::try_from(options.replicas) - .map(|replica| instances.len() == replica) - .unwrap_or(false); - if is_equal { - return Ok(()); - } - if options.replicas.is_negative() { - let to_remove = options.replicas.unsigned_abs(); - instances - .iter() - .take(to_remove) - .map(|instance| { - utils::process::remove( - &instance.key, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - state, - ) - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::, HttpError>>()?; - } else { - let to_add = options.replicas.unsigned_abs(); - let created_instances = create_instances(&cargo, to_add, state).await?; - created_instances - .iter() - .map(|instance| async { - state - .docker_api - .start_container::(&instance.key, None) - .await?; - Ok::<_, HttpError>(()) - }) - .collect::>() - .collect::>>() - .await - .into_iter() - .collect::, HttpError>>()?; - } - // state - // .event_emitter - // .spawn_emit_to_event(&cargo, NativeEventAction::Patched); - Ok(()) -} diff --git a/bin/nanocld/src/utils/resource.rs b/bin/nanocld/src/utils/resource.rs index ddb687463..82d3cfc61 100644 --- a/bin/nanocld/src/utils/resource.rs +++ b/bin/nanocld/src/utils/resource.rs @@ -3,12 +3,11 @@ use jsonschema::{Draft, JSONSchema}; use nanocl_error::http::{HttpError, HttpResult}; use nanocl_stubs::{ - system::NativeEventAction, resource_kind::ResourceKind, resource::{Resource, ResourcePartial}, }; -use crate::models::{Pool, SystemState, SpecDb, ResourceDb}; +use crate::models::{Pool, SpecDb, ResourceDb}; use super::ctrl_client::CtrlClient; @@ -72,15 +71,3 @@ pub(crate) async fn hook_delete( } Ok(()) } - -/// This function patch a resource. -/// It will call the hook_create_resource function to hook the resource. -pub(crate) async fn put( - resource: &ResourcePartial, - state: &SystemState, -) -> HttpResult { - let resource = hook_create(resource, &state.pool).await?; - let res = ResourceDb::update_from_spec(&resource, &state.pool).await?; - state.emit_normal_native_action(&res, NativeEventAction::Update); - Ok(res) -} diff --git a/bin/nanocld/src/utils/vm.rs b/bin/nanocld/src/utils/vm.rs index 7df9a3153..6d1c313ec 100644 --- a/bin/nanocld/src/utils/vm.rs +++ b/bin/nanocld/src/utils/vm.rs @@ -1,18 +1,10 @@ use std::collections::HashMap; -use bollard_next::{ - service::{HostConfig, DeviceMapping}, - container::RemoveContainerOptions, -}; +use bollard_next::service::{HostConfig, DeviceMapping}; use nanocl_error::http::HttpResult; -use nanocl_stubs::{ - system::NativeEventAction, - process::ProcessKind, - vm_spec::{VmSpecPartial, VmSpecUpdate}, - vm::{Vm, VmSummary, VmInspect}, -}; +use nanocl_stubs::vm::{Vm, VmSummary, VmInspect}; use crate::{ utils, @@ -168,84 +160,3 @@ pub(crate) async fn create_instance( utils::process::create(&name, "vm", &vm.spec.vm_key, spec, state).await?; Ok(()) } - -/// Patch a VM specification from a `VmSpecUpdate` in the given namespace. -/// This will merge the new specification with the old one. -pub(crate) async fn patch( - vm_key: &str, - spec: &VmSpecUpdate, - version: &str, - state: &SystemState, -) -> HttpResult { - let vm = VmDb::transform_read_by_pk(vm_key, &state.pool).await?; - let old_spec = SpecDb::read_by_pk(&vm.spec.key, &state.pool) - .await? - .try_to_vm_spec()?; - let vm_partial = VmSpecPartial { - name: spec.name.to_owned().unwrap_or(vm.spec.name.clone()), - disk: old_spec.disk, - host_config: Some( - spec.host_config.to_owned().unwrap_or(old_spec.host_config), - ), - hostname: if spec.hostname.is_some() { - spec.hostname.clone() - } else { - old_spec.hostname - }, - user: if spec.user.is_some() { - spec.user.clone() - } else { - old_spec.user - }, - password: if spec.password.is_some() { - spec.password.clone() - } else { - old_spec.password - }, - ssh_key: if spec.ssh_key.is_some() { - spec.ssh_key.clone() - } else { - old_spec.ssh_key - }, - mac_address: old_spec.mac_address, - labels: if spec.labels.is_some() { - spec.labels.clone() - } else { - old_spec.labels - }, - metadata: if spec.metadata.is_some() { - spec.metadata.clone() - } else { - old_spec.metadata - }, - }; - put(vm_key, &vm_partial, version, state).await -} - -/// Put a VM specification from a `VmSpecPartial` in the given namespace. -/// This will replace the old specification with the new one. -pub(crate) async fn put( - vm_key: &str, - vm_partial: &VmSpecPartial, - version: &str, - state: &SystemState, -) -> HttpResult { - let vm = VmDb::transform_read_by_pk(vm_key, &state.pool).await?; - let container_name = format!("{}.v", &vm.spec.vm_key); - utils::process::stop_by_kind(&ProcessKind::Vm, vm_key, state).await?; - utils::process::remove( - &container_name, - None::, - state, - ) - .await?; - let vm = - VmDb::update_from_spec(&vm.spec.vm_key, vm_partial, version, &state.pool) - .await?; - let image = VmImageDb::read_by_pk(&vm.spec.disk.image, &state.pool).await?; - create_instance(&vm, &image, false, state).await?; - utils::process::start_by_kind(&ProcessKind::Vm, &vm.spec.vm_key, state) - .await?; - state.emit_normal_native_action(&vm, NativeEventAction::Update); - Ok(vm) -} diff --git a/crates/nanocl_stubs/src/cargo.rs b/crates/nanocl_stubs/src/cargo.rs index d2f09d548..247f59cc3 100644 --- a/crates/nanocl_stubs/src/cargo.rs +++ b/crates/nanocl_stubs/src/cargo.rs @@ -159,17 +159,3 @@ impl From for StatsOptions { } } } - -/// Payload for the cargo scale endpoint -#[derive(Debug, Clone, Default)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr( - feature = "serde", - serde(deny_unknown_fields, rename_all = "PascalCase") -)] -#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -pub struct CargoScale { - /// Number of replicas to scale up or down can be negative value - pub replicas: isize, -}