diff --git a/.vscode/settings.json b/.vscode/settings.json index d6bec0f9c..c98be481c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,6 +12,7 @@ "rust-analyzer.cargo.noDefaultFeatures": true, "rust-analyzer.cargo.features": ["dev", "test"], "cSpell.words": [ + "aarch", "canonicalize", "chrono", "crond", @@ -25,6 +26,8 @@ "Nanocld", "nstore", "ntex", - "statefile" + "schemars", + "statefile", + "utoipa" ] } diff --git a/bin/nanocld/src/cli.rs b/bin/nanocld/src/cli.rs index b801794ef..6ddbe6038 100644 --- a/bin/nanocld/src/cli.rs +++ b/bin/nanocld/src/cli.rs @@ -3,7 +3,7 @@ use clap::Parser; /// Nanocl Daemon - Self Sufficient Orchestrator #[derive(Debug, Clone, Parser)] #[command(name = "Nanocl")] -#[command(author = "nexthat team ")] +#[command(author = "Next Hat team ")] #[command(version)] pub struct Cli { /// Hosts to listen to use tcp:// and unix:// [default: unix:///run/nanocl.sock] diff --git a/bin/nanocld/src/models/raw_emitter.rs b/bin/nanocld/src/models/raw_emitter.rs index 48c6e1bde..315bb6660 100644 --- a/bin/nanocld/src/models/raw_emitter.rs +++ b/bin/nanocld/src/models/raw_emitter.rs @@ -104,11 +104,14 @@ impl RawEventEmitter { } /// Send an event to all clients - pub fn emit(&self, e: &Event) -> IoResult<()> { - let inner = self.inner.lock().map_err(|err| { - IoError::interrupted("RawEmitterMutex", err.to_string().as_str()) - })?; - for client in &inner.clients { + pub async fn emit(&self, e: &Event) -> IoResult<()> { + let inner = Arc::clone(&self.inner); + let clients = web::block(move || { + let clients = inner.lock()?.clients.clone(); + Ok::<_, IoError>(clients) + }) + .await?; + for client in clients { match e.try_to_bytes() { Ok(msg) => { let _ = client.try_send(msg); @@ -122,16 +125,14 @@ impl RawEventEmitter { } /// Subscribe to events - pub fn subscribe(&self) -> IoResult { + pub async fn subscribe(&self) -> IoResult { let (tx, rx) = channel(100); - self - .inner - .lock() - .map_err(|err| { - IoError::interrupted("RawEmitterMutex", err.to_string().as_str()) - })? - .clients - .push(tx); + let inner = Arc::clone(&self.inner); + web::block(move || { + inner.lock()?.clients.push(tx); + Ok::<_, IoError>(()) + }) + .await?; Ok(RawEventClient(rx)) } } diff --git a/bin/nanocld/src/models/system.rs b/bin/nanocld/src/models/system.rs index 2f62f9dae..f12766cb8 100644 --- a/bin/nanocld/src/models/system.rs +++ b/bin/nanocld/src/models/system.rs @@ -34,7 +34,7 @@ impl EventManager { log::trace!("event_manager: dispatch_event {:?}", ev); let self_ptr = self.clone(); rt::spawn(async move { - self_ptr.raw.emit(&ev)?; + self_ptr.raw.emit(&ev).await?; Ok::<(), IoError>(()) }); } @@ -100,8 +100,8 @@ impl SystemState { }); } - pub fn subscribe_raw(&self) -> IoResult { - self.event_manager.raw.subscribe() + pub async fn subscribe_raw(&self) -> IoResult { + self.event_manager.raw.subscribe().await } pub fn emit_normal_native_action( diff --git a/bin/nanocld/src/services/system.rs b/bin/nanocld/src/services/system.rs index f1a749a9f..b769d849f 100644 --- a/bin/nanocld/src/services/system.rs +++ b/bin/nanocld/src/services/system.rs @@ -76,7 +76,7 @@ pub async fn get_info( pub async fn watch_event( state: web::types::State, ) -> HttpResult { - let stream = state.subscribe_raw()?; + let stream = state.subscribe_raw().await?; Ok( web::HttpResponse::Ok() .content_type("text/event-stream") diff --git a/bin/nanocld/src/subsystem/init.rs b/bin/nanocld/src/subsystem/init.rs index 8f2826dab..32a9f3dcd 100644 --- a/bin/nanocld/src/subsystem/init.rs +++ b/bin/nanocld/src/subsystem/init.rs @@ -172,7 +172,7 @@ mod tests { // Test state let state = init(&config).await.unwrap(); let state_ptr = state.clone(); - let mut raw_sub = state.subscribe_raw().unwrap(); + let mut raw_sub = state.subscribe_raw().await.unwrap(); rt::spawn(async move { ntex::time::sleep(std::time::Duration::from_secs(1)).await; let actor = Resource::default();