Add files from deib-polimi/noir repo
imDema committed Jan 19, 2023
Commit c9c0c91 (initial commit, no parents)
Showing 21 changed files with 3,256 additions and 0 deletions.
5 changes: 5 additions & 0 deletions benchmarks/.gitignore
config.yaml
results*
temp
plots/target
__pycache__
101 changes: 101 additions & 0 deletions benchmarks/README.md
# Tools for running the benchmarks

`benchmark.py` automates most of the steps required to run a benchmark and collect the results.
In particular, it automatically does the following:

- Testing the connection to the workers, measuring ping and collecting info about their CPUs
- Compiling the program on a worker node, ensuring the correct native optimizations
- Syncing the executable to all the worker nodes
- Setting up the host files
- Running a warmup execution to cache the dataset in RAM
- Running the benchmark N times, collecting the outputs in handy JSON files

The script supports MPI projects compiled with CMake, RStream examples, and Flink projects built with Maven.

Where `benchmark.py` really shines is in automating the benchmark while varying hyperparameters:
in the configuration file you can define a set of hyperparameters and their possible values; the script will enumerate all possible combinations and run the benchmarks on each of them (a minimal sketch of this enumeration follows the list below).

The two required hyperparameters are:

- `num_hosts`: the number of hosts to use (the values should not be greater than the number of known hosts)
- `procs_per_host`: the number of processes (slots) for each host
  - When using RStream it's the number of slots in the hostfile (ignoring source and sink, which are added automatically)
  - When using MPI or Flink it's the number of slots for that worker
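
To make the enumeration concrete, here is a minimal Python sketch of expanding a hyperparameter grid into all combinations; the `hyperparameters` dict and the `run_benchmark` function are illustrative assumptions mirroring the example configuration further below, not `benchmark.py`'s actual code.

```python
import itertools

# Hypothetical hyperparameter values, mirroring the example config.yaml below.
hyperparameters = {
    "num_hosts": [1],
    "procs_per_host": [1, 2, 4, 8],
}


def run_benchmark(combo):
    # Placeholder for what the script does for each combination of values.
    print("running with", combo)


# Enumerate every combination of hyperparameter values.
names = list(hyperparameters)
for values in itertools.product(*(hyperparameters[n] for n in names)):
    run_benchmark(dict(zip(names, values)))
```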

You can use `--verbose` to see all the details of what's happening under the hood.

## Example Usage

All the commands should be executed inside `tools/`; running them elsewhere requires adjusting the arguments.

Before running `benchmark.py`, copy `example.config.yaml` to `config.yaml` and tune the parameters (e.g. set the list of known hosts, the hyperparameters, ...).

Try running `./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 1 -d ~/rust_stream/data/gutenberg.txt` against this configuration file:
```yaml
known_hosts:
  - localhost
results_file: "./results/{experiment}/{system}/{num_hosts}-hosts-{procs_per_host}-procs-{run}.json"
# [...]
warmup: true
num_runs: 5
hyperparameters:
  num_hosts: [1]
  procs_per_host: [1, 2, 4, 8]
```
Here is a possible content of one results file (`num_hosts-1-procs_per_host-1-run-0.json`):
```json
{
  "hosts": [
    "localhost"
  ],
  "result": {
    "stdout": "total:1673354595\ncsv:0\nnetwork:397\n",
    "stderr": "[ 0/ 0] has interval 0 - 104857600\n[ 0/ 0] has interval 0 - 104857600 -- done\n"
  },
  "system": "mpi",
  "run": 0,
  "experiment": "experiment_name",
  "num_hosts": 1,
  "procs_per_host": 1
}
```
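
To illustrate how these result files can be consumed downstream (this loader is not part of the repository; the results directory path and the parsing of the `total:` line from stdout are assumptions based on the sample above), a minimal Python sketch:

```python
import json
from pathlib import Path

# Hypothetical results directory, following the results_file template above.
results_dir = Path("./results/experiment_name/mpi")

for path in sorted(results_dir.glob("*.json")):
    with path.open() as f:
        result = json.load(f)
    # Pull the "total:" field out of the captured stdout (format per the sample above).
    stdout = result["result"]["stdout"]
    total = next(
        (line.split(":", 1)[1] for line in stdout.splitlines() if line.startswith("total:")),
        "",
    )
    print(path.name, result["num_hosts"], result["procs_per_host"], total)
```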

### Running an MPI benchmark

```bash
./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 8 -d ~/rust_stream/data/gutenberg.txt
```

- `experiment_name` is used to keep track of which test you are performing; it also determines how the results are organized inside the `results/` folder.
- `mpi` tells the script you are running an MPI benchmark.
- `../mpi/wordcount/` is the path where the benchmark is stored (on the master node). There should be a `CMakeLists.txt` inside.
- `main` is the name of the executable produced by compiling the benchmark.
- `--`: what follows are arguments passed to the executable run with `mpirun`.
- `-T 8` tells the benchmark to use 8 threads. You can also add hyperparameters in `config.yaml` and use them in the command line arguments by wrapping them in curly braces (e.g. `-T {threads}`); see the sketch below.
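
As a hedged illustration of how that curly-brace substitution can work (the `threads` hyperparameter and the expansion shown here are hypothetical, not `benchmark.py`'s actual code), a minimal Python sketch:

```python
# Hypothetical hyperparameter assignment for one run.
params = {"threads": 8, "num_hosts": 1, "procs_per_host": 1}

# Each argument template is expanded with str.format before launching the run.
arg_templates = ["-T", "{threads}", "-d", "~/rust_stream/data/gutenberg.txt"]
args = [a.format(**params) for a in arg_templates]
print(args)  # ['-T', '8', '-d', '~/rust_stream/data/gutenberg.txt']
```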

### Running a RStream example

```bash
./benchmark.py experiment_name rstream .. wordcount_p -- ~/rust_stream/data/gutenberg.txt
```

- `experiment_name` is used to keep track of which test you are performing; it also determines how the results are organized inside the `results/` folder.
- `rstream` tells the script you are running an RStream benchmark.
- `..` is the path where the RStream project is cloned (`..` since we are inside `tools/`).
- `wordcount_p` is the name of the example to run.
- `--`: what follows are arguments passed to the executable.

### Running a Flink example

Remember to set `flink_base_path` inside `config.yaml` to the correct path and to copy Flink to all the hosts, including the workers.

```bash
./benchmark.py experiment_name flink ../flink/WordCount wordCount-1.0.jar -- -input ~/rust_stream/data/gutenberg.txt
```

- `experiment_name` is used to keep track of which test you are performing; it also determines how the results are organized inside the `results/` folder.
- `flink` tells the script you are running a Flink benchmark.
- `../flink/WordCount` is the path where the Flink project is stored. There should be a `pom.xml` inside.
- `wordCount-1.0.jar` is the name of the jar file produced by `mvn package`.
- `--`: what follows are arguments passed to the executable.
182 changes: 182 additions & 0 deletions benchmarks/aggregate_csv.py
#!/usr/bin/env python3

import argparse
import csv
import sys
from typing import Dict


class Experiment:
    def __init__(self, name):
        self.name = name
        self.times = {}

    def add(self, row):
        hosts = int(row["num_hosts"])
        cores = int(row["procs_per_host"])
        time = float(row["time"])
        self.times.setdefault((hosts, cores), []).append(time)

    def row_ids(self):
        return set(self.times.keys())

    def get_data(self, row_id):
        if row_id not in self.times:
            return None
        data = self.times[row_id]
        # Average of the runs; half of the min-max spread is used as the error estimate.
        delta = (max(data) - min(data)) / 2
        avg = sum(data) / len(data)
        return avg, delta

    def get_row(self, row_id, format):
        if row_id not in self.times:
            if format in {"human", "latex"}:
                return [""]
            else:
                return ["", ""]
        avg, delta = self.get_data(row_id)
        if format == "human":
            return [f"{avg:.2f}s (± {delta:.2f}s)"]
        elif format == "latex":
            return [f"\\sipm{{{avg:.2f}}}{{{delta:.2f}}}"]
        else:
            return [str(avg), str(delta)]


class System:
    def __init__(self, system):
        self.system = system
        self.experiments = {}  # type: Dict[str, Experiment]

    def add(self, row):
        exp = row["experiment"]
        if exp not in self.experiments:
            self.experiments[exp] = Experiment(exp)
        self.experiments[exp].add(row)

    def header(self, experiment, single_experiment, format):
        if experiment not in self.experiments:
            return []

        if single_experiment:
            if format in {"human", "latex"}:
                return [self.system]
            else:
                return [f"{self.system} (s)", f"{self.system} (± s)"]
        else:
            if format in {"human", "latex"}:
                return [f"{experiment} ({self.system})"]
            else:
                return [f"{experiment} ({self.system}) ({h}s)" for h in ["", "± "]]

    def get_row(self, experiment, row_id, format):
        if experiment not in self.experiments:
            return []
        return self.experiments[experiment].get_row(row_id, format)

    def get_experiments(self):
        return set(self.experiments.keys())

    def row_ids(self):
        ids = set()
        for exp in self.experiments.values():
            ids |= exp.row_ids()
        return ids


class RStream1(System):
    def __init__(self):
        System.__init__(self, "rstream1")


class RStream2(System):
    def __init__(self):
        System.__init__(self, "rstream2")


class Flink(System):
    def __init__(self):
        System.__init__(self, "flink")


class MPI(System):
    def __init__(self):
        System.__init__(self, "mpi")


class Timely(System):
    def __init__(self):
        System.__init__(self, "timely")


def get_systems():
    return [RStream1(), RStream2(), Flink(), MPI(), Timely()]


def parse_stdin(systems):
    # Dispatch each CSV row (read from stdin) to the matching system.
    for row in csv.DictReader(sys.stdin):
        system = row["system"]
        if system == "rstream":
            systems[0].add(row)
        elif system == "rstream2":
            systems[1].add(row)
        elif system == "flink":
            systems[2].add(row)
        elif system == "mpi":
            systems[3].add(row)
        elif system == "timely":
            systems[4].add(row)
        else:
            raise ValueError("Unsupported system: " + system)


def main(args):
    systems = get_systems()
    parse_stdin(systems)

    experiments = set()
    for system in systems:
        experiments |= system.get_experiments()
    experiments = list(sorted(experiments))
    single_experiment = len(experiments) == 1

    headers = ["hosts", "cores"]
    for experiment in experiments:
        for system in systems:
            headers += system.header(experiment, single_experiment, args.format)

    ids = set()
    for system in systems:
        ids |= system.row_ids()
    ids = list(sorted(ids))

    rows = []
    for row_id in ids:
        # The second column is the total number of cores (hosts * procs per host).
        row = [str(row_id[0]), str(row_id[0] * row_id[1])]
        for experiment in experiments:
            for system in systems:
                row += system.get_row(experiment, row_id, args.format)
        rows += [row]

    if args.format in {"human", "csv"}:
        writer = csv.writer(sys.stdout)
        writer.writerow(headers)
        writer.writerows(rows)
    else:
        print(" & ".join(headers), end="\\\\\n")
        print("\\midrule")
        for row in rows:
            print(" & ".join(row), end="\\\\\n")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Aggregate the output of gen_csv.py")
    parser.add_argument(
        "--format",
        "-f",
        choices=["human", "csv", "latex"],
        default="human",
        help="Output format",
    )
    args = parser.parse_args()
    main(args)
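
As a hedged illustration of the averaging logic above (not part of the repository), the following snippet exercises `Experiment` directly with made-up timings; it assumes it is run from the `benchmarks/` directory so that `aggregate_csv.py` is importable:

```python
# Hypothetical check of the averaging logic in aggregate_csv.py (made-up times).
from aggregate_csv import Experiment

exp = Experiment("wordcount")
for t in ["10.0", "12.0", "11.0"]:
    exp.add({"num_hosts": "1", "procs_per_host": "4", "time": t})

# Average of the runs and half of the min-max spread, in "human" format.
print(exp.get_row((1, 4), "human"))  # ['11.00s (± 1.00s)']
```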