diff --git a/bin/nanocld/src/models/task_manager.rs b/bin/nanocld/src/models/task_manager.rs index 0b458e201..36f4a0aa1 100644 --- a/bin/nanocld/src/models/task_manager.rs +++ b/bin/nanocld/src/models/task_manager.rs @@ -1,12 +1,9 @@ -use std::{ - sync::{Arc, Mutex}, - collections::HashMap, -}; +use std::{sync::Arc, collections::HashMap, time::Duration}; -use ntex::{rt, web}; -use futures_util::Future; +use ntex::{rt, time}; +use futures_util::{Future, lock::Mutex}; -use nanocl_error::io::{IoResult, IoError}; +use nanocl_error::io::IoResult; use nanocl_stubs::system::NativeEventAction; @@ -24,6 +21,17 @@ impl ObjTask { let fut = Arc::new(Mutex::new(rt::spawn(task))); Self { kind, fut } } + + pub async fn wait(&self) { + loop { + let fut = self.fut.lock().await; + if fut.is_finished() { + break; + } + drop(fut); + time::sleep(Duration::from_secs(1)).await; + } + } } #[derive(Clone, Default)] @@ -36,54 +44,34 @@ impl TaskManager { Self::default() } - pub async fn add_task(&self, key: &str, task: ObjTask) -> IoResult<()> { + pub async fn add_task(&self, key: &str, task: ObjTask) { let key = key.to_owned(); - let tasks = Arc::clone(&self.tasks); - web::block(move || { - let mut tasks = tasks.lock()?; - log::debug!("Adding task: {key} {}", task.kind); - tasks.insert(key.clone(), task.clone()); - Ok::<_, IoError>(()) - }) - .await?; - Ok(()) + let mut tasks = self.tasks.lock().await; + log::debug!("Adding task: {key} {}", task.kind); + tasks.insert(key.clone(), task.clone()); } - pub async fn remove_task(&self, key: &str) -> IoResult<()> { + pub async fn remove_task(&self, key: &str) { let key = key.to_owned(); - let tasks = Arc::clone(&self.tasks); - web::block(move || { - let mut tasks = tasks.lock().map_err(|err| { - IoError::interrupted("Task", err.to_string().as_str()) - })?; - let task = tasks.get(&key); - if let Some(task) = task { - log::debug!("Removing task: {key} {}", task.kind); - task.fut.lock()?.abort(); - } - tasks.remove(&key); - Ok::<_, IoError>(()) - }) - .await?; - Ok(()) + let mut tasks = self.tasks.lock().await; + let task = tasks.get(&key); + if let Some(task) = task { + log::debug!("Removing task: {key} {}", task.kind); + task.fut.lock().await.abort(); + } + tasks.remove(&key); } pub async fn get_task(&self, key: &str) -> Option { let key = key.to_owned(); - let tasks = Arc::clone(&self.tasks); - let res = web::block(move || { - let tasks = tasks.lock().map_err(|err| { - IoError::interrupted("Task", err.to_string().as_str()) - })?; - Ok::<_, IoError>(tasks.get(&key).cloned()) - }) - .await; - match res { - Ok(res) => res, - Err(err) => { - log::error!("Failed to get task: {}", err); - None - } + let tasks = self.tasks.lock().await; + tasks.get(&key).cloned() + } + + pub async fn wait_task(&self, key: &str) { + if let Some(task) = self.get_task(key).await { + task.wait().await; } + self.remove_task(key).await; } } diff --git a/bin/nanocld/src/subsystem/event.rs b/bin/nanocld/src/subsystem/event.rs index 9c2ab8ea4..a4547987f 100644 --- a/bin/nanocld/src/subsystem/event.rs +++ b/bin/nanocld/src/subsystem/event.rs @@ -89,10 +89,7 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> { let task_key = format!("{}@{key}", actor.kind); let cargo = CargoDb::transform_read_by_pk(&key, &state.pool).await?; let state_ptr = state.clone(); - let curr_task = state.task_manager.get_task(&task_key).await; - if curr_task.is_some() { - state.task_manager.remove_task(&task_key).await?; - } + state.task_manager.wait_task(&task_key).await; let task = ObjTask::new(action, async move { let mut processes = ProcessDb::read_by_kind_key(&cargo.spec.cargo_key, &state_ptr.pool) @@ -128,17 +125,14 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> { state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Start); Ok::<_, IoError>(()) }); - state.task_manager.add_task(&task_key, task).await?; + state.task_manager.add_task(&task_key, task).await; } EventActorKind::Vm => {} EventActorKind::Job => { let task_key = format!("{}@{key}", actor.kind); let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?; let state_ptr = state.clone(); - let curr_task = state.task_manager.get_task(&task_key).await; - if curr_task.is_some() { - state.task_manager.remove_task(&task_key).await?; - } + state.task_manager.wait_task(&task_key).await; let task = ObjTask::new(action, async move { for mut container in job.containers { let job_name = job.name.clone(); @@ -178,7 +172,7 @@ async fn start(e: &Event, state: &SystemState) -> IoResult<()> { } Ok::<_, IoError>(()) }); - state.task_manager.add_task(&task_key, task).await?; + state.task_manager.add_task(&task_key, task).await; } _ => {} } @@ -200,10 +194,7 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> { EventActorKind::Cargo => { log::debug!("handling delete event for cargo {key}"); let task_key = format!("{}@{key}", &actor.kind); - let curr_task = state.task_manager.get_task(&task_key).await; - if curr_task.is_some() { - state.task_manager.remove_task(&task_key).await?; - } + state.task_manager.wait_task(&task_key).await; let state_ptr = state.clone(); let task = ObjTask::new(action, async move { let processes = @@ -226,16 +217,13 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> { state_ptr.emit_normal_native_action(&cargo, NativeEventAction::Delete); Ok::<_, IoError>(()) }); - state.task_manager.add_task(&task_key, task).await?; + state.task_manager.add_task(&task_key, task).await; } EventActorKind::Vm => {} EventActorKind::Job => { let job = JobDb::read_by_pk(&key, &state.pool).await?.try_to_spec()?; let task_key = format!("{}@{key}", &actor.kind); - let curr_task = state.task_manager.get_task(&task_key).await; - if curr_task.is_some() { - state.task_manager.remove_task(&task_key).await?; - } + state.task_manager.wait_task(&task_key).await; let state_ptr = state.clone(); let task = ObjTask::new(action, async move { let processes = @@ -259,7 +247,7 @@ async fn delete(e: &Event, state: &SystemState) -> IoResult<()> { state_ptr.emit_normal_native_action(&job, NativeEventAction::Delete); Ok::<_, IoError>(()) }); - state.task_manager.add_task(&task_key, task).await?; + state.task_manager.add_task(&task_key, task).await; } _ => {} }; @@ -280,10 +268,7 @@ async fn update(e: &Event, state: &SystemState) -> IoResult<()> { match actor.kind { EventActorKind::Cargo => { let task_key = format!("{}@{key}", &actor.kind); - let curr_task = state.task_manager.get_task(&task_key).await; - if curr_task.is_some() { - state.task_manager.remove_task(&task_key).await?; - } + state.task_manager.wait_task(&task_key).await; let state_ptr = state.clone(); let task = ObjTask::new(action, async move { let cargo = @@ -333,7 +318,7 @@ async fn update(e: &Event, state: &SystemState) -> IoResult<()> { } Ok::<_, IoError>(()) }); - state.task_manager.add_task(&task_key, task).await?; + state.task_manager.add_task(&task_key, task).await; } EventActorKind::Vm => {} EventActorKind::Job => {}