Skip to content

Commit

Permalink
refactor/nanocld: utils to be great again (#806)
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s authored Jan 7, 2024
1 parent 3e92e47 commit c9f9c83
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 115 deletions.
6 changes: 6 additions & 0 deletions bin/nanocld/src/models/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ pub struct EventManager {
pub emitter: SystemEventEmitter,
}

impl Default for EventManager {
fn default() -> Self {
Self::new()
}
}

impl EventManager {
pub fn new() -> Self {
let (sx, rx) = mpsc::unbounded();
Expand Down
58 changes: 55 additions & 3 deletions bin/nanocld/src/repositories/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ use std::sync::Arc;

use diesel::prelude::*;

use nanocl_error::io::{IoError, IoResult};
use futures_util::{stream::FuturesUnordered, StreamExt};
use nanocl_error::{
io::{IoError, IoResult},
http::HttpResult,
};

use nanocl_stubs::{
generic::{GenericFilter, GenericClause},
cargo::Cargo,
cargo::{Cargo, CargoDeleteQuery, CargoInspect},
cargo_spec::{CargoSpecPartial, CargoSpec},
};

use crate::{
gen_multiple, gen_where4string, utils,
models::{Pool, CargoDb, SpecDb, CargoUpdateDb},
objects::generic::*,
models::{
Pool, CargoDb, SpecDb, CargoUpdateDb, SystemState, NamespaceDb, ProcessDb,
},
schema::cargoes,
};

Expand Down Expand Up @@ -173,4 +180,49 @@ impl CargoDb {
.await?;
Ok(count)
}

/// Return detailed information about the cargo for the given key
pub async fn inspect_by_pk(
key: &str,
state: &SystemState,
) -> HttpResult<CargoInspect> {
let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?;
let processes = ProcessDb::read_by_kind_key(key, &state.pool).await?;
let (_, _, _, running_instances) = utils::process::count_status(&processes);
Ok(CargoInspect {
created_at: cargo.created_at,
namespace_name: cargo.namespace_name,
instance_total: processes.len(),
instance_running: running_instances,
spec: cargo.spec,
instances: processes,
})
}

/// This remove all cargo in the given namespace and all their instances (containers)
/// from the system (database and docker).
pub async fn delete_by_namespace(
namespace: &str,
state: &SystemState,
) -> HttpResult<()> {
let namespace = NamespaceDb::read_by_pk(namespace, &state.pool).await?;
let cargoes =
CargoDb::read_by_namespace(&namespace.name, &state.pool).await?;
cargoes
.into_iter()
.map(|cargo| async move {
CargoDb::del_obj_by_pk(
&cargo.spec.cargo_key,
&CargoDeleteQuery::default(),
state,
)
.await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<HttpResult<_>>>()
.await
.into_iter()
.collect::<HttpResult<Vec<_>>>()?;
Ok(())
}
}
57 changes: 54 additions & 3 deletions bin/nanocld/src/repositories/vm.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use diesel::prelude::*;

use nanocl_error::io::{IoError, IoResult};
use nanocl_error::{
io::{IoError, IoResult},
http::HttpResult,
};

use nanocl_stubs::{
generic::{GenericFilter, GenericClause},
vm::Vm,
vm::{Vm, VmInspect, VmSummary},
vm_spec::{VmSpecPartial, VmSpec},
};

use crate::{
gen_multiple, gen_where4string, utils,
schema::vms,
models::{Pool, VmDb, VmUpdateDb, SpecDb},
models::{
Pool, VmDb, VmUpdateDb, SpecDb, SystemState, ProcessDb, NamespaceDb,
},
};

use super::generic::*;
Expand Down Expand Up @@ -141,4 +146,50 @@ impl VmDb {
.r#where("namespace_name", GenericClause::Eq(name.to_owned()));
VmDb::transform_read_by(&filter, pool).await
}

/// Get detailed information about a VM by his key
pub async fn inspect_by_pk(
vm_key: &str,
state: &SystemState,
) -> HttpResult<VmInspect> {
let vm = VmDb::transform_read_by_pk(vm_key, &state.pool).await?;
let processes =
ProcessDb::read_by_kind_key(&vm.spec.vm_key, &state.pool).await?;
let (_, _, _, running_instances) = utils::process::count_status(&processes);
Ok(VmInspect {
created_at: vm.created_at,
namespace_name: vm.namespace_name,
spec: vm.spec,
instance_total: processes.len(),
instance_running: running_instances,
instances: processes,
})
}

/// List VMs by namespace
pub async fn list_by_namespace(
nsp: &str,
pool: &Pool,
) -> HttpResult<Vec<VmSummary>> {
let namespace = NamespaceDb::read_by_pk(nsp, pool).await?;
let vmes = VmDb::read_by_namespace(&namespace.name, pool).await?;
let mut vm_summaries = Vec::new();
for vm in vmes {
let spec = SpecDb::read_by_pk(&vm.spec.key, pool)
.await?
.try_to_vm_spec()?;
let processes =
ProcessDb::read_by_kind_key(&vm.spec.vm_key, pool).await?;
let (_, _, _, running_instances) =
utils::process::count_status(&processes);
vm_summaries.push(VmSummary {
created_at: vm.created_at,
namespace_name: vm.namespace_name,
instance_total: processes.len(),
instance_running: running_instances,
spec: spec.clone(),
});
}
Ok(vm_summaries)
}
}
2 changes: 1 addition & 1 deletion bin/nanocld/src/services/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn inspect_cargo(
) -> HttpResult<web::HttpResponse> {
let namespace = utils::key::resolve_nsp(&qs.namespace);
let key = utils::key::gen_key(&namespace, &path.1);
let cargo = utils::cargo::inspect_by_key(&key, &state).await?;
let cargo = CargoDb::inspect_by_pk(&key, &state).await?;
Ok(web::HttpResponse::Ok().json(&cargo))
}

Expand Down
4 changes: 2 additions & 2 deletions bin/nanocld/src/services/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn list_vm(
qs: web::types::Query<GenericNspQuery>,
) -> HttpResult<web::HttpResponse> {
let namespace = utils::key::resolve_nsp(&qs.namespace);
let vms = utils::vm::list_by_namespace(&namespace, &state.pool).await?;
let vms = VmDb::list_by_namespace(&namespace, &state.pool).await?;
Ok(web::HttpResponse::Ok().json(&vms))
}

Expand All @@ -73,7 +73,7 @@ pub async fn inspect_vm(
let name = path.1.to_owned();
let namespace = utils::key::resolve_nsp(&qs.namespace);
let key = utils::key::gen_key(&namespace, &name);
let vm = utils::vm::inspect_by_key(&key, &state).await?;
let vm = VmDb::inspect_by_pk(&key, &state).await?;
Ok(web::HttpResponse::Ok().json(&vm))
}

Expand Down
57 changes: 4 additions & 53 deletions bin/nanocld/src/utils/cargo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ use bollard_next::{
use nanocl_stubs::{
process::Process,
generic::{GenericListNspQuery, GenericClause, GenericFilter},
cargo::{
Cargo, CargoSummary, CargoInspect, CargoKillOptions, CargoStats,
CargoStatsQuery, CargoDeleteQuery,
},
cargo::{Cargo, CargoSummary, CargoKillOptions, CargoStats, CargoStatsQuery},
};

use crate::{
utils,
objects::generic::*,
repositories::generic::*,
models::{SystemState, CargoDb, ProcessDb, NamespaceDb, SecretDb, SpecDb},
};
Expand Down Expand Up @@ -252,7 +248,7 @@ pub async fn delete_instances(

/// Restart cargo instances (containers) by key
pub async fn restart(key: &str, state: &SystemState) -> HttpResult<()> {
let cargo = utils::cargo::inspect_by_key(key, state).await?;
let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?;
let processes =
ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state.pool).await?;
processes
Expand All @@ -265,10 +261,10 @@ pub async fn restart(key: &str, state: &SystemState) -> HttpResult<()> {
.map_err(HttpError::from)
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<(), HttpError>>>()
.collect::<Vec<HttpResult<()>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
.collect::<HttpResult<Vec<_>>>()?;
Ok(())
}

Expand Down Expand Up @@ -314,51 +310,6 @@ pub async fn list(
Ok(cargo_summaries)
}

/// Return detailed information about the cargo for the given key
pub async fn inspect_by_key(
key: &str,
state: &SystemState,
) -> HttpResult<CargoInspect> {
let cargo = CargoDb::transform_read_by_pk(key, &state.pool).await?;
let processes = ProcessDb::read_by_kind_key(key, &state.pool).await?;
let (_, _, _, running_instances) = utils::process::count_status(&processes);
Ok(CargoInspect {
created_at: cargo.created_at,
namespace_name: cargo.namespace_name,
instance_total: processes.len(),
instance_running: running_instances,
spec: cargo.spec,
instances: processes,
})
}

/// This remove all cargo in the given namespace and all their instances (containers)
/// from the system (database and docker).
pub async fn delete_by_namespace(
namespace: &str,
state: &SystemState,
) -> HttpResult<()> {
let namespace = NamespaceDb::read_by_pk(namespace, &state.pool).await?;
let cargoes =
CargoDb::read_by_namespace(&namespace.name, &state.pool).await?;
cargoes
.into_iter()
.map(|cargo| async move {
CargoDb::del_obj_by_pk(
&cargo.spec.cargo_key,
&CargoDeleteQuery::default(),
state,
)
.await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, HttpError>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, HttpError>>()?;
Ok(())
}

/// Send a signal to a cargo instance the cargo name can be used if the cargo has only one instance
/// The signal is send to one instance only
pub async fn kill_by_key(
Expand Down
6 changes: 2 additions & 4 deletions bin/nanocld/src/utils/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use nanocl_stubs::{
};

use crate::{
utils,
repositories::generic::*,
models::{Pool, SystemState, CargoDb, NamespaceDb},
};
Expand Down Expand Up @@ -56,7 +55,7 @@ pub async fn create(

/// Delete a namespace by name and remove all associated cargo and vm.
pub async fn delete_by_name(name: &str, state: &SystemState) -> HttpResult<()> {
utils::cargo::delete_by_namespace(name, state).await?;
CargoDb::delete_by_namespace(name, state).await?;
NamespaceDb::del_by_pk(name, &state.pool).await?;
if let Err(err) = state.docker_api.remove_network(name).await {
log::error!("Unable to remove network {} got error: {}", name, err);
Expand Down Expand Up @@ -124,8 +123,7 @@ pub async fn inspect_by_name(
let models = CargoDb::read_by_namespace(&namespace.name, &state.pool).await?;
let mut cargoes = Vec::new();
for cargo in models {
let cargo =
utils::cargo::inspect_by_key(&cargo.spec.cargo_key, state).await?;
let cargo = CargoDb::inspect_by_pk(&cargo.spec.cargo_key, state).await?;
cargoes.push(cargo);
}
let network = state
Expand Down
51 changes: 2 additions & 49 deletions bin/nanocld/src/utils/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,13 @@ use bollard_next::service::{HostConfig, DeviceMapping};

use nanocl_error::http::HttpResult;

use nanocl_stubs::vm::{Vm, VmSummary, VmInspect};
use nanocl_stubs::vm::Vm;

use crate::{
utils,
repositories::generic::*,
models::{
Pool, VmImageDb, SystemState, ProcessDb, NamespaceDb, VmDb, SpecDb,
},
models::{VmImageDb, SystemState},
};

/// Get detailed information about a VM by his key
pub async fn inspect_by_key(
vm_key: &str,
state: &SystemState,
) -> HttpResult<VmInspect> {
let vm = VmDb::transform_read_by_pk(vm_key, &state.pool).await?;
let processes =
ProcessDb::read_by_kind_key(&vm.spec.vm_key, &state.pool).await?;
let (_, _, _, running_instances) = utils::process::count_status(&processes);
Ok(VmInspect {
created_at: vm.created_at,
namespace_name: vm.namespace_name,
spec: vm.spec,
instance_total: processes.len(),
instance_running: running_instances,
instances: processes,
})
}

/// List VMs by namespace
pub async fn list_by_namespace(
nsp: &str,
pool: &Pool,
) -> HttpResult<Vec<VmSummary>> {
let namespace = NamespaceDb::read_by_pk(nsp, pool).await?;
let vmes = VmDb::read_by_namespace(&namespace.name, pool).await?;
let mut vm_summaries = Vec::new();
for vm in vmes {
let spec = SpecDb::read_by_pk(&vm.spec.key, pool)
.await?
.try_to_vm_spec()?;
let processes = ProcessDb::read_by_kind_key(&vm.spec.vm_key, pool).await?;
let (_, _, _, running_instances) = utils::process::count_status(&processes);
vm_summaries.push(VmSummary {
created_at: vm.created_at,
namespace_name: vm.namespace_name,
instance_total: processes.len(),
instance_running: running_instances,
spec: spec.clone(),
});
}
Ok(vm_summaries)
}

/// Create a VM instance from a VM image
pub async fn create_instance(
vm: &Vm,
Expand Down

0 comments on commit c9f9c83

Please sign in to comment.