Implement simple stats
drogus committed Mar 21, 2024
1 parent c5e7384 commit 7785631
Showing 5 changed files with 183 additions and 105 deletions.
1 change: 1 addition & 0 deletions cli/src/main.rs
@@ -5,6 +5,7 @@ use std::path::PathBuf;
 
 use clap::{Parser, Subcommand};
 
+#[derive(Clone)]
 struct ClientService;
 
 impl Client for ClientService {}
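The `Clone` derives added in this commit follow the usual pattern for connection-per-task RPC services: each accepted connection or spawned task gets its own handle to the service, while shared state lives behind `Arc<Mutex<…>>` so every clone observes the same maps. A minimal sketch of that pattern, independent of the crows codebase (the `Service` type and `handle_connection` function below are hypothetical):

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

// Cloning is cheap: only the Arc pointers are copied, so every task
// operates on the same underlying HashMap.
#[derive(Default, Clone)]
struct Service {
    scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

async fn handle_connection(service: Service, name: String, content: Vec<u8>) {
    // All clones lock the same map, so an upload from one connection
    // is visible to every other connection.
    service.scenarios.lock().await.insert(name, content);
}

#[tokio::main]
async fn main() {
    let service = Service::default();
    let mut handles = Vec::new();
    for i in 0..3 {
        let service = service.clone();
        handles.push(tokio::spawn(async move {
            handle_connection(service, format!("scenario-{i}"), Vec::new()).await;
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    println!("{} scenarios stored", service.scenarios.lock().await.len());
}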
92 changes: 52 additions & 40 deletions coordinator/src/main.rs
@@ -2,30 +2,29 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
+use crows_utils::services::{
+    create_coordinator_server, create_worker_to_coordinator_server, ClientClient, CoordinatorError,
+    RunId, WorkerClient, WorkerStatus,
+};
+use crows_utils::services::{Coordinator, WorkerToCoordinator};
+use crows_wasm::{fetch_config, Instance};
+use futures::future::join_all;
 use tokio::sync::Mutex;
 use tokio::time::sleep;
-use crows_utils::services::{
-    create_coordinator_server, create_worker_to_coordinator_server, CoordinatorError, WorkerClient,
-    WorkerStatus, ClientClient
-};
-use crows_utils::services::{Coordinator, WorkerToCoordinator};
 use uuid::Uuid;
 
 // TODO: I don't like the fact that we have to wrap the client in Mutex and option. It should
 // be easier to match the client object with the request to the service. I should probably
 // add a context object at some point.
 // TODO: Client should probably be thread safe for easier handling
-#[derive(Default)]
+#[derive(Default, Clone)]
 struct WorkerToCoordinatorService {
     scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
     workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
-    client: Arc<Mutex<Option<WorkerClient>>>,
 }
 
 struct WorkerEntry {
-    client: Arc<Mutex<Option<WorkerClient>>>,
+    client: WorkerClient,
     hostname: String,
     status: WorkerStatus,
 }
@@ -40,6 +39,7 @@ impl WorkerToCoordinator for WorkerToCoordinatorService {
 struct CoordinatorService {
     scenarios: Arc<Mutex<HashMap<String, Vec<u8>>>>,
     workers: Arc<Mutex<HashMap<Uuid, WorkerEntry>>>,
+    runs: Arc<Mutex<HashMap<RunId, Vec<WorkerEntry>>>>,
 }
 
 impl Coordinator for CoordinatorService {
@@ -56,18 +56,18 @@ impl Coordinator for CoordinatorService {
         name: String,
         content: Vec<u8>,
     ) -> Result<(), CoordinatorError> {
         // TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be
         // gzipping once and sending to N clients
         //
         // send each uploaded scenario to all of the workers
         for (_, worker_entry) in self.workers.lock().await.iter() {
-            let locked = worker_entry.client.lock();
             let mut futures = Vec::new();
             futures.push(async {
-                if let Some(client) = locked.await.as_mut() {
-                    // TODO: handle Result
-                    client.upload_scenario(name.clone(), content.clone()).await;
-                }
+                // TODO: handle Result
+                worker_entry
+                    .client
+                    .upload_scenario(name.clone(), content.clone())
+                    .await;
             });
 
             join_all(futures).await;
@@ -77,7 +77,13 @@ impl Coordinator for CoordinatorService {
         Ok(())
     }
 
-    async fn start(&self, _: ClientClient, name: String, workers_number: usize) -> Result<(), CoordinatorError> {
+    async fn start(
+        &self,
+        _: ClientClient,
+        name: String,
+        workers_number: usize,
+    ) -> Result<RunId, CoordinatorError> {
+        let id = RunId::new();
         // TODO: we should check if we have enough workers
         // TODO: also this way we will always choose the same workers. in the future we should
         // either always split between all workers or do some kind of round robin
@@ -86,24 +92,39 @@ impl Coordinator for CoordinatorService {
         // TODO: creating a runtime is probably fast enough, but I'd like to measure and see
         // if it's not better to keep one around so we don't create it before each test run
         let scenarios = self.scenarios.lock().await;
-        let scenario = scenarios.get(&name).ok_or(CoordinatorError::NoSuchModule(name.clone()))?.to_owned();
+        let scenario = scenarios
+            .get(&name)
+            .ok_or(CoordinatorError::NoSuchModule(name.clone()))?
+            .to_owned();
         drop(scenarios);
 
-        let runtime = crows_wasm::Runtime::new(&scenario).map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?;
-        let (instance, _, mut store) = Instance::new(&runtime.environment, &runtime.module).await.map_err(|_| CoordinatorError::FailedToCompileModule)?;
-        let config = fetch_config(instance, &mut store).await.map_err(|err| CoordinatorError::CouldNotFetchConfig(err.to_string()))?.split(workers_number);
+        let (runtime, _) = crows_wasm::Runtime::new(&scenario)
+            .map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?;
+        let (instance, _, mut store) = Instance::new(&runtime.environment, &runtime.module)
+            .await
+            .map_err(|_| CoordinatorError::FailedToCompileModule)?;
+        let config = fetch_config(instance, &mut store)
+            .await
+            .map_err(|err| CoordinatorError::CouldNotFetchConfig(err.to_string()))?
+            .split(workers_number);
 
         for (_, worker_entry) in self.workers.lock().await.iter().take(workers_number) {
-            if let Some(client) = worker_entry.client.lock().await.as_mut() {
+            let name = name.clone();
+            let config = config.clone();
+            let id = id.clone();
+            let client = worker_entry.client.clone();
+            tokio::spawn(async move {
                 // TODO: at the moment we split config to split the load between each of the
                 // workers, which means that if a worker dies, we will not get a full test
                 // It would be ideal if we had a way to j
-                // client.start(name.clone(), config.clone()).await;
-                client.start(name.clone(), config.clone()).await.unwrap();
-            }
+                if let Err(err) = client.start(name, config, id).await {
+                    eprintln!("Got an error while trying to execute a scenario: {err:?}");
+                }
+            });
         }
 
-        Ok(())
+        Ok(id)
     }
 
     async fn list_workers(&self, _: ClientClient) -> Vec<String> {
@@ -138,10 +159,8 @@ pub async fn main() {
         .unwrap();
 
     loop {
-        let wrapped_client: Arc<Mutex<Option<WorkerClient>>> = Default::default();
        let service = WorkerToCoordinatorService {
            scenarios: scenarios.clone(),
-            client: wrapped_client.clone(),
            workers: workers.clone(),
        };
 
@@ -159,25 +178,14 @@ pub async fn main() {
        }
        drop(locked);
 
-        let mut locked = wrapped_client.lock().await;
-        *locked = Some(client);
-        drop(locked);
 
        let mut id = None;
-        if let Ok(data) = wrapped_client
-            .lock()
-            .await
-            .as_mut()
-            .unwrap()
-            .get_data()
-            .await
-        {
+        if let Ok(data) = client.get_data().await {
            id = Some(data.id.clone());
            let mut locked = workers.lock().await;
            locked.entry(data.id).or_insert(WorkerEntry {
-                client: wrapped_client.clone(),
+                client: client.clone(),
                hostname: data.hostname,
-                status: WorkerStatus::Busy
+                status: WorkerStatus::Busy,
            });
            drop(locked);
        }
@@ -204,7 +212,11 @@ pub async fn main() {
    let server = create_coordinator_server(format!("0.0.0.0:{client_port}"))
        .await
        .unwrap();
-    let service = CoordinatorService { scenarios, workers };
+    let service = CoordinatorService {
+        scenarios,
+        workers,
+        runs: Default::default(),
+    };
 
    while let Some(mut client) = server.accept(service.clone()).await {
        tokio::spawn(async move {
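For context on the `start` changes above: `fetch_config(instance, &mut store).await?` is split by `workers_number` to get a per-worker share of the load, and that same share is then cloned into each spawned task. The real `crows_shared::Config` is not part of this diff; the sketch below only illustrates the splitting idea with a hypothetical single-field config:

// Hypothetical stand-in for crows_shared::Config; the real type is
// defined elsewhere in the repository and may look quite different.
#[derive(Clone, Debug, PartialEq)]
struct Config {
    concurrency: usize,
}

impl Config {
    // Divide the total load so that each of `workers` runs an equal share.
    fn split(&self, workers: usize) -> Config {
        Config {
            concurrency: (self.concurrency / workers.max(1)).max(1),
        }
    }
}

fn main() {
    let total = Config { concurrency: 100 };
    let per_worker = total.split(4);
    assert_eq!(per_worker, Config { concurrency: 25 });
    println!("each worker gets a concurrency of {}", per_worker.concurrency);
}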
4 changes: 2 additions & 2 deletions utils/src/services/mod.rs
@@ -62,7 +62,7 @@ pub enum WorkerStatus {
 #[service(variant = "server", other_side = Client)]
 pub trait Coordinator {
     async fn upload_scenario(name: String, content: Vec<u8>) -> Result<(), CoordinatorError>;
-    async fn start(name: String, workers_number: usize) -> Result<(), CoordinatorError>;
+    async fn start(name: String, workers_number: usize) -> Result<RunId, CoordinatorError>;
     async fn list_workers() -> Vec<String>;
     async fn update_status(&self, status: WorkerStatus, id: Uuid);
 }
@@ -77,7 +77,7 @@ pub struct WorkerData {
 pub trait Worker {
     async fn upload_scenario(&self, name: String, content: Vec<u8>);
     async fn ping(&self) -> String;
-    async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError>;
+    async fn start(&self, name: String, config: crows_shared::Config, run_id: RunId) -> Result<(), WorkerError>;
     async fn get_data(&self) -> WorkerData;
 }

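The new `RunId` appears in these traits only through `RunId::new()`, `Clone`, and use as a `HashMap` key; its definition lives in crows_utils and is not shown in this diff. A plausible shape, purely for orientation (the real type may also need serde or other derives for the RPC layer):

use std::collections::HashMap;
use uuid::Uuid;

// Hypothetical sketch of a RunId newtype matching how it is used above:
// constructible with new(), cloneable, and usable as a map key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RunId(Uuid);

impl RunId {
    pub fn new() -> Self {
        RunId(Uuid::new_v4())
    }
}

fn main() {
    let id = RunId::new();
    let mut runs: HashMap<RunId, Vec<String>> = HashMap::new();
    runs.insert(id.clone(), vec!["worker-1".to_string()]);
    println!("tracking {} run(s)", runs.len());
}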