diff --git a/bin/nanocl/src/commands/state.rs b/bin/nanocl/src/commands/state.rs index ec59d4567..3d5c98ef5 100644 --- a/bin/nanocl/src/commands/state.rs +++ b/bin/nanocl/src/commands/state.rs @@ -497,10 +497,10 @@ async fn exec_state_apply( let token = format!("cargo/{}", cargo.name); if let Some(before) = &cargo.init_container { let image = before.image.clone().unwrap_or_default(); - pull_image(&image, opts.force_pull, &client).await?; + // pull_image(&image, opts.force_pull, &client).await?; } let image = cargo.container.image.clone().unwrap_or_default(); - pull_image(&image, opts.force_pull, &client).await?; + // pull_image(&image, opts.force_pull, &client).await?; let pg = utils::progress::create_progress(&token, &pg_style); match client.inspect_cargo(&cargo.name, Some(&namespace)).await { Err(_) => { diff --git a/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/down.sql b/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/down.sql new file mode 100644 index 000000000..47ca1872a --- /dev/null +++ b/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS "object_process_statuses"; diff --git a/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/up.sql b/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/up.sql new file mode 100644 index 000000000..9c99329f0 --- /dev/null +++ b/bin/nanocld/migrations/2022-01-10-150631_object_process_statuses/up.sql @@ -0,0 +1,10 @@ +-- Your SQL goes here +CREATE TABLE IF NOT EXISTS "object_process_statuses" ( + "key" VARCHAR NOT NULL PRIMARY KEY, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + "wanted" VARCHAR NOT NULL, + "prev_wanted" VARCHAR NOT NULL, + "actual" VARCHAR NOT NULL, + "prev_actual" VARCHAR NOT NULL +); diff --git a/bin/nanocld/migrations/2022-06-17-122356_cargos/up.sql b/bin/nanocld/migrations/2022-06-17-122356_cargos/up.sql index 6108393bb..3ece57281 100644 --- a/bin/nanocld/migrations/2022-06-17-122356_cargos/up.sql +++ b/bin/nanocld/migrations/2022-06-17-122356_cargos/up.sql @@ -3,5 +3,6 @@ CREATE TABLE IF NOT EXISTS "cargoes" ( "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), "name" VARCHAR NOT NULL, "spec_key" UUID NOT NULL REFERENCES specs("key"), + "status_key" VARCHAR NOT NULL REFERENCES object_process_statuses("key"), "namespace_name" VARCHAR NOT NULL REFERENCES namespaces("name") ); diff --git a/bin/nanocld/migrations/2024-01-02-135246_metrics_rename_expire/down.sql b/bin/nanocld/migrations/2024-01-02-135246_metrics_rename_expire/down.sql index b25f84de1..7357b0954 100644 --- a/bin/nanocld/migrations/2024-01-02-135246_metrics_rename_expire/down.sql +++ b/bin/nanocld/migrations/2024-01-02-135246_metrics_rename_expire/down.sql @@ -1,2 +1,2 @@ -- This file should undo anything in `up.sql` -ALTER TABLE IF EXISTS "metrics" RENAME COLUMN IF EXISTS "expires_at" TO "expire_at"; +ALTER TABLE IF EXISTS "metrics" RENAME COLUMN "expires_at" TO "expire_at"; diff --git a/bin/nanocld/src/main.rs b/bin/nanocld/src/main.rs index 0188e9e38..f0c2bdb73 100644 --- a/bin/nanocld/src/main.rs +++ b/bin/nanocld/src/main.rs @@ -24,10 +24,10 @@ async fn main() -> std::io::Result<()> { // Parse command line arguments let args = cli::Cli::parse(); // Build env logger - #[cfg(any(feature = "dev", feature = "test"))] - { - std::env::set_var("LOG_LEVEL", "nanocld=trace"); - } + // #[cfg(any(feature = "dev", feature = "test"))] + // { + // std::env::set_var("LOG_LEVEL", "nanocld=trace"); + // } logger::enable_logger("nanocld"); log::info!( "nanocld_{}_v{}-{}:{}", diff --git a/bin/nanocld/src/models/cargo.rs b/bin/nanocld/src/models/cargo.rs index 75f92c505..29ef0dede 100644 --- a/bin/nanocld/src/models/cargo.rs +++ b/bin/nanocld/src/models/cargo.rs @@ -22,6 +22,8 @@ pub struct CargoDb { pub name: String, /// The spec key reference pub spec_key: uuid::Uuid, + /// The status key reference + pub status_key: String, /// The namespace name pub namespace_name: String, } diff --git a/bin/nanocld/src/models/mod.rs b/bin/nanocld/src/models/mod.rs index 7e2618cda..b1f421bd9 100644 --- a/bin/nanocld/src/models/mod.rs +++ b/bin/nanocld/src/models/mod.rs @@ -51,6 +51,9 @@ pub use event::*; mod raw_emitter; pub use raw_emitter::*; +mod object_process_status; +pub use object_process_status::*; + pub type Pool = Arc>>; pub type DBConn = PooledConnection>; diff --git a/bin/nanocld/src/models/object_process_status.rs b/bin/nanocld/src/models/object_process_status.rs new file mode 100644 index 000000000..1e8a23a70 --- /dev/null +++ b/bin/nanocld/src/models/object_process_status.rs @@ -0,0 +1,89 @@ +use std::str::FromStr; + +use diesel::prelude::*; + +use crate::schema::object_process_statuses; + +#[derive(Debug, Clone, Identifiable, Insertable, Queryable)] +#[diesel(primary_key(key))] +#[diesel(table_name = object_process_statuses)] +pub struct ObjPsStatusDb { + pub key: String, + pub created_at: chrono::NaiveDateTime, + pub updated_at: chrono::NaiveDateTime, + pub wanted: String, + pub prev_wanted: String, + pub actual: String, + pub prev_actual: String, +} + +#[derive(Clone, Debug, Default, AsChangeset)] +#[diesel(table_name = object_process_statuses)] +pub struct ObjPsStatusUpdate { + pub wanted: Option, + pub prev_wanted: Option, + pub actual: Option, + pub prev_actual: Option, +} + +#[derive(Debug, Default)] +pub enum ObjPsStatusKind { + #[default] + Created, + Started, + Running, + Stopped, + Failed, + Unknown, +} + +impl FromStr for ObjPsStatusKind { + type Err = std::io::Error; + + fn from_str(s: &str) -> Result { + match s { + "created" => Ok(Self::Created), + "started" => Ok(Self::Started), + "running" => Ok(Self::Running), + "stopped" => Ok(Self::Stopped), + "failed" => Ok(Self::Failed), + _ => Ok(Self::Unknown), + } + } +} + +impl ToString for ObjPsStatusKind { + fn to_string(&self) -> String { + match self { + Self::Created => "created", + Self::Started => "started", + Self::Running => "running", + Self::Stopped => "stopped", + Self::Failed => "failed", + Self::Unknown => "unknown", + } + .to_owned() + } +} + +pub struct ObjPsStatusPartial { + pub key: String, + pub wanted: ObjPsStatusKind, + pub prev_wanted: ObjPsStatusKind, + pub actual: ObjPsStatusKind, + pub prev_actual: ObjPsStatusKind, +} + +impl From for ObjPsStatusDb { + fn from(partial: ObjPsStatusPartial) -> Self { + Self { + key: partial.key, + created_at: chrono::Utc::now().naive_utc(), + updated_at: chrono::Utc::now().naive_utc(), + wanted: partial.wanted.to_string(), + prev_wanted: partial.prev_wanted.to_string(), + actual: partial.actual.to_string(), + prev_actual: partial.prev_actual.to_string(), + } + } +} diff --git a/bin/nanocld/src/objects/cargo.rs b/bin/nanocld/src/objects/cargo.rs index beefc805d..df8f9e423 100644 --- a/bin/nanocld/src/objects/cargo.rs +++ b/bin/nanocld/src/objects/cargo.rs @@ -1,10 +1,14 @@ +/// Handle object creation, deletion, update, read and inspect +/// For a cargo object in the database. +/// An object will emit an event when it is created, updated or deleted. +/// use futures_util::{StreamExt, stream::FuturesUnordered}; use bollard_next::{ service::HostConfig, - container::{RemoveContainerOptions, Config}, + container::{Config, RemoveContainerOptions}, }; -use nanocl_error::http::HttpResult; +use nanocl_error::http::{HttpResult, HttpError}; use nanocl_stubs::{ process::ProcessKind, cargo::{Cargo, CargoDeleteQuery, CargoInspect}, @@ -16,7 +20,7 @@ use crate::{ repositories::generic::*, models::{ CargoDb, SystemState, CargoObjCreateIn, ProcessDb, SpecDb, CargoObjPutIn, - CargoObjPatchIn, + CargoObjPatchIn, ObjPsStatusPartial, ObjPsStatusKind, ObjPsStatusDb, }, }; @@ -36,32 +40,35 @@ impl ObjCreate for CargoDb { obj: &Self::ObjCreateIn, state: &SystemState, ) -> HttpResult { - let cargo = CargoDb::create_from_spec( - &obj.namespace, - &obj.spec, - &obj.version, - &state.pool, - ) - .await?; - let number = if let Some(mode) = &cargo.spec.replication { - match mode { - ReplicationMode::Static(replication_static) => { - replication_static.number - } - ReplicationMode::Auto => 1, - ReplicationMode::Unique => 1, - ReplicationMode::UniqueByNode => 1, - _ => 1, - } - } else { - 1 - }; - if let Err(err) = - utils::cargo::create_instances(&cargo, number, state).await - { - CargoDb::del_by_pk(&cargo.spec.cargo_key, &state.pool).await?; - return Err(err); + // test if the name of the cargo include a . in the name and throw error if true + if obj.spec.name.contains('.') { + return Err(HttpError::bad_request("Cargo name cannot contain '.'")); } + let key = utils::key::gen_key(&obj.namespace, &obj.spec.name); + let new_spec = + SpecDb::try_from_cargo_partial(&key, &obj.version, &obj.spec)?; + let spec = SpecDb::create_from(new_spec, &state.pool) + .await? + .try_to_cargo_spec()?; + let status = ObjPsStatusPartial { + key: key.clone(), + wanted: ObjPsStatusKind::Created, + prev_wanted: ObjPsStatusKind::Created, + actual: ObjPsStatusKind::Created, + prev_actual: ObjPsStatusKind::Created, + }; + ObjPsStatusDb::create_from(status, &state.pool).await?; + let new_item = CargoDb { + key: key.clone(), + name: obj.spec.name.clone(), + created_at: chrono::Utc::now().naive_utc(), + namespace_name: obj.namespace.clone(), + status_key: key, + spec_key: spec.key, + }; + let cargo = CargoDb::create_from(new_item, &state.pool) + .await? + .with_spec(&spec); Ok(cargo) } } @@ -96,8 +103,7 @@ impl ObjDelByPk for CargoDb { .await .into_iter() .collect::>>()?; - CargoDb::del_by_pk(pk, &state.pool).await?; - SpecDb::del_by_kind_key(pk, &state.pool).await?; + CargoDb::clear_by_pk(pk, &state.pool).await?; Ok(cargo) } } @@ -180,10 +186,8 @@ impl ObjPatchByPk for CargoDb { obj: &Self::ObjPatchIn, state: &SystemState, ) -> HttpResult { - let payload = &obj.spec; - let version = &obj.version; let cargo = CargoDb::transform_read_by_pk(pk, &state.pool).await?; - let container = if let Some(container) = payload.container.clone() { + let container = if let Some(container) = obj.spec.container.clone() { // merge env and ensure no duplicate key let new_env = container.env.unwrap_or_default(); let mut env_vars: Vec = @@ -262,26 +266,26 @@ impl ObjPatchByPk for CargoDb { let spec = CargoSpecPartial { name: cargo.spec.name.clone(), container, - init_container: if payload.init_container.is_some() { - payload.init_container.clone() + init_container: if obj.spec.init_container.is_some() { + obj.spec.init_container.clone() } else { cargo.spec.init_container }, - replication: payload.replication.clone(), - secrets: if payload.secrets.is_some() { - payload.secrets.clone() + replication: obj.spec.replication.clone(), + secrets: if obj.spec.secrets.is_some() { + obj.spec.secrets.clone() } else { cargo.spec.secrets }, - metadata: if payload.metadata.is_some() { - payload.metadata.clone() + metadata: if obj.spec.metadata.is_some() { + obj.spec.metadata.clone() } else { cargo.spec.metadata }, }; let obj = &CargoObjPutIn { spec, - version: version.to_owned(), + version: obj.version.to_owned(), }; CargoDb::fn_put_obj_by_pk(pk, obj, state).await } diff --git a/bin/nanocld/src/objects/generic/process.rs b/bin/nanocld/src/objects/generic/process.rs index ea457443d..d97cbae90 100644 --- a/bin/nanocld/src/objects/generic/process.rs +++ b/bin/nanocld/src/objects/generic/process.rs @@ -1,7 +1,7 @@ use futures_util::{StreamExt, stream::FuturesUnordered}; use bollard_next::container::{ - RemoveContainerOptions, StartContainerOptions, StopContainerOptions, Config, - CreateContainerOptions, InspectContainerOptions, + RemoveContainerOptions, StopContainerOptions, Config, CreateContainerOptions, + InspectContainerOptions, }; use nanocl_error::{ io::FromIo, @@ -15,7 +15,10 @@ use nanocl_stubs::{ use crate::{ repositories::generic::*, - models::{SystemState, ProcessDb, VmDb, CargoDb, JobDb, JobUpdateDb}, + models::{ + SystemState, ProcessDb, VmDb, CargoDb, JobDb, JobUpdateDb, ObjPsStatusDb, + ObjPsStatusUpdate, ObjPsStatusKind, + }, }; /// Represent a object that is treated as a process @@ -111,22 +114,23 @@ pub trait ObjProcess { kind_pk: &str, state: &SystemState, ) -> HttpResult<()> { - let processes = ProcessDb::read_by_kind_key(kind_pk, &state.pool).await?; log::debug!("start_process_by_kind_pk: {kind_pk}"); - for process in processes { - let process_state = process.data.state.unwrap_or_default(); - if process_state.running.unwrap_or_default() { - return Ok(()); - } - state - .docker_api - .start_container( - &process.data.id.unwrap_or_default(), - None::>, - ) - .await?; + let current_status = + ObjPsStatusDb::read_by_pk(kind_pk, &state.pool).await?; + if current_status.actual == ObjPsStatusKind::Running.to_string() { + log::debug!("start_process_by_kind_pk: {kind_pk} already running"); + return Ok(()); } - Self::_emit(kind_pk, NativeEventAction::Create, state).await?; + let status_update = ObjPsStatusUpdate { + wanted: Some(ObjPsStatusKind::Running.to_string()), + prev_wanted: Some(current_status.wanted), + actual: Some(ObjPsStatusKind::Started.to_string()), + prev_actual: Some(current_status.actual), + }; + log::debug!("start_process_by_kind_pk: {kind_pk} update status"); + ObjPsStatusDb::update_pk(kind_pk, status_update, &state.pool).await?; + Self::_emit(kind_pk, NativeEventAction::Start, state).await?; + log::debug!("start emitted !"); Ok(()) } diff --git a/bin/nanocld/src/repositories/cargo.rs b/bin/nanocld/src/repositories/cargo.rs index ac30f8ebb..b31eb89ed 100644 --- a/bin/nanocld/src/repositories/cargo.rs +++ b/bin/nanocld/src/repositories/cargo.rs @@ -19,6 +19,7 @@ use crate::{ objects::generic::*, models::{ Pool, CargoDb, SpecDb, CargoUpdateDb, SystemState, NamespaceDb, ProcessDb, + ObjPsStatusDb, }, schema::cargoes, }; @@ -98,39 +99,6 @@ impl WithSpec for CargoDb { } impl CargoDb { - /// Create a new cargo from its specification. - pub async fn create_from_spec( - nsp: &str, - item: &CargoSpecPartial, - version: &str, - pool: &Pool, - ) -> IoResult { - let nsp = nsp.to_owned(); - let item = item.to_owned(); - let version = version.to_owned(); - // test if the name of the cargo include a . in the name and throw error if true - if item.name.contains('.') { - return Err(IoError::invalid_input( - "CargoSpecPartial", - "Name cannot contain a dot.", - )); - } - let key = utils::key::gen_key(&nsp, &item.name); - let new_spec = SpecDb::try_from_cargo_partial(&key, &version, &item)?; - let spec = SpecDb::create_from(new_spec, pool) - .await? - .try_to_cargo_spec()?; - let new_item = CargoDb { - key, - name: item.name, - created_at: chrono::Utc::now().naive_utc(), - namespace_name: nsp, - spec_key: spec.key, - }; - let item = CargoDb::create_from(new_item, pool).await?.with_spec(&spec); - Ok(item) - } - /// Update a cargo from its specification. pub async fn update_from_spec( key: &str, @@ -237,4 +205,12 @@ impl CargoDb { } Ok(cargo_summaries) } + + /// Delete a cargo and it's relations (Spec, ObjPsStatus). + pub async fn clear_by_pk(pk: &str, pool: &Pool) -> IoResult<()> { + CargoDb::del_by_pk(pk, pool).await?; + SpecDb::del_by_kind_key(pk, pool).await?; + ObjPsStatusDb::del_by_pk(pk, pool).await?; + Ok(()) + } } diff --git a/bin/nanocld/src/repositories/mod.rs b/bin/nanocld/src/repositories/mod.rs index 2d8daab55..a899ce416 100644 --- a/bin/nanocld/src/repositories/mod.rs +++ b/bin/nanocld/src/repositories/mod.rs @@ -11,5 +11,6 @@ mod metric; mod vm; mod vm_image; mod event; +mod object_process_status; pub mod generic; diff --git a/bin/nanocld/src/repositories/object_process_status.rs b/bin/nanocld/src/repositories/object_process_status.rs new file mode 100644 index 000000000..918e4785b --- /dev/null +++ b/bin/nanocld/src/repositories/object_process_status.rs @@ -0,0 +1,60 @@ +use diesel::prelude::*; + +use nanocl_stubs::generic::GenericFilter; + +use crate::{ + models::{ObjPsStatusDb, ObjPsStatusUpdate}, + schema::object_process_statuses, + gen_where4string, gen_multiple, +}; + +use super::generic::*; + +impl RepositoryBase for ObjPsStatusDb {} + +impl RepositoryCreate for ObjPsStatusDb {} + +impl RepositoryDelByPk for ObjPsStatusDb {} + +impl RepositoryUpdate for ObjPsStatusDb { + type UpdateItem = ObjPsStatusUpdate; +} + +impl RepositoryReadBy for ObjPsStatusDb { + type Output = ObjPsStatusDb; + + fn get_pk() -> &'static str { + "key" + } + + fn gen_read_query( + filter: &GenericFilter, + is_multiple: bool, + ) -> impl diesel::query_dsl::methods::LoadQuery< + 'static, + diesel::pg::PgConnection, + Self::Output, + > { + let r#where = filter.r#where.clone().unwrap_or_default(); + let mut query = object_process_statuses::table.into_boxed(); + if let Some(value) = r#where.get("key") { + gen_where4string!(query, object_process_statuses::key, value); + } + if let Some(value) = r#where.get("wanted") { + gen_where4string!(query, object_process_statuses::wanted, value); + } + if let Some(value) = r#where.get("prev_wanted") { + gen_where4string!(query, object_process_statuses::prev_wanted, value); + } + if let Some(value) = r#where.get("actual") { + gen_where4string!(query, object_process_statuses::actual, value); + } + if let Some(value) = r#where.get("prev_actual") { + gen_where4string!(query, object_process_statuses::prev_actual, value); + } + if is_multiple { + gen_multiple!(query, object_process_statuses::created_at, filter); + } + query + } +} diff --git a/bin/nanocld/src/schema.rs b/bin/nanocld/src/schema.rs index ea5410b23..457afd164 100644 --- a/bin/nanocld/src/schema.rs +++ b/bin/nanocld/src/schema.rs @@ -6,6 +6,7 @@ diesel::table! { created_at -> Timestamptz, name -> Varchar, spec_key -> Uuid, + status_key -> Varchar, namespace_name -> Varchar, } } @@ -78,6 +79,18 @@ diesel::table! { } } +diesel::table! { + object_process_statuses (key) { + key -> Varchar, + created_at -> Timestamptz, + updated_at -> Timestamptz, + wanted -> Varchar, + prev_wanted -> Varchar, + actual -> Varchar, + prev_actual -> Varchar, + } +} + diesel::table! { processes (key) { key -> Varchar, @@ -156,6 +169,7 @@ diesel::table! { } diesel::joinable!(cargoes -> namespaces (namespace_name)); +diesel::joinable!(cargoes -> object_process_statuses (status_key)); diesel::joinable!(cargoes -> specs (spec_key)); diesel::joinable!(node_group_links -> node_groups (node_group_name)); diesel::joinable!(node_group_links -> nodes (node_name)); @@ -173,6 +187,7 @@ diesel::allow_tables_to_appear_in_same_query!( node_group_links, node_groups, nodes, + object_process_statuses, processes, resource_kinds, resources, diff --git a/bin/nanocld/src/services/process.rs b/bin/nanocld/src/services/process.rs index 3ef6c09aa..7a706cbe6 100644 --- a/bin/nanocld/src/services/process.rs +++ b/bin/nanocld/src/services/process.rs @@ -4,7 +4,7 @@ use futures_util::{StreamExt, TryStreamExt, stream::select_all}; use nanocl_error::http::{HttpResult, HttpError}; use bollard_next::{ - container::{LogsOptions, WaitContainerOptions}, + container::{LogsOptions, WaitContainerOptions, StartContainerOptions}, service::ContainerWaitExitError, }; use nanocl_stubs::{ @@ -112,6 +112,33 @@ async fn logs_process( ) } +/// Start process by it's pk +/// Internal endpoint used for multi node communication +#[cfg_attr(feature = "dev", utoipa::path( + post, + tag = "Processes", + path = "/processes/{pk}/start", + params( + ("pk" = String, Path, description = "Pk of the process", example = "1234567890"), + ), + responses( + (status = 202, description = "Process instances started"), + ), +))] +#[web::post("/processes/{pk}/start")] +pub async fn start_process_by_pk( + state: web::types::State, + path: web::types::Path<(String, String)>, +) -> HttpResult { + let (_, pk) = path.into_inner(); + let process = ProcessDb::read_by_pk(&pk, &state.pool).await?; + state + .docker_api + .start_container(&process.key, None::>) + .await?; + Ok(web::HttpResponse::Accepted().finish()) +} + /// Start processes of given kind and name #[cfg_attr(feature = "dev", utoipa::path( post, diff --git a/bin/nanocld/src/subsystem/docker_event.rs b/bin/nanocld/src/subsystem/docker_event.rs index 2747087ed..1bec77ab4 100644 --- a/bin/nanocld/src/subsystem/docker_event.rs +++ b/bin/nanocld/src/subsystem/docker_event.rs @@ -34,7 +34,6 @@ async fn exec_docker( } let action = event.action.clone().unwrap_or_default(); let id = actor.id.unwrap_or_default(); - log::debug!("event::exec_docker: {action}"); let action = action.as_str(); let mut event = EventPartial { reporting_controller: vars::CONTROLLER_NAME.to_owned(), diff --git a/bin/nanocld/src/subsystem/event.rs b/bin/nanocld/src/subsystem/event.rs index 6f97c62d7..be79a401c 100644 --- a/bin/nanocld/src/subsystem/event.rs +++ b/bin/nanocld/src/subsystem/event.rs @@ -1,5 +1,6 @@ use std::str::FromStr; +use bollard_next::container::StartContainerOptions; use ntex::rt; use futures_util::StreamExt; @@ -13,18 +14,19 @@ use crate::{ repositories::generic::*, models::{ SystemState, JobDb, ProcessDb, SystemEventReceiver, SystemEventKind, + CargoDb, ObjPsStatusUpdate, ObjPsStatusDb, ObjPsStatusKind, }, }; /// Remove a job after when finished and ttl is set -async fn job_ttl(e: Event, state: &SystemState) -> IoResult<()> { - let Some(actor) = e.actor else { +async fn job_ttl(e: &Event, state: &SystemState) -> IoResult<()> { + let Some(ref actor) = e.actor else { return Ok(()); }; if actor.kind != EventActorKind::Process { return Ok(()); } - let attributes = actor.attributes.unwrap_or_default(); + let attributes = actor.attributes.clone().unwrap_or_default(); let job_id = match attributes.get("io.nanocl.j") { None => return Ok(()), Some(job_id) => job_id.as_str().unwrap_or_default(), @@ -61,9 +63,54 @@ async fn job_ttl(e: Event, state: &SystemState) -> IoResult<()> { Ok(()) } +async fn start(e: &Event, state: &SystemState) -> IoResult<()> { + let action = NativeEventAction::from_str(e.action.as_str())?; + // If it's not a start action, we don't care + if action != NativeEventAction::Start { + return Ok(()); + } + // If there is no actor, we don't care + let Some(ref actor) = e.actor else { + return Ok(()); + }; + let key = actor.key.clone().unwrap_or_default(); + match actor.kind { + EventActorKind::Cargo => { + log::debug!("handling start event for cargo {key}"); + let cargo = CargoDb::transform_read_by_pk(&key, &state.pool).await?; + let mut processes = + ProcessDb::read_by_kind_key(&key, &state.pool).await?; + if processes.is_empty() { + processes = utils::cargo::create_instances(&cargo, 1, state).await?; + } + for process in processes { + let _ = state + .docker_api + .start_container(&process.key, None::>) + .await; + } + let cur_status = ObjPsStatusDb::read_by_pk(&key, &state.pool).await?; + let new_status = ObjPsStatusUpdate { + wanted: Some(ObjPsStatusKind::Running.to_string()), + prev_wanted: Some(cur_status.wanted), + actual: Some(ObjPsStatusKind::Running.to_string()), + prev_actual: Some(cur_status.actual), + }; + ObjPsStatusDb::update_pk(&key, new_status, &state.pool).await?; + state.emit_normal_native_action(&cargo, NativeEventAction::Running); + } + EventActorKind::Vm => {} + EventActorKind::Job => {} + _ => {} + } + Ok(()) +} + /// Take action when event is received async fn exec_event(e: Event, state: &SystemState) -> IoResult<()> { - job_ttl(e, state).await?; + log::debug!("exec_event: {} {}", e.kind, e.action); + job_ttl(&e, state).await?; + start(&e, state).await?; Ok(()) } diff --git a/bin/nanocld/src/utils/system.rs b/bin/nanocld/src/utils/system.rs index db1ba1fb0..5d55dffd9 100644 --- a/bin/nanocld/src/utils/system.rs +++ b/bin/nanocld/src/utils/system.rs @@ -18,6 +18,7 @@ use crate::{ repositories::generic::*, models::{ SystemState, CargoDb, ProcessDb, NamespaceDb, VmImageDb, ProcessUpdateDb, + CargoObjCreateIn, }, objects::generic::ObjCreate, }; @@ -168,13 +169,12 @@ pub async fn sync_processes(state: &SystemState) -> IoResult<()> { log::trace!( "system::sync_processes: create cargo {name} in namespace {namespace}", ); - CargoDb::create_from_spec( - namespace, - &new_cargo, - &format!("v{}", vars::VERSION), - &state.pool, - ) - .await?; + let obj = &CargoObjCreateIn { + namespace: namespace.to_owned(), + spec: new_cargo.clone(), + version: format!("v{}", vars::VERSION), + }; + CargoDb::create_obj(obj, state).await?; } // If the cargo is already in our store and the config is different we update it Ok(cargo) => { diff --git a/bin/ncproxy/src/subsystem/event.rs b/bin/ncproxy/src/subsystem/event.rs index 45060ab44..06c83824d 100644 --- a/bin/ncproxy/src/subsystem/event.rs +++ b/bin/ncproxy/src/subsystem/event.rs @@ -93,7 +93,7 @@ async fn on_event(event: &Event, state: &SystemStateRef) -> IoResult<()> { let actor_kind = &actor.kind; log::trace!("event::on_event: {actor_kind} {action}"); match (actor_kind, action) { - (EventActorKind::Cargo, NativeEventAction::Start) + (EventActorKind::Cargo, NativeEventAction::Running) | (EventActorKind::Cargo, NativeEventAction::Update) => { let (name, namespace) = get_cargo_attributes(&actor.attributes)?; update_cargo_rule(&name, &namespace, state).await?; diff --git a/crates/nanocl_stubs/src/system.rs b/crates/nanocl_stubs/src/system.rs index 56cf3104e..dcde7213f 100644 --- a/crates/nanocl_stubs/src/system.rs +++ b/crates/nanocl_stubs/src/system.rs @@ -82,6 +82,8 @@ pub enum NativeEventAction { Stop, Delete, Restart, + Running, + Finished, Other(String), } @@ -96,6 +98,8 @@ impl FromStr for NativeEventAction { "stop" => Ok(NativeEventAction::Stop), "delete" => Ok(NativeEventAction::Delete), "restart" => Ok(NativeEventAction::Restart), + "running" => Ok(NativeEventAction::Running), + "finished" => Ok(NativeEventAction::Finished), _ => Ok(NativeEventAction::Other(s.to_owned())), } } @@ -110,6 +114,8 @@ impl std::fmt::Display for NativeEventAction { NativeEventAction::Stop => write!(f, "stop"), NativeEventAction::Delete => write!(f, "delete"), NativeEventAction::Restart => write!(f, "restart"), + NativeEventAction::Running => write!(f, "running"), + NativeEventAction::Finished => write!(f, "finished"), NativeEventAction::Other(s) => write!(f, "{}", s), } }