diff --git a/network.Dockerfile b/network.Dockerfile index 8e514d55c..47d9660eb 100644 --- a/network.Dockerfile +++ b/network.Dockerfile @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1.2 -FROM rust:1.76-buster as builder +FROM rust:1.77.2-buster as builder WORKDIR /build COPY . . diff --git a/simulator/README.md b/simulator/README.md index e1caea12f..d231dd8f2 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -18,6 +18,7 @@ cargo install --path ./simulator # Or alias via `alias simulator="cargo run --bin simulator --"` simulator prepare +simulator build simulator node start simulator node logs -f simulator node exec diff --git a/simulator/src/compose.rs b/simulator/src/compose.rs index 3b00d10e8..9c2f967fa 100644 --- a/simulator/src/compose.rs +++ b/simulator/src/compose.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; use std::ffi::OsStr; use std::path::PathBuf; -use std::process::{Command, Stdio}; +use std::process::{Command, Output, Stdio}; +use std::str; +use std::str::FromStr; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use crate::config::ServiceConfig; +use crate::node::NodeOptions; pub struct ComposeRunner { compose_path: PathBuf, @@ -42,8 +45,40 @@ impl ComposeRunner { }) } - pub fn add_service(&mut self, name: String, service: Service) { - self.compose.services.insert(name, service); + pub fn add_prometheus(&mut self) -> Result<()> { + let prom_data = serde_json::json!( + { + "image": "prom/prometheus", + "ports": ["9090:9090"], + "restart": "unless-stopped", + "volumes": ["./prometheus:/etc/prometheus"], + "command": ["--config.file=/etc/prometheus/prometheus.yml"] + }); + self.compose + .services + .insert("prometheus".to_string(), prom_data); + Ok(()) + } + + pub fn add_grafana(&mut self) -> Result<()> { + let prom_data = serde_json::json!( + { + "image": "grafana/grafana", + "ports": ["3000:3000"], + "restart": "unless-stopped", + "volumes": ["./grafana:/etc/grafana/provisioning/datasources"], + "environment": ["GF_SECURITY_ADMIN_USER=admin", "GF_SECURITY_ADMIN_PASSWORD=grafana"] + }); + self.compose + .services + .insert("grafana".to_string(), prom_data); + Ok(()) + } + + pub fn add_service(&mut self, name: String, service: Service) -> Result<()> { + let value = serde_json::to_value(service)?; + self.compose.services.insert(name, value); + Ok(()) } pub fn finalize(&self) -> Result<()> { @@ -55,7 +90,7 @@ impl ComposeRunner { } /// Executes a Docker Compose command with the given arguments. - pub fn execute_compose_command(&self, args: &[T]) -> Result<()> + pub fn execute_compose_command(&self, args: &[T]) -> Result where T: AsRef, { @@ -66,16 +101,16 @@ impl ComposeRunner { command = command.arg(arg); } - command - .stdout(Stdio::inherit()) + let result = command + .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .stdin(Stdio::inherit()) .spawn() .context("Failed to spawn Docker Compose command")? - .wait() + .wait_with_output() .context("Failed to wait on Docker Compose command")?; - Ok(()) + Ok(result) } pub fn logs(&self, follow: bool, node: Option) -> Result<()> { @@ -89,7 +124,8 @@ impl ComposeRunner { args.push(format!("node-{}", node_index)); } - self.execute_compose_command(&args) + self.execute_compose_command(&args)?; + Ok(()) } pub fn stop_node(&self, node_index: Option) -> Result<()> { @@ -97,7 +133,8 @@ impl ComposeRunner { if let Some(node_index) = node_index { args.push(format!("node-{}", node_index)); } - self.execute_compose_command(&args) + self.execute_compose_command(&args)?; + Ok(()) } pub fn start_node(&self, node_index: Option) -> Result<()> { @@ -107,16 +144,75 @@ impl ComposeRunner { } args.push("-d".to_string()); - self.execute_compose_command(&args) + self.execute_compose_command(&args)?; + + { + for i in self.get_running_nodes_list()? { + let index = usize::from_str(&i[5..6])?; + let info = self.node_info(index)?; + if info.delay > 0 { + self.set_delay(index, info.delay)?; + } + if info.packet_loss > 0 { + self.set_packet_loss(index, info.packet_loss)?; + } + } + } + + Ok(()) } - pub fn exec_command(&self, node_index: usize, cmd: &str, args: Vec) -> Result<()> { + pub fn get_running_nodes_list(&self) -> Result> { + let docker_compose_command = vec!["config".to_string(), "--services".to_string()]; + let output = self.execute_compose_command(&docker_compose_command)?; + let x = String::from_utf8(output.stdout)? + .split("\n") + .map(|x| x.to_string()) + .collect(); + Ok(x) + } + + pub fn node_info(&self, node_index: usize) -> Result { + let command = "cat"; + let output = self.exec_command( + node_index, + command, + vec!["/options/options.json".to_string()], + )?; + let node_options = serde_json::from_slice(output.stdout.as_slice())?; + Ok(node_options) + } + + pub fn set_delay(&self, node_index: usize, delay: u16) -> Result<()> { + println!("Setting delay {delay}ms for node {node_index}"); + let command = "sh"; + let args = format!("tc qdisc add dev eth0 root netem delay {delay}ms"); + self.exec_command( + node_index, + command, + vec!["-c".to_string(), format!("{args}")], + )?; + Ok(()) + } + + pub fn set_packet_loss(&self, node_index: usize, loss: u16) -> Result<()> { + println!("Setting packet loss {loss}% for node {node_index}"); + let command = "sh"; + let args = format!("tc qdisc change dev eth0 root netem loss {loss}%"); + self.exec_command( + node_index, + command, + vec!["-c".to_string(), format!("{args}")], + )?; + Ok(()) + } + + pub fn exec_command(&self, node_index: usize, cmd: &str, args: Vec) -> Result { let service_name = format!("node-{}", node_index); let mut docker_compose_command = vec!["exec".to_string(), service_name, cmd.to_string()]; docker_compose_command.extend(args); - self.execute_compose_command(&docker_compose_command)?; - - Ok(()) + let output = self.execute_compose_command(&docker_compose_command)?; + Ok(output) } pub fn down(&self) -> Result<()> { @@ -128,7 +224,7 @@ impl ComposeRunner { #[derive(Serialize, Deserialize, Debug)] struct DockerCompose { version: String, - services: HashMap, + services: HashMap, networks: HashMap, } diff --git a/simulator/src/config.rs b/simulator/src/config.rs index d6a26e47e..73312d472 100644 --- a/simulator/src/config.rs +++ b/simulator/src/config.rs @@ -46,4 +46,14 @@ impl ServiceConfig { pub fn entrypoints(&self) -> PathBuf { self.scratch_dir.join("entrypoints") } + pub fn grafana(&self) -> PathBuf { + self.scratch_dir.join("grafana") + } + pub fn prometheus(&self) -> PathBuf { + self.scratch_dir.join("prometheus") + } + + pub fn options(&self) -> PathBuf { + self.scratch_dir.join("options") + } } diff --git a/simulator/src/main.rs b/simulator/src/main.rs index d13f27c94..1c586e3aa 100644 --- a/simulator/src/main.rs +++ b/simulator/src/main.rs @@ -113,7 +113,8 @@ impl StatusCommand { let config = config::ServiceConfig::new(DEFAULT_SUBNET.to_string())?; let compose = ComposeRunner::load_from_fs(&config)?; - compose.execute_compose_command(&["ps"]) + compose.execute_compose_command(&["ps"])?; + Ok(()) } } @@ -147,6 +148,7 @@ enum NodeCommand { Add(AddCommand), Start(NodeStartCommand), Stop(NodeStopCommand), + Info(NodeInfoCommand), Logs(NodeLogsCommand), Exec(NodeExecCommand), Status(StatusCommand), @@ -164,19 +166,25 @@ impl NodeCommand { NodeCommand::Logs(a) => a.run(compose), NodeCommand::Exec(a) => a.run(compose), NodeCommand::Status(a) => a.run(), + NodeCommand::Info(a) => a.run(compose), } } } #[derive(Parser)] -struct AddCommand; +struct AddCommand { + #[clap(short, long)] + pub delay: Option, + #[clap(short, long)] + pub loss: Option, +} impl AddCommand { fn run(self) -> Result<()> { let config = config::ServiceConfig::new(DEFAULT_SUBNET.to_string())?; let mut sim = Simulator::new(config)?; let next_node_index = sim.next_node_index(); - sim.add_node(next_node_index)?; + sim.add_node(next_node_index, self.delay, self.loss)?; sim.finalize()?; println!("Added node-{}", next_node_index); @@ -232,6 +240,24 @@ struct NodeExecCommand { impl NodeExecCommand { fn run(self, compose: ComposeRunner) -> Result<()> { - compose.exec_command(self.node_index, &self.cmd, self.args) + compose.exec_command(self.node_index, &self.cmd, self.args)?; + Ok(()) + } +} + +#[derive(Parser)] +struct NodeInfoCommand { + #[clap(short, long)] + node_index: usize, +} + +impl NodeInfoCommand { + fn run(self, compose: ComposeRunner) -> Result<()> { + let output = compose.node_info(self.node_index)?; + println!( + "Node {} artificial delay: {} ms and packet loss: {}% ", + self.node_index, output.delay, output.packet_loss + ); + Ok(()) } } diff --git a/simulator/src/node.rs b/simulator/src/node.rs index 2c970dea9..8d643aeb0 100644 --- a/simulator/src/node.rs +++ b/simulator/src/node.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use std::process::Command; use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; use crate::compose::{Service, ServiceNetwork}; use crate::config::ServiceConfig; @@ -14,10 +15,22 @@ pub struct Node { pub port: u16, pub dht_value: serde_json::Value, pub key: String, + pub options: Option, +} + +#[derive(Serialize, Deserialize, Default, Debug)] +pub struct NodeOptions { + pub delay: u16, + pub packet_loss: u16, } impl Node { - pub fn init_from_cli(ip: Ipv4Addr, port: u16, index: usize) -> Result { + pub fn init_from_cli( + ip: Ipv4Addr, + port: u16, + index: usize, + options: Option, + ) -> Result { let private_key = hex::encode(rand::random::<[u8; 32]>()); let output = Command::new("cargo") .arg("run") @@ -43,6 +56,7 @@ impl Node { dht_value, port, key: private_key, + options, }) } @@ -56,6 +70,10 @@ impl Node { "{}:/app/global-config.json", service_config.global_config_path().to_string_lossy() ), + format!( + "./options/node-{}_options.json:/options/options.json", + self.index + ), format!( "{}:/app/logs:rw", self.logs_dir(service_config).to_string_lossy() @@ -92,6 +110,12 @@ impl Node { .join(format!("node-{}_entrypoint.sh", self.index)) } + pub fn options_path(&self, service_config: &ServiceConfig) -> PathBuf { + service_config + .options() + .join(format!("node-{}_options.json", self.index)) + } + pub fn run_command(&self) -> String { format!( "run {ip}:{node_port} --key {key} --global-config /app/global-config.json", diff --git a/simulator/src/simulator.rs b/simulator/src/simulator.rs index 27c5898bf..230458660 100644 --- a/simulator/src/simulator.rs +++ b/simulator/src/simulator.rs @@ -1,12 +1,13 @@ use std::net::Ipv4Addr; use std::os::unix::fs::PermissionsExt; +use std::str::FromStr; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; -use crate::compose::ComposeRunner; +use crate::compose::{ComposeRunner, Service}; use crate::config::ServiceConfig; -use crate::node::Node; +use crate::node::{Node, NodeOptions}; pub(crate) struct Simulator { config: ServiceConfig, @@ -37,10 +38,15 @@ impl Simulator { } pub fn prepare(&mut self, nodes: usize) -> Result<()> { + let mut ips = Vec::new(); for node_index in 0..nodes { - self.add_node(node_index)?; + let ip = self.add_node(node_index, None, None)?; + ips.push(ip) } + self.add_grafana()?; + self.add_prometheus(ips)?; + self.finalize()?; Ok(()) } @@ -56,43 +62,145 @@ impl Simulator { } // updates the next_node_ip and adds a new node to the network - pub fn add_node(&mut self, node_index: usize) -> Result<()> { + pub fn add_node( + &mut self, + node_index: usize, + delay: Option, + loss: Option, + ) -> Result { let node_ip = increment_ip(self.next_node_ip, 1); - let mut node = Node::init_from_cli(node_ip, self.config.node_port, node_index) + + let options = match (delay, loss) { + (Some(delay), Some(loss)) => Some(NodeOptions { + delay, + packet_loss: loss, + }), + (Some(delay), None) => Some(NodeOptions { + delay, + packet_loss: 0, + }), + (None, Some(loss)) => Some(NodeOptions { + delay: 0, + packet_loss: loss, + }), + (None, None) => None, + }; + + let mut node = Node::init_from_cli(node_ip, self.config.node_port, node_index, options) .with_context(|| format!("failed to init node-{node_index}"))?; + let ip = node.ip.to_string(); let service = node.as_service(&self.config)?; self.global_config .bootstrap_peers .push(node.dht_value.take()); self.compose - .add_service(format!("node-{}", node_index), service); + .add_service(format!("node-{}", node_index), service)?; let logs_dir = node.logs_dir(&self.config); println!("Creating {:?}", logs_dir); std::fs::create_dir_all(&logs_dir)?; - self.write_entrypoint(node)?; + self.write_run_data(node)?; self.next_node_ip = node_ip; - Ok(()) + Ok(ip) + } + + pub fn add_grafana(&mut self) -> Result<()> { + self.write_grafana_data()?; + self.compose.add_grafana() + } + + pub fn add_prometheus(&mut self, node_addresses: Vec) -> Result<()> { + self.write_prometheus_data(node_addresses)?; + self.compose.add_prometheus() } pub fn next_node_index(&self) -> usize { self.global_config.bootstrap_peers.len() } - fn write_entrypoint(&self, node: Node) -> Result<()> { + fn write_grafana_data(&self) -> Result<()> { + std::fs::create_dir_all(self.config.grafana())?; + let grafana_data = r#"apiVersion: 1 + +datasources: +- name: Prometheus + type: prometheus + url: http://prometheus:9090 + isDefault: true + access: proxy + editable: true + "#; + let grafana_datasource_config = self.config.grafana().join("datasource.yml"); + std::fs::write(&grafana_datasource_config, grafana_data) + .context("Failed to write grafana data")?; + + Ok(()) + } + + fn write_prometheus_data(&self, node_addresses: Vec) -> Result<()> { + let nodes = node_addresses + .iter() + .map(|x| format!("- {x}:9081")) + .reduce(|left, right| format!("{}\n {}", left, right)) + .unwrap_or_default(); + std::fs::create_dir_all(self.config.prometheus())?; + let prometheus_data = format!( + r#"global: + scrape_interval: 15s + scrape_timeout: 10s + evaluation_interval: 15s +alerting: + alertmanagers: + - static_configs: + - targets: [] + scheme: http + timeout: 10s + api_version: v1 +scrape_configs: +- job_name: prometheus + honor_timestamps: true + scrape_interval: 15s + scrape_timeout: 10s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + {} + "#, + nodes + ); + let prometheus_datasource_config = self.config.prometheus().join("prometheus.yml"); + std::fs::write(&prometheus_datasource_config, prometheus_data) + .context("Failed to write prometheus data")?; + Ok(()) + } + fn write_run_data(&self, node: Node) -> Result<()> { let entrypoint_data = generate_entrypoint(node.run_command()); let entrypoint_path = node.entrypoint_path(&self.config); + let options_path = node.options_path(&self.config); println!("Writing entrypoint to {:?}", entrypoint_path); std::fs::create_dir_all(self.config.entrypoints())?; + std::fs::create_dir_all(self.config.options())?; + std::fs::write(&entrypoint_path, entrypoint_data) .context("Failed to write entrypoint data")?; std::fs::set_permissions(entrypoint_path, std::fs::Permissions::from_mode(0o755)) .context("Failed to set entrypoint permissions")?; + println!("Writing persistent options json file"); + let data = match node.options { + Some(options) => serde_json::to_string(&options)?, + None => serde_json::to_string(&NodeOptions::default())?, + }; + + std::fs::write(&options_path, data).context("Failed to write node options")?; + std::fs::set_permissions(options_path, std::fs::Permissions::from_mode(0o755)) + .context("Failed to set node options permissions")?; + Ok(()) } }