Skip to content

Commit

Permalink
xd
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s committed Jan 14, 2024
1 parent 0141db5 commit 795bb42
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 21 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"rust-analyzer.cargo.noDefaultFeatures": true,
"rust-analyzer.cargo.features": ["dev", "test"],
"cSpell.words": [
"aarch",
"canonicalize",
"chrono",
"crond",
Expand All @@ -25,6 +26,8 @@
"Nanocld",
"nstore",
"ntex",
"statefile"
"schemars",
"statefile",
"utoipa"
]
}
2 changes: 1 addition & 1 deletion bin/nanocld/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::Parser;
/// Nanocl Daemon - Self Sufficient Orchestrator
#[derive(Debug, Clone, Parser)]
#[command(name = "Nanocl")]
#[command(author = "nexthat team <[email protected]>")]
#[command(author = "Next Hat team <[email protected]>")]
#[command(version)]
pub struct Cli {
/// Hosts to listen to use tcp:// and unix:// [default: unix:///run/nanocl.sock]
Expand Down
29 changes: 15 additions & 14 deletions bin/nanocld/src/models/raw_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,14 @@ impl RawEventEmitter {
}

/// Send an event to all clients
pub fn emit(&self, e: &Event) -> IoResult<()> {
let inner = self.inner.lock().map_err(|err| {
IoError::interrupted("RawEmitterMutex", err.to_string().as_str())
})?;
for client in &inner.clients {
pub async fn emit(&self, e: &Event) -> IoResult<()> {
let inner = Arc::clone(&self.inner);
let clients = web::block(move || {
let clients = inner.lock()?.clients.clone();
Ok::<_, IoError>(clients)
})
.await?;
for client in clients {
match e.try_to_bytes() {
Ok(msg) => {
let _ = client.try_send(msg);
Expand All @@ -122,16 +125,14 @@ impl RawEventEmitter {
}

/// Subscribe to events
pub fn subscribe(&self) -> IoResult<RawEventClient> {
pub async fn subscribe(&self) -> IoResult<RawEventClient> {
let (tx, rx) = channel(100);
self
.inner
.lock()
.map_err(|err| {
IoError::interrupted("RawEmitterMutex", err.to_string().as_str())
})?
.clients
.push(tx);
let inner = Arc::clone(&self.inner);
web::block(move || {
inner.lock()?.clients.push(tx);
Ok::<_, IoError>(())
})
.await?;
Ok(RawEventClient(rx))
}
}
6 changes: 3 additions & 3 deletions bin/nanocld/src/models/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl EventManager {
log::trace!("event_manager: dispatch_event {:?}", ev);
let self_ptr = self.clone();
rt::spawn(async move {
self_ptr.raw.emit(&ev)?;
self_ptr.raw.emit(&ev).await?;
Ok::<(), IoError>(())
});
}
Expand Down Expand Up @@ -100,8 +100,8 @@ impl SystemState {
});
}

pub fn subscribe_raw(&self) -> IoResult<RawEventClient> {
self.event_manager.raw.subscribe()
pub async fn subscribe_raw(&self) -> IoResult<RawEventClient> {
self.event_manager.raw.subscribe().await
}

pub fn emit_normal_native_action<A>(
Expand Down
2 changes: 1 addition & 1 deletion bin/nanocld/src/services/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn get_info(
pub async fn watch_event(
state: web::types::State<SystemState>,
) -> HttpResult<web::HttpResponse> {
let stream = state.subscribe_raw()?;
let stream = state.subscribe_raw().await?;
Ok(
web::HttpResponse::Ok()
.content_type("text/event-stream")
Expand Down
2 changes: 1 addition & 1 deletion bin/nanocld/src/subsystem/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ mod tests {
// Test state
let state = init(&config).await.unwrap();
let state_ptr = state.clone();
let mut raw_sub = state.subscribe_raw().unwrap();
let mut raw_sub = state.subscribe_raw().await.unwrap();
rt::spawn(async move {
ntex::time::sleep(std::time::Duration::from_secs(1)).await;
let actor = Resource::default();
Expand Down

0 comments on commit 795bb42

Please sign in to comment.