Skip to content

Commit

Permalink
removed mess
Browse files Browse the repository at this point in the history
  • Loading branch information
leon3s committed Jan 13, 2024
1 parent 68d3990 commit 0141db5
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 173 deletions.
2 changes: 0 additions & 2 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ args = [
"--features",
"test",
"--html",
"report",
"--lcov",
"--",
"--test-threads",
"1",
Expand Down
138 changes: 13 additions & 125 deletions bin/nanocld/src/models/system.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use std::sync::Arc;

use ntex::{rt, time};
use futures::channel::mpsc;
use futures_util::{SinkExt, StreamExt};
use ntex::rt;
use nanocl_error::io::{IoResult, FromIo, IoError};
use nanocl_stubs::{
config::DaemonConfig,
Expand All @@ -16,30 +11,10 @@ use crate::{vars, utils, repositories::generic::*};

use super::{Pool, EventDb, RawEventEmitter, RawEventClient, TaskManager};

#[derive(Debug)]
pub enum SystemEventKind {
Emit(Event),
Ping,
Subscribe(SystemEventEmitter),
}

pub type SystemEventEmitter = mpsc::UnboundedSender<SystemEventKind>;
pub type SystemEventReceiver = mpsc::UnboundedReceiver<SystemEventKind>;

#[derive(Clone)]
pub struct EventManagerInner {
/// Clients that are subscribed to the event emitter
pub clients: Vec<SystemEventEmitter>,
}

#[derive(Clone)]
pub struct EventManager {
/// Inner manager with system clients
pub inner: Arc<Mutex<EventManagerInner>>,
/// Raw emitter for http clients
pub raw: RawEventEmitter,
/// Emitter
pub emitter: SystemEventEmitter,
}

impl Default for EventManager {
Expand All @@ -50,83 +25,17 @@ impl Default for EventManager {

impl EventManager {
pub fn new() -> Self {
let (sx, rx) = mpsc::unbounded();
let inner = Arc::new(Mutex::new(EventManagerInner { clients: vec![] }));
let n = Self {
inner,
emitter: sx,
Self {
raw: RawEventEmitter::new(),
};
n.spawn_check_connection();
n.run_event_loop(rx);
n
}

/// Check if clients are still connected
async fn check_connection(&mut self) {
let mut alive_clients = Vec::new();
let clients = self.inner.lock().unwrap().clients.clone();
for mut client in clients {
if client.send(SystemEventKind::Ping).await.is_err() {
continue;
}
alive_clients.push(client.clone());
}
self.inner.lock().unwrap().clients = alive_clients;
}

/// Spawn a task that will check if clients are still connected
fn spawn_check_connection(&self) {
let mut self_ptr = self.clone();
rt::Arbiter::new().exec_fn(|| {
rt::spawn(async move {
let task = time::interval(Duration::from_secs(10));
loop {
task.tick().await;
self_ptr.check_connection().await;
}
});
});
}

fn dispatch_event(&self, sys_ev: SystemEventKind) -> IoResult<()> {
log::trace!("event_manager: dispatch_event {:?}", sys_ev);
let self_ptr = self.clone();
match sys_ev {
SystemEventKind::Emit(event) => {
rt::spawn(async move {
let clients = self_ptr.inner.lock().unwrap().clients.clone();
for mut client in clients {
let _ = client.send(SystemEventKind::Emit(event.clone())).await;
}
self_ptr.raw.emit(&event)?;
Ok::<(), IoError>(())
});
}
SystemEventKind::Ping => {
log::trace!("event_manager: ping");
}
SystemEventKind::Subscribe(emitter) => {
log::trace!("event_manager: subscribe");
rt::spawn(async move {
self_ptr.inner.lock().unwrap().clients.push(emitter);
Ok::<(), IoError>(())
});
}
}
Ok(())
}

fn run_event_loop(&self, mut rx: SystemEventReceiver) {
fn dispatch_event(&self, ev: Event) {
log::trace!("event_manager: dispatch_event {:?}", ev);
let self_ptr = self.clone();
rt::Arbiter::new().exec_fn(move || {
rt::spawn(async move {
while let Some(event) = rx.next().await {
if let Err(err) = self_ptr.dispatch_event(event) {
log::warn!("event_manager: loop error {err}");
}
}
});
rt::spawn(async move {
self_ptr.raw.emit(&ev)?;
Ok::<(), IoError>(())
});
}
}
Expand Down Expand Up @@ -173,45 +82,24 @@ impl SystemState {
Ok(system_state)
}

pub async fn emit_event(&mut self, event: EventPartial) -> IoResult<()> {
let event: Event = EventDb::create_try_from(event, &self.pool)
pub async fn emit_event(&self, new_ev: EventPartial) -> IoResult<()> {
let ev: Event = EventDb::create_try_from(new_ev, &self.pool)
.await?
.try_into()?;
self
.event_manager
.emitter
.clone()
.send(SystemEventKind::Emit(event))
.await
.map_err(|err| {
IoError::interrupted("EventEmitter", err.to_string().as_str())
})?;
crate::subsystem::exec_event(&ev, self).await?;
self.event_manager.dispatch_event(ev);
Ok(())
}

pub fn spawn_emit_event(&self, event: EventPartial) {
let mut self_ptr = self.clone();
let self_ptr = self.clone();
rt::spawn(async move {
if let Err(err) = self_ptr.emit_event(event).await {
log::warn!("system::spawn_emit_event: {err}");
}
});
}

pub async fn subscribe(&self) -> IoResult<SystemEventReceiver> {
let (sx, rx) = mpsc::unbounded();
self
.event_manager
.emitter
.clone()
.send(SystemEventKind::Subscribe(sx))
.await
.map_err(|err| {
IoError::interrupted("EventEmitter", err.to_string().as_str())
})?;
Ok(rx)
}

pub fn subscribe_raw(&self) -> IoResult<RawEventClient> {
self.event_manager.raw.subscribe()
}
Expand Down
51 changes: 8 additions & 43 deletions bin/nanocld/src/subsystem/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::{
objects::generic::*,
repositories::generic::*,
models::{
SystemState, JobDb, ProcessDb, SystemEventReceiver, SystemEventKind,
CargoDb, ObjPsStatusUpdate, ObjPsStatusDb, ObjTask,
SystemState, JobDb, ProcessDb, CargoDb, ObjPsStatusUpdate, ObjPsStatusDb,
ObjTask,
},
};

Expand Down Expand Up @@ -344,46 +344,11 @@ async fn update(e: &Event, state: &SystemState) -> IoResult<()> {
}

/// Take action when event is received
async fn exec_event(e: Event, state: &SystemState) -> IoResult<()> {
log::debug!("exec_event: {} {}", e.kind, e.action);
start(&e, state).await?;
delete(&e, state).await?;
update(&e, state).await?;
job_ttl(&e, state).await?;
pub async fn exec_event(ev: &Event, state: &SystemState) -> IoResult<()> {
log::debug!("exec_event: {} {}", ev.kind, ev.action);
start(ev, state).await?;
delete(ev, state).await?;
update(ev, state).await?;
job_ttl(ev, state).await?;
Ok(())
}

/// Read events from the event stream
async fn read_events(stream: &mut SystemEventReceiver, state: &SystemState) {
while let Some(e) = stream.next().await {
if let SystemEventKind::Emit(e) = e {
let state = state.clone();
rt::spawn(async move {
if let Err(err) = exec_event(e, &state).await {
log::warn!("event::read_events: {err}");
}
});
}
}
}

/// Spawn a tread to analyze events from the event stream in his own loop
pub fn analyze(state: &SystemState) {
let state = state.clone();
rt::Arbiter::new().exec_fn(|| {
rt::spawn(async move {
loop {
let mut stream = match state.subscribe().await {
Ok(stream) => stream,
Err(err) => {
log::error!("event::analyze: {err}");
continue;
}
};
log::info!("event::analyze: stream connected");
read_events(&mut stream, &state).await;
ntex::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
});
}
3 changes: 0 additions & 3 deletions bin/nanocld/src/subsystem/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ pub async fn init(conf: &DaemonConfig) -> IoResult<SystemState> {
Ok::<_, IoError>(())
});
super::docker_event::analyze(&system_state);
super::event::analyze(&system_state);
super::metric::spawn(&system_state);
Ok(system_state)
}
Expand Down Expand Up @@ -192,7 +191,5 @@ mod tests {
nanocl_stubs::system::NativeEventAction::Create,
);
});
let mut sub = state.subscribe().await.unwrap();
sub.next().await;
}
}
1 change: 1 addition & 0 deletions bin/nanocld/src/subsystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ mod event;
mod metric;
mod docker_event;

pub use event::exec_event;
pub use init::init;

0 comments on commit 0141db5

Please sign in to comment.