Skip to content

Commit

Permalink
feature/nanocld: task manager wait_task (#829)
Browse files Browse the repository at this point in the history
* feature/nanocld: task manager wait_task
  • Loading branch information
leon3s authored Feb 1, 2024
1 parent a152df0 commit 40c1634
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 72 deletions.
82 changes: 35 additions & 47 deletions bin/nanocld/src/models/task_manager.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use std::{
sync::{Arc, Mutex},
collections::HashMap,
};
use std::{sync::Arc, collections::HashMap, time::Duration};

use ntex::{rt, web};
use futures_util::Future;
use ntex::{rt, time};
use futures_util::{Future, lock::Mutex};

use nanocl_error::io::{IoResult, IoError};
use nanocl_error::io::IoResult;

use nanocl_stubs::system::NativeEventAction;

Expand All @@ -24,6 +21,17 @@ impl ObjTask {
let fut = Arc::new(Mutex::new(rt::spawn(task)));
Self { kind, fut }
}

pub async fn wait(&self) {
loop {
let fut = self.fut.lock().await;
if fut.is_finished() {
break;
}
drop(fut);
time::sleep(Duration::from_secs(1)).await;
}
}
}

#[derive(Clone, Default)]
Expand All @@ -36,54 +44,34 @@ impl TaskManager {
Self::default()
}

pub async fn add_task(&self, key: &str, task: ObjTask) -> IoResult<()> {
pub async fn add_task(&self, key: &str, task: ObjTask) {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
web::block(move || {
let mut tasks = tasks.lock()?;
log::debug!("Adding task: {key} {}", task.kind);
tasks.insert(key.clone(), task.clone());
Ok::<_, IoError>(())
})
.await?;
Ok(())
let mut tasks = self.tasks.lock().await;
log::debug!("Adding task: {key} {}", task.kind);
tasks.insert(key.clone(), task.clone());
}

pub async fn remove_task(&self, key: &str) -> IoResult<()> {
pub async fn remove_task(&self, key: &str) {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
web::block(move || {
let mut tasks = tasks.lock().map_err(|err| {
IoError::interrupted("Task", err.to_string().as_str())
})?;
let task = tasks.get(&key);
if let Some(task) = task {
log::debug!("Removing task: {key} {}", task.kind);
task.fut.lock()?.abort();
}
tasks.remove(&key);
Ok::<_, IoError>(())
})
.await?;
Ok(())
let mut tasks = self.tasks.lock().await;
let task = tasks.get(&key);
if let Some(task) = task {
log::debug!("Removing task: {key} {}", task.kind);
task.fut.lock().await.abort();
}
tasks.remove(&key);
}

pub async fn get_task(&self, key: &str) -> Option<ObjTask> {
let key = key.to_owned();
let tasks = Arc::clone(&self.tasks);
let res = web::block(move || {
let tasks = tasks.lock().map_err(|err| {
IoError::interrupted("Task", err.to_string().as_str())
})?;
Ok::<_, IoError>(tasks.get(&key).cloned())
})
.await;
match res {
Ok(res) => res,
Err(err) => {
log::error!("Failed to get task: {}", err);
None
}
let tasks = self.tasks.lock().await;
tasks.get(&key).cloned()
}

pub async fn wait_task(&self, key: &str) {
if let Some(task) = self.get_task(key).await {
task.wait().await;
}
self.remove_task(key).await;
}
}
35 changes: 10 additions & 25 deletions bin/nanocld/src/subsystem/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,7 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> {
let task_key = format!("{}@{key}", actor.kind);
let cargo = CargoDb::transform_read_by_pk(&key, &state.pool).await?;
let state_ptr = state.clone();
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.task_manager.wait_task(&task_key).await;
let task = ObjTask::new(action, async move {
let mut processes =
ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state_ptr.pool)
Expand Down Expand Up @@ -128,17 +125,14 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> {
state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Start);
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;
state.task_manager.add_task(&task_key, task).await;
}
EventActorKind::Vm => {}
EventActorKind::Job => {
let task_key = format!("{}@{key}", actor.kind);
let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?;
let state_ptr = state.clone();
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.task_manager.wait_task(&task_key).await;
let task = ObjTask::new(action, async move {
for mut container in job.containers {
let job_name = job.name.clone();
Expand Down Expand Up @@ -178,7 +172,7 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> {
}
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;
state.task_manager.add_task(&task_key, task).await;
}
_ => {}
}
Expand All @@ -200,10 +194,7 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> {
EventActorKind::Cargo => {
log::debug!("handling delete event for cargo {key}");
let task_key = format!("{}@{key}", &actor.kind);
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.task_manager.wait_task(&task_key).await;
let state_ptr = state.clone();
let task = ObjTask::new(action, async move {
let processes =
Expand All @@ -226,16 +217,13 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> {
state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Delete);
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;
state.task_manager.add_task(&task_key, task).await;
}
EventActorKind::Vm => {}
EventActorKind::Job => {
let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?;
let task_key = format!("{}@{key}", &actor.kind);
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.task_manager.wait_task(&task_key).await;
let state_ptr = state.clone();
let task = ObjTask::new(action, async move {
let processes =
Expand All @@ -259,7 +247,7 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> {
state_ptr.emit_normal_native_action(&job, NativeEventAction::Delete);
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;
state.task_manager.add_task(&task_key, task).await;
}
_ => {}
};
Expand All @@ -280,10 +268,7 @@ async fn update(e: &Event, state: &SystemState) -> IoResult<()> {
match actor.kind {
EventActorKind::Cargo => {
let task_key = format!("{}@{key}", &actor.kind);
let curr_task = state.task_manager.get_task(&task_key).await;
if curr_task.is_some() {
state.task_manager.remove_task(&task_key).await?;
}
state.task_manager.wait_task(&task_key).await;
let state_ptr = state.clone();
let task = ObjTask::new(action, async move {
let cargo =
Expand Down Expand Up @@ -333,7 +318,7 @@ async fn update(e: &Event, state: &SystemState) -> IoResult<()> {
}
Ok::<_, IoError>(())
});
state.task_manager.add_task(&task_key, task).await?;
state.task_manager.add_task(&task_key, task).await;
}
EventActorKind::Vm => {}
EventActorKind::Job => {}
Expand Down

0 comments on commit 40c1634

Please sign in to comment.