Skip to content

Commit

Permalink
yoloh task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s committed Jan 13, 2024
1 parent 2b3d74a commit fbb6e13
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 74 deletions.
3 changes: 3 additions & 0 deletions bin/nanocld/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub use event::*;
mod raw_emitter;
pub use raw_emitter::*;

mod task_manager;
pub use task_manager::*;

mod object_process_status;
pub use object_process_status::*;

Expand Down
5 changes: 4 additions & 1 deletion bin/nanocld/src/models/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use nanocl_stubs::{

use crate::{vars, utils, repositories::generic::*};

use super::{Pool, EventDb, RawEventEmitter, RawEventClient};
use super::{Pool, EventDb, RawEventEmitter, RawEventClient, TaskManager};

#[derive(Debug)]
pub enum SystemEventKind {
Expand Down Expand Up @@ -144,6 +144,8 @@ pub struct SystemState {
pub config: DaemonConfig,
/// Event manager that run the event loop
pub event_manager: EventManager,
/// Manager of the tasks
pub task_manager: TaskManager,
/// Latest version of the daemon
pub version: String,
}
Expand All @@ -165,6 +167,7 @@ impl SystemState {
docker_api: docker.clone(),
config: conf.to_owned(),
event_manager: EventManager::new(),
task_manager: TaskManager::new(),
version: vars::VERSION.to_owned(),
};
Ok(system_state)
Expand Down
89 changes: 89 additions & 0 deletions bin/nanocld/src/models/task_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::{
sync::{Arc, Mutex},
collections::HashMap,
};

use ntex::{rt, web};
use futures_util::Future;

use nanocl_error::io::{IoResult, IoError};

use nanocl_stubs::system::NativeEventAction;

#[derive(Clone)]

Check warning on line 13 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L13

Added line #L13 was not covered by tests
pub struct ObjTask {
pub kind: NativeEventAction,
pub fut: Arc<rt::JoinHandle<IoResult<()>>>,
}

impl ObjTask {
pub fn new<F>(kind: NativeEventAction, task: F) -> Self
where
F: Future<Output = IoResult<()>> + 'static,
{
let fut = Arc::new(rt::spawn(task));
Self { kind, fut }
}

Check warning on line 26 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L20-L26

Added lines #L20 - L26 were not covered by tests
}

#[derive(Clone, Default)]
pub struct TaskManager {
pub tasks: Arc<Mutex<HashMap<String, ObjTask>>>,
}

impl TaskManager {
pub fn new() -> Self {
Self::default()
}

pub async fn add_task(&self, key: &str, task: ObjTask) -> IoResult<()> {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
web::block(move || {
let mut tasks = tasks
.lock()
.map_err(|err| IoError::interupted("Task", err.to_string().as_str()))?;
tasks.insert(key, task);
Ok::<_, IoError>(())
})
.await?;
Ok(())
}

Check warning on line 51 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L39-L51

Added lines #L39 - L51 were not covered by tests

pub async fn remove_task(&self, key: &str) -> IoResult<()> {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
web::block(move || {
let mut tasks = tasks
.lock()
.map_err(|err| IoError::interupted("Task", err.to_string().as_str()))?;
let task = tasks.get(&key);
if let Some(task) = task {
task.fut.abort();
}
tasks.remove(&key);
Ok::<_, IoError>(())
})
.await?;
Ok(())
}

Check warning on line 69 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L53-L69

Added lines #L53 - L69 were not covered by tests

pub async fn get_task(&self, key: &str) -> Option<ObjTask> {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
let res = web::block(move || {
let tasks = tasks
.lock()
.map_err(|err| IoError::interupted("Task", err.to_string().as_str()))?;
Ok::<_, IoError>(tasks.get(&key).cloned())
})
.await;
match res {
Ok(res) => res,
Err(err) => {
log::error!("Failed to get task: {}", err);
None

Check warning on line 85 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L71-L85

Added lines #L71 - L85 were not covered by tests
}
}
}

Check warning on line 88 in bin/nanocld/src/models/task_manager.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/models/task_manager.rs#L88

Added line #L88 was not covered by tests
}
224 changes: 151 additions & 73 deletions bin/nanocld/src/subsystem/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use bollard_next::container::{
StartContainerOptions, RemoveContainerOptions, WaitContainerOptions,
};

use nanocl_error::{io::IoResult, http::HttpError};
use nanocl_error::{
io::{IoResult, IoError},
http::HttpError,
};
use nanocl_stubs::{
system::{Event, EventActorKind, NativeEventAction, ObjPsStatusKind},
process::ProcessKind,
Expand All @@ -18,7 +21,7 @@ use crate::{
repositories::generic::*,
models::{
SystemState, JobDb, ProcessDb, SystemEventReceiver, SystemEventKind,
CargoDb, ObjPsStatusUpdate, ObjPsStatusDb,
CargoDb, ObjPsStatusUpdate, ObjPsStatusDb, ObjTask,
},
};

Expand Down Expand Up @@ -85,63 +88,102 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> {
EventActorKind::Cargo => {
log::debug!("handling start event for cargo {key}");
let cargo = CargoDb::transform_read_by_pk(&key, &state.pool).await?;
let mut processes =
ProcessDb::read_by_kind_key(&key, &state.pool).await?;
if processes.is_empty() {
processes = utils::cargo::create_instances(&cargo, 1, state).await?;
}
for process in processes {
let _ = state
.docker_api
.start_container(&process.key, None::<StartContainerOptions<String>>)
.await;
let state_ptr = state.clone();
let curr_task = state.task_manager.get_task(&key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&key).await?;
}
let cur_status = ObjPsStatusDb::read_by_pk(&key, &state.pool).await?;
let new_status = ObjPsStatusUpdate {
wanted: Some(ObjPsStatusKind::Running.to_string()),
prev_wanted: Some(cur_status.wanted),
actual: Some(ObjPsStatusKind::Running.to_string()),
prev_actual: Some(cur_status.actual),
};
ObjPsStatusDb::update_pk(&key, new_status, &state.pool).await?;
state.emit_normal_native_action(&cargo, NativeEventAction::Start);
let task = ObjTask::new(action, async move {
let mut processes =
ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state_ptr.pool)
.await?;
if processes.is_empty() {
processes =
utils::cargo::create_instances(&cargo, 1, &state_ptr).await?;
}
for process in processes {
let _ = state_ptr
.docker_api
.start_container(
&process.key,
None::<StartContainerOptions<String>>,
)
.await;

Check warning on line 111 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L89-L111

Added lines #L89 - L111 were not covered by tests
}
let cur_status =
ObjPsStatusDb::read_by_pk(&cargo.spec.cargo_key, &state_ptr.pool)
.await?;
let new_status = ObjPsStatusUpdate {
wanted: Some(ObjPsStatusKind::Running.to_string()),
prev_wanted: Some(cur_status.wanted),
actual: Some(ObjPsStatusKind::Running.to_string()),
prev_actual: Some(cur_status.actual),
};
ObjPsStatusDb::update_pk(
&cargo.spec.cargo_key,
new_status,
&state_ptr.pool,
)
.await?;
state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Start);
Ok::<_, IoError>(())
});
state
.task_manager
.add_task(&format!("{}@{key}", &actor.kind), task)
.await?;

Check warning on line 134 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L113-L134

Added lines #L113 - L134 were not covered by tests
}
EventActorKind::Vm => {}

Check warning on line 136 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L136

Added line #L136 was not covered by tests
EventActorKind::Job => {
let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?;
for mut container in job.containers {
let job_name = job.name.clone();
let mut labels = container.labels.clone().unwrap_or_default();
labels.insert("io.nanocl.j".to_owned(), job_name.clone());
container.labels = Some(labels);
let short_id = utils::key::generate_short_id(6);
let name = format!("{job_name}-{short_id}.j");
let process = utils::container::create_process(
&ProcessKind::Job,
&name,
&job_name,
container,
state,
)
.await?;
// When we run a sequencial order we wait for the container to finish to start the next one.
// let mut stream = state.docker_api.wait_container(
// &process.key,
// Some(WaitContainerOptions {
// condition: "not-running",
// }),
// );
let _ = state
.docker_api
.start_container(&process.key, None::<StartContainerOptions<String>>)
.await;
// while let Some(stream) = stream.next().await {
// let result = stream.map_err(HttpError::internal_server_error)?;
// if result.status_code == 0 {
// break;
// }
// }
let state_ptr = state.clone();
let curr_task = state.task_manager.get_task(&key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&key).await?;
}
let task = ObjTask::new(action, async move {
for mut container in job.containers {
let job_name = job.name.clone();
let mut labels = container.labels.clone().unwrap_or_default();
labels.insert("io.nanocl.j".to_owned(), job_name.clone());
container.labels = Some(labels);
let short_id = utils::key::generate_short_id(6);
let name = format!("{job_name}-{short_id}.j");
let process = utils::container::create_process(
&ProcessKind::Job,
&name,
&job_name,
container,
&state_ptr,
)
.await?;

Check warning on line 159 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L138-L159

Added lines #L138 - L159 were not covered by tests
// When we run a sequencial order we wait for the container to finish to start the next one.
let mut stream = state_ptr.docker_api.wait_container(
&process.key,
Some(WaitContainerOptions {
condition: "not-running",
}),
);
let _ = state_ptr
.docker_api
.start_container(
&process.key,
None::<StartContainerOptions<String>>,
)
.await;
while let Some(stream) = stream.next().await {
let result = stream.map_err(HttpError::internal_server_error)?;
if result.status_code == 0 {
break;
}

Check warning on line 178 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L161-L178

Added lines #L161 - L178 were not covered by tests
}
}
Ok::<_, IoError>(())
});
state
.task_manager
.add_task(&format!("{}@{key}", &actor.kind), task)
.await?;

Check warning on line 186 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L181-L186

Added lines #L181 - L186 were not covered by tests
}
_ => {}

Check warning on line 188 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L188

Added line #L188 was not covered by tests
}
Expand All @@ -159,34 +201,70 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> {
return Ok(());

Check warning on line 201 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L200-L201

Added lines #L200 - L201 were not covered by tests
};
let key = actor.key.clone().unwrap_or_default();
let processes = ProcessDb::read_by_kind_key(&key, &state.pool).await?;
for process in processes {
let _ = state
.docker_api
.remove_container(
&process.key,
Some(RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await;
}
match actor.kind {

Check warning on line 204 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L203-L204

Added lines #L203 - L204 were not covered by tests
EventActorKind::Cargo => {
log::debug!("handling delete event for cargo {key}");
let cargo = CargoDb::transform_read_by_pk(&key, &state.pool).await?;
CargoDb::clear_by_pk(&key, &state.pool).await?;
state.emit_normal_native_action(&cargo, NativeEventAction::Delete);
let task_key = format!("{}@{key}", &actor.kind);
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
let state_ptr = state.clone();
let task = ObjTask::new(action, async move {
let processes =
ProcessDb::read_by_kind_key(&key, &state_ptr.pool).await?;
for process in processes {
let _ = state_ptr
.docker_api
.remove_container(
&process.key,
Some(RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await;

Check warning on line 226 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L206-L226

Added lines #L206 - L226 were not covered by tests
}
let cargo =
CargoDb::transform_read_by_pk(&key, &state_ptr.pool).await?;
CargoDb::clear_by_pk(&key, &state_ptr.pool).await?;
state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Delete);
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;

Check warning on line 234 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L228-L234

Added lines #L228 - L234 were not covered by tests
}
EventActorKind::Vm => {}

Check warning on line 236 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L236

Added line #L236 was not covered by tests
EventActorKind::Job => {
let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?;
JobDb::clear(&key, &state.pool).await?;
if job.schedule.is_some() {
utils::job::remove_cron_rule(&job, state).await?;
let task_key = format!("{}@{key}", &actor.kind);
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.emit_normal_native_action(&job, NativeEventAction::Delete);
let state_ptr = state.clone();
let task = ObjTask::new(action, async move {
let processes =
ProcessDb::read_by_kind_key(&key, &state_ptr.pool).await?;
for process in processes {
let _ = state_ptr
.docker_api
.remove_container(
&process.key,
Some(RemoveContainerOptions {
force: true,
..Default::default()
}),
)
.await;

Check warning on line 258 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L238-L258

Added lines #L238 - L258 were not covered by tests
}
JobDb::clear(&job.name, &state_ptr.pool).await?;
if job.schedule.is_some() {
utils::job::remove_cron_rule(&job, &state_ptr).await?;
}
state_ptr.emit_normal_native_action(&job, NativeEventAction::Delete);
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;

Check warning on line 267 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L260-L267

Added lines #L260 - L267 were not covered by tests
}
_ => {}

Check warning on line 269 in bin/nanocld/src/subsystem/event.rs

View check run for this annotation

Codecov / codecov/patch

bin/nanocld/src/subsystem/event.rs#L269

Added line #L269 was not covered by tests
};
Expand Down

0 comments on commit fbb6e13

Please sign in to comment.