Skip to content

Commit

Permalink
chore/nanocld: wip event based process
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s committed Jan 11, 2024
1 parent c4ca619 commit f645a7a
Show file tree
Hide file tree
Showing 21 changed files with 358 additions and 112 deletions.
4 changes: 2 additions & 2 deletions bin/nanocl/src/commands/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,10 @@ async fn exec_state_apply(
let token = format!("cargo/{}", cargo.name);
if let Some(before) = &cargo.init_container {
let image = before.image.clone().unwrap_or_default();

Check warning on line 499 in bin/nanocl/src/commands/state.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `image`

warning: unused variable: `image` --> bin/nanocl/src/commands/state.rs:499:13 | 499 | let image = before.image.clone().unwrap_or_default(); | ^^^^^ help: if this is intentional, prefix it with an underscore: `_image` | = note: `#[warn(unused_variables)]` on by default
pull_image(&image, opts.force_pull, &client).await?;
// pull_image(&image, opts.force_pull, &client).await?;
}
let image = cargo.container.image.clone().unwrap_or_default();

Check warning on line 502 in bin/nanocl/src/commands/state.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `image`

warning: unused variable: `image` --> bin/nanocl/src/commands/state.rs:502:11 | 502 | let image = cargo.container.image.clone().unwrap_or_default(); | ^^^^^ help: if this is intentional, prefix it with an underscore: `_image`
pull_image(&image, opts.force_pull, &client).await?;
// pull_image(&image, opts.force_pull, &client).await?;
let pg = utils::progress::create_progress(&token, &pg_style);
match client.inspect_cargo(&cargo.name, Some(&namespace)).await {
Err(_) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS "object_process_statuses";
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Your SQL goes here
CREATE TABLE IF NOT EXISTS "object_process_statuses" (
"key" VARCHAR NOT NULL PRIMARY KEY,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"wanted" VARCHAR NOT NULL,
"prev_wanted" VARCHAR NOT NULL,
"actual" VARCHAR NOT NULL,
"prev_actual" VARCHAR NOT NULL
);
1 change: 1 addition & 0 deletions bin/nanocld/migrations/2022-06-17-122356_cargos/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ CREATE TABLE IF NOT EXISTS "cargoes" (
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"name" VARCHAR NOT NULL,
"spec_key" UUID NOT NULL REFERENCES specs("key"),
"status_key" VARCHAR NOT NULL REFERENCES object_process_statuses("key"),
"namespace_name" VARCHAR NOT NULL REFERENCES namespaces("name")
);
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE IF EXISTS "metrics" RENAME COLUMN IF EXISTS "expires_at" TO "expire_at";
ALTER TABLE IF EXISTS "metrics" RENAME COLUMN "expires_at" TO "expire_at";
8 changes: 4 additions & 4 deletions bin/nanocld/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ async fn main() -> std::io::Result<()> {
// Parse command line arguments
let args = cli::Cli::parse();
// Build env logger
#[cfg(any(feature = "dev", feature = "test"))]
{
std::env::set_var("LOG_LEVEL", "nanocld=trace");
}
// #[cfg(any(feature = "dev", feature = "test"))]
// {
// std::env::set_var("LOG_LEVEL", "nanocld=trace");
// }
logger::enable_logger("nanocld");
log::info!(
"nanocld_{}_v{}-{}:{}",
Expand Down
2 changes: 2 additions & 0 deletions bin/nanocld/src/models/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct CargoDb {
pub name: String,
/// The spec key reference
pub spec_key: uuid::Uuid,
/// The status key reference
pub status_key: String,
/// The namespace name
pub namespace_name: String,
}
Expand Down
3 changes: 3 additions & 0 deletions bin/nanocld/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub use event::*;
mod raw_emitter;
pub use raw_emitter::*;

mod object_process_status;
pub use object_process_status::*;

pub type Pool = Arc<R2D2Pool<ConnectionManager<PgConnection>>>;
pub type DBConn = PooledConnection<ConnectionManager<PgConnection>>;

Expand Down
89 changes: 89 additions & 0 deletions bin/nanocld/src/models/object_process_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::str::FromStr;

use diesel::prelude::*;

use crate::schema::object_process_statuses;

#[derive(Debug, Clone, Identifiable, Insertable, Queryable)]
#[diesel(primary_key(key))]
#[diesel(table_name = object_process_statuses)]
pub struct ObjPsStatusDb {
pub key: String,
pub created_at: chrono::NaiveDateTime,
pub updated_at: chrono::NaiveDateTime,
pub wanted: String,
pub prev_wanted: String,
pub actual: String,
pub prev_actual: String,
}

#[derive(Clone, Debug, Default, AsChangeset)]
#[diesel(table_name = object_process_statuses)]
pub struct ObjPsStatusUpdate {
pub wanted: Option<String>,
pub prev_wanted: Option<String>,
pub actual: Option<String>,
pub prev_actual: Option<String>,
}

#[derive(Debug, Default)]
pub enum ObjPsStatusKind {
#[default]
Created,
Started,
Running,
Stopped,
Failed,
Unknown,
}

impl FromStr for ObjPsStatusKind {
type Err = std::io::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"created" => Ok(Self::Created),
"started" => Ok(Self::Started),
"running" => Ok(Self::Running),
"stopped" => Ok(Self::Stopped),
"failed" => Ok(Self::Failed),
_ => Ok(Self::Unknown),
}
}
}

impl ToString for ObjPsStatusKind {
fn to_string(&self) -> String {
match self {
Self::Created => "created",
Self::Started => "started",
Self::Running => "running",
Self::Stopped => "stopped",
Self::Failed => "failed",
Self::Unknown => "unknown",
}
.to_owned()
}
}

pub struct ObjPsStatusPartial {
pub key: String,
pub wanted: ObjPsStatusKind,
pub prev_wanted: ObjPsStatusKind,
pub actual: ObjPsStatusKind,
pub prev_actual: ObjPsStatusKind,
}

impl From<ObjPsStatusPartial> for ObjPsStatusDb {
fn from(partial: ObjPsStatusPartial) -> Self {
Self {
key: partial.key,
created_at: chrono::Utc::now().naive_utc(),
updated_at: chrono::Utc::now().naive_utc(),
wanted: partial.wanted.to_string(),
prev_wanted: partial.prev_wanted.to_string(),
actual: partial.actual.to_string(),
prev_actual: partial.prev_actual.to_string(),
}
}
}
86 changes: 45 additions & 41 deletions bin/nanocld/src/objects/cargo.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/// Handle object creation, deletion, update, read and inspect
/// For a cargo object in the database.
/// An object will emit an event when it is created, updated or deleted.
///
use futures_util::{StreamExt, stream::FuturesUnordered};
use bollard_next::{
service::HostConfig,
container::{RemoveContainerOptions, Config},
container::{Config, RemoveContainerOptions},
};

use nanocl_error::http::HttpResult;
use nanocl_error::http::{HttpResult, HttpError};
use nanocl_stubs::{
process::ProcessKind,
cargo::{Cargo, CargoDeleteQuery, CargoInspect},
Expand All @@ -16,7 +20,7 @@ use crate::{
repositories::generic::*,
models::{
CargoDb, SystemState, CargoObjCreateIn, ProcessDb, SpecDb, CargoObjPutIn,
CargoObjPatchIn,
CargoObjPatchIn, ObjPsStatusPartial, ObjPsStatusKind, ObjPsStatusDb,
},
};

Expand All @@ -36,32 +40,35 @@ impl ObjCreate for CargoDb {
obj: &Self::ObjCreateIn,
state: &SystemState,
) -> HttpResult<Self::ObjCreateOut> {
let cargo = CargoDb::create_from_spec(
&obj.namespace,
&obj.spec,
&obj.version,
&state.pool,
)
.await?;
let number = if let Some(mode) = &cargo.spec.replication {
match mode {
ReplicationMode::Static(replication_static) => {
replication_static.number
}
ReplicationMode::Auto => 1,
ReplicationMode::Unique => 1,
ReplicationMode::UniqueByNode => 1,
_ => 1,
}
} else {
1
};
if let Err(err) =
utils::cargo::create_instances(&cargo, number, state).await
{
CargoDb::del_by_pk(&cargo.spec.cargo_key, &state.pool).await?;
return Err(err);
// test if the name of the cargo include a . in the name and throw error if true
if obj.spec.name.contains('.') {
return Err(HttpError::bad_request("Cargo name cannot contain '.'"));
}
let key = utils::key::gen_key(&obj.namespace, &obj.spec.name);
let new_spec =
SpecDb::try_from_cargo_partial(&key, &obj.version, &obj.spec)?;
let spec = SpecDb::create_from(new_spec, &state.pool)
.await?
.try_to_cargo_spec()?;
let status = ObjPsStatusPartial {
key: key.clone(),
wanted: ObjPsStatusKind::Created,
prev_wanted: ObjPsStatusKind::Created,
actual: ObjPsStatusKind::Created,
prev_actual: ObjPsStatusKind::Created,
};
ObjPsStatusDb::create_from(status, &state.pool).await?;
let new_item = CargoDb {
key: key.clone(),
name: obj.spec.name.clone(),
created_at: chrono::Utc::now().naive_utc(),
namespace_name: obj.namespace.clone(),
status_key: key,
spec_key: spec.key,
};
let cargo = CargoDb::create_from(new_item, &state.pool)
.await?
.with_spec(&spec);
Ok(cargo)
}
}
Expand Down Expand Up @@ -96,8 +103,7 @@ impl ObjDelByPk for CargoDb {
.await
.into_iter()
.collect::<HttpResult<Vec<_>>>()?;
CargoDb::del_by_pk(pk, &state.pool).await?;
SpecDb::del_by_kind_key(pk, &state.pool).await?;
CargoDb::clear_by_pk(pk, &state.pool).await?;
Ok(cargo)
}
}
Expand Down Expand Up @@ -180,10 +186,8 @@ impl ObjPatchByPk for CargoDb {
obj: &Self::ObjPatchIn,
state: &SystemState,
) -> HttpResult<Self::ObjPatchOut> {
let payload = &obj.spec;
let version = &obj.version;
let cargo = CargoDb::transform_read_by_pk(pk, &state.pool).await?;
let container = if let Some(container) = payload.container.clone() {
let container = if let Some(container) = obj.spec.container.clone() {
// merge env and ensure no duplicate key
let new_env = container.env.unwrap_or_default();
let mut env_vars: Vec<String> =
Expand Down Expand Up @@ -262,26 +266,26 @@ impl ObjPatchByPk for CargoDb {
let spec = CargoSpecPartial {
name: cargo.spec.name.clone(),
container,
init_container: if payload.init_container.is_some() {
payload.init_container.clone()
init_container: if obj.spec.init_container.is_some() {
obj.spec.init_container.clone()
} else {
cargo.spec.init_container
},
replication: payload.replication.clone(),
secrets: if payload.secrets.is_some() {
payload.secrets.clone()
replication: obj.spec.replication.clone(),
secrets: if obj.spec.secrets.is_some() {
obj.spec.secrets.clone()
} else {
cargo.spec.secrets
},
metadata: if payload.metadata.is_some() {
payload.metadata.clone()
metadata: if obj.spec.metadata.is_some() {
obj.spec.metadata.clone()
} else {
cargo.spec.metadata
},
};
let obj = &CargoObjPutIn {
spec,
version: version.to_owned(),
version: obj.version.to_owned(),
};
CargoDb::fn_put_obj_by_pk(pk, obj, state).await
}
Expand Down
38 changes: 21 additions & 17 deletions bin/nanocld/src/objects/generic/process.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_util::{StreamExt, stream::FuturesUnordered};
use bollard_next::container::{
RemoveContainerOptions, StartContainerOptions, StopContainerOptions, Config,
CreateContainerOptions, InspectContainerOptions,
RemoveContainerOptions, StopContainerOptions, Config, CreateContainerOptions,
InspectContainerOptions,
};
use nanocl_error::{
io::FromIo,
Expand All @@ -15,7 +15,10 @@ use nanocl_stubs::{

use crate::{
repositories::generic::*,
models::{SystemState, ProcessDb, VmDb, CargoDb, JobDb, JobUpdateDb},
models::{
SystemState, ProcessDb, VmDb, CargoDb, JobDb, JobUpdateDb, ObjPsStatusDb,
ObjPsStatusUpdate, ObjPsStatusKind,
},
};

/// Represent a object that is treated as a process
Expand Down Expand Up @@ -111,22 +114,23 @@ pub trait ObjProcess {
kind_pk: &str,
state: &SystemState,
) -> HttpResult<()> {
let processes = ProcessDb::read_by_kind_key(kind_pk, &state.pool).await?;
log::debug!("start_process_by_kind_pk: {kind_pk}");
for process in processes {
let process_state = process.data.state.unwrap_or_default();
if process_state.running.unwrap_or_default() {
return Ok(());
}
state
.docker_api
.start_container(
&process.data.id.unwrap_or_default(),
None::<StartContainerOptions<String>>,
)
.await?;
let current_status =
ObjPsStatusDb::read_by_pk(kind_pk, &state.pool).await?;
if current_status.actual == ObjPsStatusKind::Running.to_string() {
log::debug!("start_process_by_kind_pk: {kind_pk} already running");
return Ok(());
}
Self::_emit(kind_pk, NativeEventAction::Create, state).await?;
let status_update = ObjPsStatusUpdate {
wanted: Some(ObjPsStatusKind::Running.to_string()),
prev_wanted: Some(current_status.wanted),
actual: Some(ObjPsStatusKind::Started.to_string()),
prev_actual: Some(current_status.actual),
};
log::debug!("start_process_by_kind_pk: {kind_pk} update status");
ObjPsStatusDb::update_pk(kind_pk, status_update, &state.pool).await?;
Self::_emit(kind_pk, NativeEventAction::Start, state).await?;
log::debug!("start emitted !");
Ok(())
}

Expand Down
Loading

0 comments on commit f645a7a

Please sign in to comment.