Skip to content

Commit

Permalink
Add ability to set throttling network for simulator (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrWad3r authored May 3, 2024
1 parent bfff28f commit 51b4b86
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 31 deletions.
2 changes: 1 addition & 1 deletion network.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# syntax=docker/dockerfile:1.2
FROM rust:1.76-buster as builder
FROM rust:1.77.2-buster as builder
WORKDIR /build
COPY . .

Expand Down
1 change: 1 addition & 0 deletions simulator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ cargo install --path ./simulator
# Or alias via `alias simulator="cargo run --bin simulator --"`

simulator prepare
simulator build
simulator node start
simulator node logs -f
simulator node exec
Expand Down
128 changes: 112 additions & 16 deletions simulator/src/compose.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::process::{Command, Output, Stdio};
use std::str;
use std::str::FromStr;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};

use crate::config::ServiceConfig;
use crate::node::NodeOptions;

pub struct ComposeRunner {
compose_path: PathBuf,
Expand Down Expand Up @@ -42,8 +45,40 @@ impl ComposeRunner {
})
}

pub fn add_service(&mut self, name: String, service: Service) {
self.compose.services.insert(name, service);
pub fn add_prometheus(&mut self) -> Result<()> {
let prom_data = serde_json::json!(
{
"image": "prom/prometheus",
"ports": ["9090:9090"],
"restart": "unless-stopped",
"volumes": ["./prometheus:/etc/prometheus"],
"command": ["--config.file=/etc/prometheus/prometheus.yml"]
});
self.compose
.services
.insert("prometheus".to_string(), prom_data);
Ok(())
}

pub fn add_grafana(&mut self) -> Result<()> {
let prom_data = serde_json::json!(
{
"image": "grafana/grafana",
"ports": ["3000:3000"],
"restart": "unless-stopped",
"volumes": ["./grafana:/etc/grafana/provisioning/datasources"],
"environment": ["GF_SECURITY_ADMIN_USER=admin", "GF_SECURITY_ADMIN_PASSWORD=grafana"]
});
self.compose
.services
.insert("grafana".to_string(), prom_data);
Ok(())
}

pub fn add_service(&mut self, name: String, service: Service) -> Result<()> {
let value = serde_json::to_value(service)?;
self.compose.services.insert(name, value);
Ok(())
}

pub fn finalize(&self) -> Result<()> {
Expand All @@ -55,7 +90,7 @@ impl ComposeRunner {
}

/// Executes a Docker Compose command with the given arguments.
pub fn execute_compose_command<T>(&self, args: &[T]) -> Result<()>
pub fn execute_compose_command<T>(&self, args: &[T]) -> Result<Output>
where
T: AsRef<OsStr>,
{
Expand All @@ -66,16 +101,16 @@ impl ComposeRunner {
command = command.arg(arg);
}

command
.stdout(Stdio::inherit())
let result = command
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.stdin(Stdio::inherit())
.spawn()
.context("Failed to spawn Docker Compose command")?
.wait()
.wait_with_output()
.context("Failed to wait on Docker Compose command")?;

Ok(())
Ok(result)
}

pub fn logs(&self, follow: bool, node: Option<usize>) -> Result<()> {
Expand All @@ -89,15 +124,17 @@ impl ComposeRunner {
args.push(format!("node-{}", node_index));
}

self.execute_compose_command(&args)
self.execute_compose_command(&args)?;
Ok(())
}

pub fn stop_node(&self, node_index: Option<usize>) -> Result<()> {
let mut args = vec!["stop".to_string()];
if let Some(node_index) = node_index {
args.push(format!("node-{}", node_index));
}
self.execute_compose_command(&args)
self.execute_compose_command(&args)?;
Ok(())
}

pub fn start_node(&self, node_index: Option<usize>) -> Result<()> {
Expand All @@ -107,16 +144,75 @@ impl ComposeRunner {
}
args.push("-d".to_string());

self.execute_compose_command(&args)
self.execute_compose_command(&args)?;

{
for i in self.get_running_nodes_list()? {
let index = usize::from_str(&i[5..6])?;
let info = self.node_info(index)?;
if info.delay > 0 {
self.set_delay(index, info.delay)?;
}
if info.packet_loss > 0 {
self.set_packet_loss(index, info.packet_loss)?;
}
}
}

Ok(())
}

pub fn exec_command(&self, node_index: usize, cmd: &str, args: Vec<String>) -> Result<()> {
pub fn get_running_nodes_list(&self) -> Result<Vec<String>> {
let docker_compose_command = vec!["config".to_string(), "--services".to_string()];
let output = self.execute_compose_command(&docker_compose_command)?;
let x = String::from_utf8(output.stdout)?
.split("\n")
.map(|x| x.to_string())
.collect();
Ok(x)
}

pub fn node_info(&self, node_index: usize) -> Result<NodeOptions> {
let command = "cat";
let output = self.exec_command(
node_index,
command,
vec!["/options/options.json".to_string()],
)?;
let node_options = serde_json::from_slice(output.stdout.as_slice())?;
Ok(node_options)
}

pub fn set_delay(&self, node_index: usize, delay: u16) -> Result<()> {
println!("Setting delay {delay}ms for node {node_index}");
let command = "sh";
let args = format!("tc qdisc add dev eth0 root netem delay {delay}ms");
self.exec_command(
node_index,
command,
vec!["-c".to_string(), format!("{args}")],
)?;
Ok(())
}

pub fn set_packet_loss(&self, node_index: usize, loss: u16) -> Result<()> {
println!("Setting packet loss {loss}% for node {node_index}");
let command = "sh";
let args = format!("tc qdisc change dev eth0 root netem loss {loss}%");
self.exec_command(
node_index,
command,
vec!["-c".to_string(), format!("{args}")],
)?;
Ok(())
}

pub fn exec_command(&self, node_index: usize, cmd: &str, args: Vec<String>) -> Result<Output> {
let service_name = format!("node-{}", node_index);
let mut docker_compose_command = vec!["exec".to_string(), service_name, cmd.to_string()];
docker_compose_command.extend(args);
self.execute_compose_command(&docker_compose_command)?;

Ok(())
let output = self.execute_compose_command(&docker_compose_command)?;
Ok(output)
}

pub fn down(&self) -> Result<()> {
Expand All @@ -128,7 +224,7 @@ impl ComposeRunner {
#[derive(Serialize, Deserialize, Debug)]
struct DockerCompose {
version: String,
services: HashMap<String, Service>,
services: HashMap<String, serde_json::value::Value>,
networks: HashMap<String, Network>,
}

Expand Down
10 changes: 10 additions & 0 deletions simulator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ impl ServiceConfig {
pub fn entrypoints(&self) -> PathBuf {
self.scratch_dir.join("entrypoints")
}
pub fn grafana(&self) -> PathBuf {
self.scratch_dir.join("grafana")
}
pub fn prometheus(&self) -> PathBuf {
self.scratch_dir.join("prometheus")
}

pub fn options(&self) -> PathBuf {
self.scratch_dir.join("options")
}
}
34 changes: 30 additions & 4 deletions simulator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ impl StatusCommand {
let config = config::ServiceConfig::new(DEFAULT_SUBNET.to_string())?;
let compose = ComposeRunner::load_from_fs(&config)?;

compose.execute_compose_command(&["ps"])
compose.execute_compose_command(&["ps"])?;
Ok(())
}
}

Expand Down Expand Up @@ -147,6 +148,7 @@ enum NodeCommand {
Add(AddCommand),
Start(NodeStartCommand),
Stop(NodeStopCommand),
Info(NodeInfoCommand),
Logs(NodeLogsCommand),
Exec(NodeExecCommand),
Status(StatusCommand),
Expand All @@ -164,19 +166,25 @@ impl NodeCommand {
NodeCommand::Logs(a) => a.run(compose),
NodeCommand::Exec(a) => a.run(compose),
NodeCommand::Status(a) => a.run(),
NodeCommand::Info(a) => a.run(compose),
}
}
}

#[derive(Parser)]
struct AddCommand;
struct AddCommand {
#[clap(short, long)]
pub delay: Option<u16>,
#[clap(short, long)]
pub loss: Option<u16>,
}

impl AddCommand {
fn run(self) -> Result<()> {
let config = config::ServiceConfig::new(DEFAULT_SUBNET.to_string())?;
let mut sim = Simulator::new(config)?;
let next_node_index = sim.next_node_index();
sim.add_node(next_node_index)?;
sim.add_node(next_node_index, self.delay, self.loss)?;
sim.finalize()?;

println!("Added node-{}", next_node_index);
Expand Down Expand Up @@ -232,6 +240,24 @@ struct NodeExecCommand {

impl NodeExecCommand {
fn run(self, compose: ComposeRunner) -> Result<()> {
compose.exec_command(self.node_index, &self.cmd, self.args)
compose.exec_command(self.node_index, &self.cmd, self.args)?;
Ok(())
}
}

#[derive(Parser)]
struct NodeInfoCommand {
#[clap(short, long)]
node_index: usize,
}

impl NodeInfoCommand {
fn run(self, compose: ComposeRunner) -> Result<()> {
let output = compose.node_info(self.node_index)?;
println!(
"Node {} artificial delay: {} ms and packet loss: {}% ",
self.node_index, output.delay, output.packet_loss
);
Ok(())
}
}
26 changes: 25 additions & 1 deletion simulator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::PathBuf;
use std::process::Command;

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};

use crate::compose::{Service, ServiceNetwork};
use crate::config::ServiceConfig;
Expand All @@ -14,10 +15,22 @@ pub struct Node {
pub port: u16,
pub dht_value: serde_json::Value,
pub key: String,
pub options: Option<NodeOptions>,
}

#[derive(Serialize, Deserialize, Default, Debug)]
pub struct NodeOptions {
pub delay: u16,
pub packet_loss: u16,
}

impl Node {
pub fn init_from_cli(ip: Ipv4Addr, port: u16, index: usize) -> Result<Self> {
pub fn init_from_cli(
ip: Ipv4Addr,
port: u16,
index: usize,
options: Option<NodeOptions>,
) -> Result<Self> {
let private_key = hex::encode(rand::random::<[u8; 32]>());
let output = Command::new("cargo")
.arg("run")
Expand All @@ -43,6 +56,7 @@ impl Node {
dht_value,
port,
key: private_key,
options,
})
}

Expand All @@ -56,6 +70,10 @@ impl Node {
"{}:/app/global-config.json",
service_config.global_config_path().to_string_lossy()
),
format!(
"./options/node-{}_options.json:/options/options.json",
self.index
),
format!(
"{}:/app/logs:rw",
self.logs_dir(service_config).to_string_lossy()
Expand Down Expand Up @@ -92,6 +110,12 @@ impl Node {
.join(format!("node-{}_entrypoint.sh", self.index))
}

pub fn options_path(&self, service_config: &ServiceConfig) -> PathBuf {
service_config
.options()
.join(format!("node-{}_options.json", self.index))
}

pub fn run_command(&self) -> String {
format!(
"run {ip}:{node_port} --key {key} --global-config /app/global-config.json",
Expand Down
Loading

0 comments on commit 51b4b86

Please sign in to comment.