Skip to content

Commit

Permalink
Daemon and hwinfo restructuring
Browse files Browse the repository at this point in the history
  • Loading branch information
Levminer committed Aug 3, 2024
1 parent f0f70a4 commit 08fdfc6
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 84 deletions.
181 changes: 171 additions & 10 deletions platforms/unix/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use axum::{
routing::get,
Router,
};
use hardwareinfo::settings::get_settings;
use ezrtc::host::EzRTCHost;
use ezrtc::socket::DataChannelHandler;
use futures::{sink::SinkExt, stream::StreamExt};
use hardwareinfo::settings::get_settings;
use hardwareinfo::{refresh_hardware_info, Data, HardwareInfo, Networks, Nvml, System};
use log::{error, info, warn, LevelFilter};
use serde::{Deserialize, Serialize};
Expand All @@ -21,7 +21,7 @@ use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tower_http::{
cors::{Any, CorsLayer},
trace::{DefaultMakeSpan, TraceLayer},
Expand All @@ -32,13 +32,15 @@ use webrtc::ice_transport::ice_server::RTCIceServer;
use wol::{send_wol, MacAddr};

#[derive(Serialize, Deserialize)]
struct GenericMessage<T> {
pub struct GenericMessage<T> {
pub r#type: String,
pub data: T,
}

struct AppState {
pub struct AppState {
hardware_info_receiver: async_channel::Receiver<HardwareInfo>,
last_60s_hardware_info: Mutex<Vec<HardwareInfo>>,
last_60m_hardware_info: Mutex<Vec<HardwareInfo>>,
}

#[tokio::main]
Expand Down Expand Up @@ -66,6 +68,8 @@ async fn main() {

let app_state = Arc::new(AppState {
hardware_info_receiver: r.clone(),
last_60s_hardware_info: Mutex::new(Vec::new()),
last_60m_hardware_info: Mutex::new(Vec::new()),
});

// Setup HTTP server routes
Expand All @@ -81,7 +85,7 @@ async fn main() {
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
)
.with_state(app_state);
.with_state(app_state.clone());

let listener = tokio::net::TcpListener::bind("127.0.0.1:5390")
.await
Expand All @@ -93,6 +97,7 @@ async fn main() {
);

// Refresh hardware info every 5 seconds
let app_state_clone = app_state.clone();
let hw_task = tokio::spawn(async move {
loop {
data.sys.refresh_all();
Expand All @@ -103,10 +108,45 @@ async fn main() {

s.send(data.hw_info.clone()).await.unwrap();

if app_state_clone.last_60s_hardware_info.lock().unwrap().len() > 60 {
app_state_clone
.last_60s_hardware_info
.lock()
.unwrap()
.remove(0);
app_state_clone
.last_60s_hardware_info
.lock()
.unwrap()
.push(data.hw_info.clone());
} else {
app_state_clone
.last_60s_hardware_info
.lock()
.unwrap()
.push(data.hw_info.clone());
}

tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

// Save last 60m hardware info
let app_state_clone = app_state.clone();
let rcv = r.clone();
let last_60m_hardware_info_task = tokio::spawn(async move {
loop {
let data = rcv.recv().await.unwrap();
app_state_clone
.last_60m_hardware_info
.lock()
.unwrap()
.push(data);

tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

// Start HTTP server
let server_task = tokio::spawn(async move {
axum::serve(
Expand All @@ -128,17 +168,85 @@ async fn main() {
// Define your data channel handler
struct MyDataChannelHandler {
receiver: async_channel::Receiver<HardwareInfo>,
state: Arc<AppState>,
}

impl DataChannelHandler for MyDataChannelHandler {
fn handle_data_channel_open(&self, dc: Arc<RTCDataChannel>) {
warn!("Data channel opened!");

let receiver = self.receiver.clone();
let state = self.state.clone();

tokio::spawn(async move {
loop {
if dc.ready_state() == RTCDataChannelState::Open {
if dc.ready_state() == RTCDataChannelState::Open {
let last60s_hardware_info =
state.last_60s_hardware_info.lock().unwrap().clone();
let last60m_hardware_info =
state.last_60m_hardware_info.lock().unwrap().clone();

// get every third element from the last 60s hardware info
let last60s_hardware_info = last60s_hardware_info
.iter()
.step_by(3)
.cloned()
.collect::<Vec<HardwareInfo>>();
let last60m_hardware_info = last60m_hardware_info
.iter()
.step_by(3)
.cloned()
.collect::<Vec<HardwareInfo>>();

// Send initial data
let hw_message = receiver.recv().await.unwrap();
let network_data = GenericMessage::<HardwareInfo> {
r#type: "initialData".to_string(),
data: hw_message.clone(),
};

if dc
.send_text(serde_json::to_string(&network_data).unwrap())
.await
.is_err()
{
info!("Failed to send initialData to client");
};

// Send 60s data
for hw_info in last60s_hardware_info {
let network_data = GenericMessage::<HardwareInfo> {
r#type: "secondsData".to_string(),
data: hw_info.clone(),
};

if dc
.send_text(serde_json::to_string(&network_data).unwrap())
.await
.is_err()
{
info!("Failed to send secondsData to client");
};
}

// Send 60m data
for hw_info in last60m_hardware_info {
let network_data = GenericMessage::<HardwareInfo> {
r#type: "minutesData".to_string(),
data: hw_info.clone(),
};

if dc
.send_text(serde_json::to_string(&network_data).unwrap())
.await
.is_err()
{
info!("Failed to send minutesData to client");
break;
};
}

// Send data every 2 second
loop {
let hw_message = receiver.recv().await.unwrap();
let network_data = GenericMessage::<HardwareInfo> {
r#type: "data".to_string(),
Expand All @@ -151,10 +259,11 @@ async fn main() {
.is_err()
{
info!("Failed to send data to client");
break;
};
}

tokio::time::sleep(std::time::Duration::from_secs(2)).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
});
}
Expand Down Expand Up @@ -226,7 +335,6 @@ async fn main() {

// Get settings
let settings = get_settings();

info!("Connection code: {:?}", settings.connection_code);

// Start the connection
Expand All @@ -236,6 +344,7 @@ async fn main() {
ice_servers,
Arc::new(Box::new(MyDataChannelHandler {
receiver: r.clone(),
state: app_state.clone(),
})),
)
.await;
Expand All @@ -247,6 +356,7 @@ async fn main() {
}
});

// Start tasks
tokio::select! {
_ = server_task => {
info!("Server stopped");
Expand All @@ -257,6 +367,9 @@ async fn main() {
_ = hw_task => {
info!("HW stopped");
}
_ = last_60m_hardware_info_task => {
info!("Last 60s hardware info stopped");
}
};

error!("Daemon stopped");
Expand Down Expand Up @@ -291,6 +404,54 @@ async fn handle_socket(mut socket: WebSocket, addr: SocketAddr, state: Arc<AppSt
// Split socket into sender and receiver
let (mut sender, mut receiver) = socket.split();

let state2 = state.clone();
let last60s_hardware_info = state2.last_60s_hardware_info.lock().unwrap().clone();
let last60m_hardware_info = state2.last_60m_hardware_info.lock().unwrap().clone();

// Get every third element from the last 60s and 60m hardware info
let last60s_hardware_info = last60s_hardware_info
.iter()
.step_by(3)
.cloned()
.collect::<Vec<HardwareInfo>>();
let last60m_hardware_info = last60m_hardware_info
.iter()
.step_by(3)
.cloned()
.collect::<Vec<HardwareInfo>>();

// Send last 60s hardware info
for hw_info in last60s_hardware_info {
let network_data = GenericMessage::<HardwareInfo> {
r#type: "secondsData".to_string(),
data: hw_info.clone(),
};

if sender
.send(Message::Text(serde_json::to_string(&network_data).unwrap()))
.await
.is_err()
{
break;
}
}

// Send last 60m hardware info
for hw_info in last60m_hardware_info {
let network_data = GenericMessage::<HardwareInfo> {
r#type: "minutesData".to_string(),
data: hw_info.clone(),
};

if sender
.send(Message::Text(serde_json::to_string(&network_data).unwrap()))
.await
.is_err()
{
break;
}
}

// Spawn a sender task to send data to the client
let mut send_task = tokio::spawn(async move {
let receiver = state.hardware_info_receiver.clone();
Expand Down
Loading

0 comments on commit 08fdfc6

Please sign in to comment.