From c9c0c914f0fafebad2b5cf1647c9469f46aaf145 Mon Sep 17 00:00:00 2001
From: imDema
Date: Thu, 19 Jan 2023 18:25:02 +0100
Subject: [PATCH] Add files from deib-polimi/noir repo

---
 benchmarks/.gitignore          |   5 +
 benchmarks/README.md           | 101 ++++
 benchmarks/aggregate_csv.py    | 182 ++++++
 benchmarks/benchmark.py        | 814 +++++++++++++++++++++++++++++++++
 benchmarks/example.config.yaml |  39 ++
 benchmarks/gen_csv.py          |  95 ++++
 benchmarks/latency/.gitignore  |   1 +
 benchmarks/latency/Cargo.toml  |  14 +
 benchmarks/latency/src/lib.rs  |  17 +
 benchmarks/latency/src/main.rs |  88 ++++
 benchmarks/make_plot.py        | 205 +++++++
 benchmarks/plots/Makefile      |  92 ++++
 datasets/gen_words.py          |  56 +++
 index.html                     |  19 +
 visualizer/css/style.css       |  55 +++
 visualizer/index.html          |  50 ++
 visualizer/js/data.js          | 602 ++++++++++++++++++++++++
 visualizer/js/graph.js         | 117 +++++
 visualizer/js/network.js       | 519 +++++++++++++++++++++
 visualizer/js/profiler.js      | 118 +++++
 visualizer/js/script.js        |  67 +++
 21 files changed, 3256 insertions(+)
 create mode 100644 benchmarks/.gitignore
 create mode 100644 benchmarks/README.md
 create mode 100755 benchmarks/aggregate_csv.py
 create mode 100755 benchmarks/benchmark.py
 create mode 100644 benchmarks/example.config.yaml
 create mode 100755 benchmarks/gen_csv.py
 create mode 100644 benchmarks/latency/.gitignore
 create mode 100644 benchmarks/latency/Cargo.toml
 create mode 100644 benchmarks/latency/src/lib.rs
 create mode 100644 benchmarks/latency/src/main.rs
 create mode 100755 benchmarks/make_plot.py
 create mode 100644 benchmarks/plots/Makefile
 create mode 100755 datasets/gen_words.py
 create mode 100644 index.html
 create mode 100644 visualizer/css/style.css
 create mode 100644 visualizer/index.html
 create mode 100644 visualizer/js/data.js
 create mode 100644 visualizer/js/graph.js
 create mode 100644 visualizer/js/network.js
 create mode 100644 visualizer/js/profiler.js
 create mode 100644 visualizer/js/script.js

diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore
new file mode 100644
index 0000000..928b993
--- /dev/null
+++ b/benchmarks/.gitignore
@@ -0,0 +1,5 @@
+config.yaml
+results*
+temp
+plots/target
+__pycache__
diff --git a/benchmarks/README.md b/benchmarks/README.md
new file mode 100644
index 0000000..e626793
--- /dev/null
+++ b/benchmarks/README.md
@@ -0,0 +1,101 @@
+# Tools for running the benchmarks
+
+`benchmark.py` can automate most of the steps required to run a benchmark and collect the results.
+In particular, it automatically does the following:
+
+- Test the connection to the workers, measuring ping and collecting info about their CPUs
+- Compile the program on a worker node, ensuring the correct native optimizations
+- Sync the executable to all the worker nodes
+- Set up the host files
+- Run a warmup execution to cache the dataset in RAM
+- Run the benchmark N times, collecting the outputs in handy JSON files
+
+This script supports MPI projects compiled with CMake, RStream examples, and Flink projects built with Maven.
+
+Where `benchmark.py` really shines is in automating the benchmark while varying hyperparameters:
+in the configuration file you can define a set of hyperparameters and their possible values, and the script will enumerate all the possible combinations and run the benchmarks on them.
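+
+As a rough illustration (this snippet is not part of the repository, and the values shown are made up), enumerating the combinations boils down to a Cartesian product over the value lists, which is what the `hyperparameter_generator` function in `benchmark.py` computes:
+
+```python
+# Illustrative sketch only: benchmark.py ships its own hyperparameter_generator;
+# this just shows the idea with a hypothetical configuration.
+import itertools
+
+hyperparameters = {
+    "num_hosts": [1, 2, 4],      # hypothetical values
+    "procs_per_host": [4, 8],    # hypothetical values
+}
+
+names = list(hyperparameters)
+for values in itertools.product(*hyperparameters.values()):
+    combination = dict(zip(names, values))
+    print(combination)  # e.g. {'num_hosts': 1, 'procs_per_host': 4}
+```
+
+Each resulting combination is one benchmark configuration, which the script then runs `num_runs` times.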
+
+The 2 required hyperparameters are:
+
+- `num_hosts`: the number of hosts to use (the values should not be greater than the number of known hosts)
+- `procs_per_host`: the number of processes (slots) for each host
+  - When using RStream it's the number of slots in the hostfile (ignoring source and sink, which are added automatically)
+  - When using MPI or Flink it's the number of slots for that worker
+
+You can use `--verbose` to see all the details of what's happening under the hood.
+
+## Example Usage
+
+All the commands should be executed inside `tools/`; running them elsewhere requires changing the arguments.
+
+Before running `benchmark.py`, copy `example.config.yaml` to `config.yaml` and tune the parameters (e.g. setting the list of known hosts, hyperparameters, ...).
+
+Try running `./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 1 -d ~/rust_stream/data/gutenberg.txt` against this configuration file:
+```yaml
+known_hosts:
+  - localhost
+results_file: "./results/{experiment}/{system}/{num_hosts}-hosts-{procs_per_host}-procs-{run}.json"
+# [...]
+warmup: true
+num_runs: 5
+hyperparameters:
+  num_hosts: [1]
+  procs_per_host: [1, 2, 4, 8]
+```
+
+This is an example of the content of one results file (`num_hosts-1-procs_per_host-1-run-0.json`):
+```json
+{
+    "hosts": [
+        "localhost"
+    ],
+    "result": {
+        "stdout": "total:1673354595\ncsv:0\nnetwork:397\n",
+        "stderr": "[ 0/ 0] has interval 0 - 104857600\n[ 0/ 0] has interval 0 - 104857600 -- done\n"
+    },
+    "system": "mpi",
+    "run": 0,
+    "experiment": "experiment_name",
+    "num_hosts": 1,
+    "procs_per_host": 1
+}
+```
+
+### Running an MPI benchmark
+
+```bash
+./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 8 -d ~/rust_stream/data/gutenberg.txt
+```
+
+- `experiment_name` is used to keep track of which test you are performing; it will be used to organize the results in the `results/` folder.
+- `mpi` tells the script you are running an MPI benchmark.
+- `../mpi/wordcount/` is the path where the benchmark is stored (on the master node). There should be a `CMakeLists.txt` inside.
+- `main` is the name of the executable produced by compiling the benchmark.
+- `--` what follows are arguments passed to the executable run with `mpirun`.
+- `-T 8` tells the benchmark to use 8 threads. You can also add hyperparameters in `config.yaml` and use them in the command line arguments by wrapping them in curly braces (e.g. `-T {threads}`).
+
+### Running an RStream example
+
+```bash
+./benchmark.py experiment_name rstream .. wordcount_p -- ~/rust_stream/data/gutenberg.txt
+```
+
+- `experiment_name` is used to keep track of which test you are performing; it will be used to organize the results in the `results/` folder.
+- `rstream` tells the script you are running an RStream benchmark.
+- `..` is the path where the RStream project is cloned (`..` since we are inside `tools/`).
+- `wordcount_p` is the name of the example to run.
+- `--` what follows are arguments passed to the executable.
+
+### Running a Flink example
+
+Remember to set `flink_base_path` inside `config.yaml` to the correct path and to copy Flink to all the hosts, including the workers.
+
+```bash
+./benchmark.py experiment_name flink ../flink/WordCount wordCount-1.0.jar -- -input ~/rust_stream/data/gutenberg.txt
+```
+
+- `experiment_name` is used to keep track of which test you are performing; it will be used to organize the results in the `results/` folder.
+- `flink` tells the script you are running a Flink benchmark.
+- `../flink/WordCount` is the path where the Flink project is stored. There should be a `pom.xml` inside. +- `wordCount-1.0.jar` is the name of the jar file produced by `mvn package`. +- `--` what follows are arguments passed to the executable. \ No newline at end of file diff --git a/benchmarks/aggregate_csv.py b/benchmarks/aggregate_csv.py new file mode 100755 index 0000000..8622727 --- /dev/null +++ b/benchmarks/aggregate_csv.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 + +import argparse +import csv +import sys +from typing import Dict + + +class Experiment: + def __init__(self, name): + self.name = name + self.times = {} + + def add(self, row): + hosts = int(row["num_hosts"]) + cores = int(row["procs_per_host"]) + time = float(row["time"]) + self.times.setdefault((hosts, cores), []).append(time) + + def row_ids(self): + return set(self.times.keys()) + + def get_data(self, row_id): + if row_id not in self.times: + return None + data = self.times[row_id] + delta = (max(data) - min(data)) / 2 + avg = sum(data) / len(data) + return avg, delta + + def get_row(self, row_id, format): + if row_id not in self.times: + if format in {"human", "latex"}: + return [""] + else: + return ["", ""] + avg, delta = self.get_data(row_id) + if format == "human": + return [f"{avg:.2f}s (± {delta:.2f}s)"] + elif format == "latex": + return [f"\sipm{{{avg:.2f}}}{{{delta:.2f}}}"] + else: + return [str(avg), str(delta)] + + +class System: + def __init__(self, system): + self.system = system + self.experiments = {} # type: Dict[str, Experiment] + + def add(self, row): + exp = row["experiment"] + if exp not in self.experiments: + self.experiments[exp] = Experiment(exp) + self.experiments[exp].add(row) + + def header(self, experiment, single_experiment, format): + if experiment not in self.experiments: + return [] + + if single_experiment: + if format in {"human", "latex"}: + return [self.system] + else: + return [f"{self.system} (s)", f"{self.system} (± s)"] + else: + if format in {"human", "latex"}: + return [f"{experiment} ({self.system})"] + else: + return [f"{experiment} ({self.system}) ({h}s)" for h in ["", "± "]] + + def get_row(self, experiment, row_id, format): + if experiment not in self.experiments: + return [] + return self.experiments[experiment].get_row(row_id, format) + + def get_experiments(self): + return set(self.experiments.keys()) + + def row_ids(self): + ids = set() + for exp in self.experiments.values(): + ids |= exp.row_ids() + return ids + + +class RStream1(System): + def __init__(self): + System.__init__(self, "rstream1") + + +class RStream2(System): + def __init__(self): + System.__init__(self, "rstream2") + + +class Flink(System): + def __init__(self): + System.__init__(self, "flink") + + +class MPI(System): + def __init__(self): + System.__init__(self, "mpi") + + +class Timely(System): + def __init__(self): + System.__init__(self, "timely") + + +def get_systems(): + return [RStream1(), RStream2(), Flink(), MPI(), Timely()] + + +def parse_stdin(systems): + for row in csv.DictReader(sys.stdin): + system = row["system"] + if system == "rstream": + systems[0].add(row) + elif system == "rstream2": + systems[1].add(row) + elif system == "flink": + systems[2].add(row) + elif system == "mpi": + systems[3].add(row) + elif system == "timely": + systems[4].add(row) + else: + raise ValueError("Unsupported system: " + system) + + +def main(args): + systems = get_systems() + parse_stdin(systems) + + experiments = set() + for system in systems: + experiments |= system.get_experiments() + experiments 
= list(sorted(experiments)) + single_experiment = len(experiments) == 1 + + headers = ["hosts", "cores"] + for experiment in experiments: + for system in systems: + headers += system.header(experiment, single_experiment, args.format) + + ids = set() + for system in systems: + ids |= system.row_ids() + ids = list(sorted(ids)) + + rows = [] + for row_id in ids: + row = [str(row_id[0]), str(row_id[0] * row_id[1])] + for experiment in experiments: + for system in systems: + row += system.get_row(experiment, row_id, args.format) + rows += [row] + + if args.format in {"human", "csv"}: + writer = csv.writer(sys.stdout) + writer.writerow(headers) + writer.writerows(rows) + else: + print(" & ".join(headers), end="\\\\\n") + print("\\midrule") + for row in rows: + print(" & ".join(row), end="\\\\\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Aggregate the output of gen_csv.py") + parser.add_argument( + "--format", + "-f", + choices=["human", "csv", "latex"], + default="human", + help="Output format", + ) + args = parser.parse_args() + main(args) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py new file mode 100755 index 0000000..6788d10 --- /dev/null +++ b/benchmarks/benchmark.py @@ -0,0 +1,814 @@ +#!/usr/bin/env python3 + +import abc +import argparse +import json +import logging +import math +import os.path +import shlex +import subprocess +import sys +import tempfile +import time +import gc + +import coloredlogs +import paramiko +import ruamel.yaml + +logger = logging.getLogger("benchmark") +verbosity = 0 +capture_output = False +print_output = False + +class Benchmark(abc.ABC): + @abc.abstractmethod + def prepare(self, args): + """Prepare the solution (e.g. compiling it if needed)""" + + def setup_hyp(self, hyperparameters): + """Setup the system for the selected hyperparameters, for example + raising up the cluster if needed. + """ + + @abc.abstractmethod + def run(self, args, hyperparameters, run_index): + """Run the benchmark of the prepared solution""" + + +class MPI(Benchmark): + exec_path = "/tmp/mpibin" + mpi_extra_args = [ + "-oversubscribe", + "-mca", + "btl_base_warn_component_unused", + "0", + "--map-by", + # "NUMA:PE=4", + "hwthread", + # "node", + "-display-map", + "--mca", + "mpi_yield_when_idle", + "1", + ] + + def __init__(self, config): + self.config = config + + def prepare(self, args): + logger.info("Preparing MPI") + compiler_host = self.config["known_hosts"][0] + source = args.directory + if not source.endswith("/"): + source += "/" + + logger.info("Copying source files from %s to %s", source, compiler_host) + compilation_dir = self.config["compilation_dir"] + "/" + args.experiment + sync_on(compiler_host, source, compilation_dir) + + logger.info("Compiling using cmake") + run_on(compiler_host, "mkdir", "-p", "%s/build" % compilation_dir) + run_on( + compiler_host, + "cd %s/build && cmake .. 
-DCMAKE_BUILD_TYPE=Release" % compilation_dir, + shell=True, + ) + run_on(compiler_host, "cd %s/build && make -j 8" % compilation_dir, shell=True) + run_on( + compiler_host, + "cd %s/build && ls -lah && file -E ./%s" % (compilation_dir, args.bin_name), + shell=True, + ) + logger.info( + "Executable compiled at %s/build/%s on %s" + % (compilation_dir, args.bin_name, compiler_host) + ) + + logger.info("Copying executable to this machine") + local_path = "%s/%s/mpi/%s" % ( + self.config["temp_dir"], + args.experiment, + args.bin_name, + ) + base = os.path.dirname(local_path) + os.makedirs(base, exist_ok=True) + sync_from( + compiler_host, "%s/build/%s" % (compilation_dir, args.bin_name), local_path + ) + + logger.info("Copying executable to all the hosts") + for host in self.config["known_hosts"]: + sync_on(host, local_path, self.exec_path) + + def run(self, args, hyperparameters, run_index): + num_hosts = hyperparameters["num_hosts"] + procs_per_host = hyperparameters["procs_per_host"] + with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: + hostfile_content = "" + for host in self.config["known_hosts"][:num_hosts]: + hostfile_content += "%s slots=%d\n" % (host, procs_per_host) + hostfile.write(hostfile_content) + hostfile.flush() + if verbosity > 1: + logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) + + num_proc = num_hosts * procs_per_host + extra_args = map( + lambda arg: replace_vars(arg, hyperparameters), + args.extra_args, + ) + cmd = [ + "mpirun", + # "-x", + # "LD_PRELOAD=/usr/bin/libmimalloc.so", + "-np", + str(num_proc), + "-hostfile", + hostfile.name, + *self.mpi_extra_args, + self.exec_path, + *extra_args, + ] + return run_benchmark(cmd) + + +class RStream(Benchmark): + exec_path = "/tmp/rstreambin" + compilation_suffix = "rstream" + + def __init__(self, config): + self.config = config + + def prepare(self, args): + logger.info("Preparing RStream") + compiler_host = self.config["known_hosts"][0] + source = args.directory + if not source.endswith("/"): + source += "/" + + logger.info("Copying source files from %s to %s", source, compiler_host) + compilation_dir = self.config["compilation_dir"] + "/" + self.compilation_suffix + run_on(compiler_host, "mkdir -p %s" % compilation_dir, shell=True) + sync_on(compiler_host, source, compilation_dir) + + logger.info("Compiling using cargo") + run_on( + compiler_host, + "cd %s && cargo build --release --example %s" + % (compilation_dir, args.example), + shell=True, + ) + remote_path = "%s/target/release/examples/%s" % (compilation_dir, args.example) + logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) + + logger.info("Copying executable to this machine") + self.local_path = "%s/%s/rstream/%s" % ( + self.config["temp_dir"], + args.experiment, + args.example, + ) + base = os.path.dirname(self.local_path) + os.makedirs(base, exist_ok=True) + sync_from(compiler_host, remote_path, self.local_path) + + logger.info("Copying executable to all the hosts") + for host in self.config["known_hosts"]: + sync_on(host, self.local_path, self.exec_path) + + def run(self, args, hyperparameters, run_index): + num_hosts = hyperparameters["num_hosts"] + procs_per_host = hyperparameters["procs_per_host"] + with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: + hostfile_content = "" + for i, host in enumerate(self.config["known_hosts"][:num_hosts]): + slots = procs_per_host * args.num_steps + if i == 0: + # source process + slots += 1 + if i == num_hosts - 1: + # 
sink process + slots += 1 + hostfile_content += "- hostname: %s\n" % host + hostfile_content += " slots: %d\n" % slots + hostfile.write(hostfile_content) + hostfile.flush() + logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) + + extra_args = map( + lambda arg: replace_vars(arg, hyperparameters), + args.extra_args, + ) + cmd = [ + "cargo", + "run", + "--release", + "--", + "--hostfile", + hostfile.name, + self.exec_path, + *extra_args, + ] + return run_benchmark(cmd, cwd=args.directory) + + +class RStream2(RStream): + compilation_suffix = "noir" + + def run(self, args, hyperparameters, run_index): + num_hosts = hyperparameters["num_hosts"] + procs_per_host = hyperparameters["procs_per_host"] + with tempfile.NamedTemporaryFile( + "w", dir=self.config["temp_dir"] + ) as configfile: + config_content = "hosts:\n" + for i, host in enumerate(self.config["known_hosts"][:num_hosts]): + config_content += " - address: %s\n" % host + config_content += " base_port: 10000\n" + config_content += " num_cores: %d\n" % procs_per_host + config_content += " ssh:\n" + config_content += " username: ubuntu\n" + config_content += " key_file: /home/ubuntu/.ssh/id_ed25519\n" + configfile.write(config_content) + configfile.flush() + logger.debug("Config at %s:\n%s", configfile.name, config_content) + + extra_args = map( + lambda arg: replace_vars(arg, hyperparameters), + args.extra_args, + ) + cmd = [ + self.local_path, + "--remote", + configfile.name, + *extra_args, + ] + return run_benchmark(cmd) + + +class TimelyDataflow(Benchmark): + exec_path = "/tmp/timelybin" + hostfile_path = "/tmp/timely.hosts" + compilation_suffix = "timely" + port_number = 2101 + + def __init__(self, config): + self.config = config + + def prepare(self, args): + logger.info("Preparing Timely Dataflow") + compiler_host = self.config["known_hosts"][0] + source = args.directory + if not source.endswith("/"): + source += "/" + + logger.info("Copying source files from %s to %s", source, compiler_host) + compilation_dir = self.config["compilation_dir"] + "/" + self.compilation_suffix + run_on(compiler_host, "mkdir -p %s" % compilation_dir, shell=True) + sync_on(compiler_host, source, compilation_dir) + + logger.info("Compiling using cargo") + run_on( + compiler_host, + "cd %s && cargo build --release --bin %s" % (compilation_dir, args.bin), + shell=True, + ) + remote_path = "%s/target/release/%s" % (compilation_dir, args.bin) + logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) + + logger.info("Copying executable to this machine") + self.local_path = "%s/%s/timely/%s" % ( + self.config["temp_dir"], + args.experiment, + args.bin, + ) + base = os.path.dirname(self.local_path) + os.makedirs(base, exist_ok=True) + sync_from(compiler_host, remote_path, self.local_path) + + logger.info("Copying executable to all the hosts") + for host in self.config["known_hosts"]: + sync_on(host, self.local_path, self.exec_path) + + def run(self, args, hyperparameters, run_index): + num_hosts = hyperparameters["num_hosts"] + hosts = self.config["known_hosts"][:num_hosts] + procs_per_host = hyperparameters["procs_per_host"] + with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: + hostfile_content = "" + for host in hosts: + hostfile_content += "%s:%d\n" % (host, self.port_number) + hostfile.write(hostfile_content) + hostfile.flush() + logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) + + logger.info("Copying hostfile to all the hosts") + for host in hosts: + sync_on(host, 
hostfile.name, self.hostfile_path) + + extra_args = " ".join( + map( + lambda arg: replace_vars(arg, hyperparameters), + args.extra_args, + ) + ) + + bash = "" + for i, host in enumerate(hosts): + bash += f"ssh {host} {self.exec_path} {extra_args} -w {procs_per_host} -n {num_hosts} -p {i} -h {self.hostfile_path} &\n" + bash += f"wait\n" + + cmd = ["bash", "-c", bash] + return run_benchmark(cmd, cwd=args.directory) + + +class Flink(Benchmark): + def __init__(self, config): + self.config = config + self.flink = self.config["flink_base_path"] + + def prepare(self, args): + logger.info("Preparing Flink") + compiler_host = self.config["known_hosts"][0] + source = args.directory + if not source.endswith("/"): + source += "/" + + logger.info("Copying source files from %s to %s", source, compiler_host) + compilation_dir = self.config["compilation_dir"] + sync_on(compiler_host, source, compilation_dir) + + logger.info("Compiling using mvn") + run_on( + compiler_host, + "cd %s && mvn package" % compilation_dir, + shell=True, + ) + run_on( + compiler_host, + "cd %s/target && ls -lah && file -E ./%s" + % (compilation_dir, args.jar_name), + shell=True, + ) + remote_path = "%s/target/%s" % (compilation_dir, args.jar_name) + logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) + + logger.info("Copying executable to this machine") + local_path = "%s/%s/flink/%s" % ( + self.config["temp_dir"], + args.experiment, + args.jar_name, + ) + base = os.path.dirname(local_path) + os.makedirs(base, exist_ok=True) + sync_from(compiler_host, remote_path, local_path) + + def setup_hyp(self, hyperparameters): + logger.info("Stopping the cluster") + subprocess.run([self.flink + "/bin/stop-cluster.sh"], check=True) + + logger.info("Setting up ./conf/workers file") + workers = self.flink + "/conf/workers" + num_hosts = hyperparameters["num_hosts"] + with open(workers, "w") as f: + for host in self.config["known_hosts"][:num_hosts]: + f.write(host + "\n") + + logger.info("Starting the cluster") + subprocess.run([self.flink + "/bin/start-cluster.sh"], check=True) + + def run(self, args, hyperparameters, run_index): + num_hosts = hyperparameters["num_hosts"] + procs_per_host = hyperparameters["procs_per_host"] + parallelism = num_hosts * procs_per_host + local_path = "%s/%s/flink/%s" % ( + self.config["temp_dir"], + args.experiment, + args.jar_name, + ) + extra_args = map( + lambda arg: replace_vars(arg, hyperparameters), + args.extra_args, + ) + cmd = [ + self.flink + "/bin/flink", + "run", + "-p", + str(parallelism), + local_path, + *extra_args, + ] + return run_benchmark(cmd) + + +def run_benchmark(cmd, cwd=None): + logger.debug("Command: %s", shlex.join(cmd)) + + gc.disable() + start = time.perf_counter_ns() + + out = subprocess.run( + cmd, + cwd=cwd, + capture_output=capture_output and not print_output, + stdout=None if capture_output or print_output else subprocess.DEVNULL, + stderr=None if capture_output or print_output else subprocess.DEVNULL, + ) + + end = time.perf_counter_ns() + gc.enable() + + dt = end-start + + if capture_output and not print_output: + stdout = out.stdout.decode() + stderr = out.stderr.decode() + else: + stdout = "" + stderr = "" + + if out.returncode != 0: + raise RuntimeError( + "benchmark failed!\nExit code: %d\nStdout:\n%s\nStderr:\n%s" + % (out.returncode, stdout, stderr) + ) + return { + "stdout": stdout, + "stderr": stderr, + "time": dt / 1e9 + } + + +def run_on(host, *cmd, shell=False): + client = paramiko.SSHClient() + client.load_system_host_keys() + 
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(host) + + if shell: + assert len(cmd) == 1 + cmd = "bash -l -c %s" % shlex.quote(cmd[0]) + else: + cmd = shlex.join(cmd) + logger.debug("Remote on %s: %s", host, cmd) + stdin, stdout, stderr = client.exec_command(cmd) + exit_code = stdout.channel.recv_exit_status() + stdout = stdout.read().decode() + stderr = stderr.read().decode() + if exit_code != 0: + raise RuntimeError( + "Remote command failed on host %s: %s\nStdout:\n%s\nStderr:\n%s" + % (host, cmd, stdout, stderr) + ) + return stdout + stderr + + +def sync_on(host, local_path, remote_path): + remote = host + ":" + remote_path + logger.debug("rsync: %s -> %s", local_path, remote) + subprocess.run( + [ + "rsync", + "-a", + "--copy-links", + "--exclude", + "target", + "--exclude", + "build", + "--exclude", + "data", + local_path, + remote, + ], + check=True, + ) + + +def sync_from(host, remote_path, local_path): + remote = host + ":" + remote_path + logger.debug("rsync: %s -> %s", remote, local_path) + subprocess.run( + ["rsync", "-a", "--exclude", "target", remote, local_path], + check=True, + ) + + +def ping_host(host): + proc = subprocess.run( + "ping %s -c 1 -w 5" % host, shell=True, capture_output=True, check=True + ) + stdout = proc.stdout.decode().splitlines() + times = [] + for line in stdout: + if "time=" in line: + time = float(line.split("time=")[1].split(" ")[0]) + times += [time] + return sum(times) / len(times) + + +def test_hosts(config): + logger.info("Testing hosts connection:") + for host in config["known_hosts"]: + try: + model_name = run_on(host, 'lscpu | grep "Model name"', shell=True) + model_name = model_name.split(":")[1].strip() + ping = ping_host(host) + logger.info(" - %s: %s (%.3fms)", host, model_name, ping) + except: + logger.info(" - %s: FAILED", host) + + +def hyperparameter_generator(hyperparameters): + """Yield all the possibile combinations of hyperparameters""" + if isinstance(hyperparameters, dict): + yield from hyperparameter_generator(list(hyperparameters.items())) + return + if not hyperparameters: + yield {} + return + head, *rest = hyperparameters + name, values = head + for value in values: + for next_hyp in hyperparameter_generator(rest): + yield {name: value, **next_hyp} + + +def sanity_check(config): + hyperparameters = config["hyperparameters"] + for name, values in hyperparameters.items(): + if not isinstance(values, list): + raise ValueError("Invalid hyperparameter: %s. It should be a list" % (name)) + if not values: + raise ValueError( + "Invalid hyperparameter: %s. An empty list will void all the " + "combinations, please comment it if you don't want to use that hyperparameter." + % (name) + ) + num_hosts = len(config["known_hosts"]) + max_hosts = max(hyperparameters["num_hosts"]) + if max_hosts > num_hosts: + raise ValueError( + "Invalid hyperparameter: num_hosts. 
Cannot use %d hosts, %d known" + % (max_hosts, num_hosts) + ) + if len(config["known_hosts"]) != len(set(config["known_hosts"])): + raise ValueError( + "known_hosts contains duplicated host names, this is not supported by MPI" + ) + + +def replace_vars(pattern, vars): + for name, value in vars.items(): + pattern = pattern.replace("{" + name + "}", str(value)) + return pattern + +def run_file_name(hyp): + return "-".join( + [ + "{}-{}".format(name, value).replace("/", "_") + for name, value in hyp.items() + ] + ) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "experiment", + help="Name of the experiment, used for naming the results file", + ) + parser.add_argument( + "--config", + help="Path to the config.yaml file", + default="config.yaml", + ) + parser.add_argument( + "--overwrite", + help="Overwrite the results instead of skipping", + default=False, + action="store_true", + ) + parser.add_argument( + "--capture-output", + help="Save stdout and stderr", + default=False, + action="store_true", + ) + parser.add_argument( + "--print-output", + help="Save stdout and stderr", + default=False, + action="store_true", + ) + parser.add_argument( + "--verbose", + "-v", + help="Log more output", + default=0, + action="count", + ) + + subparsers = parser.add_subparsers( + required=True, help="System to test", dest="system" + ) + + mpi = subparsers.add_parser("mpi") + mpi.add_argument( + "directory", + help="Local path to the folder with the MPI project", + ) + mpi.add_argument( + "bin_name", + help="Name of the binary file produced by cmake", + ) + mpi.add_argument( + "extra_args", + help="Additional arguments to pass to the executable", + nargs="*", + ) + + rstream = subparsers.add_parser("rstream") + rstream.add_argument( + "directory", + help="Local path to the folder with the RStream repo", + ) + rstream.add_argument( + "example", + help="Name of the example to compile and run", + ) + rstream.add_argument( + "num_steps", + help="Number of steps of the pipeline: the number of processes to spawn is adjusted based on this.", + type=int, + ) + rstream.add_argument( + "extra_args", + help="Additional arguments to pass to the executable", + nargs="*", + ) + + rstream2 = subparsers.add_parser("rstream2") + rstream2.add_argument( + "directory", + help="Local path to the folder with the RStream2 repo", + ) + rstream2.add_argument( + "example", + help="Name of the example to compile and run", + ) + rstream2.add_argument( + "extra_args", + help="Additional arguments to pass to the executable", + nargs="*", + ) + + flink = subparsers.add_parser("flink") + flink.add_argument( + "directory", + help="Local path to the folder with the flink project", + ) + flink.add_argument( + "jar_name", + help="Name of the jar generated by 'mvn package'", + ) + flink.add_argument( + "extra_args", + help="Additional arguments to pass to the executable", + nargs="*", + ) + + timely = subparsers.add_parser("timely") + timely.add_argument( + "directory", + help="Local path to the folder with the Timely benchmark crate", + ) + timely.add_argument( + "bin", + help="Name of the bin to compile and run", + ) + timely.add_argument( + "extra_args", + help="Additional arguments to pass to the executable", + nargs="*", + ) + + args = parser.parse_args() + + verbosity = args.verbose + capture_output = args.capture_output + print_output = args.print_output + coloredlogs.install( + level="DEBUG" if args.verbose > 0 else "INFO", + milliseconds=True, + logger=logger, + fmt="%(asctime)s,%(msecs)03d 
%(levelname)s %(message)s", + ) + + logger.info("Using configuration file: %s", args.config) + with open(args.config) as f: + config = ruamel.yaml.safe_load(f) + + try: + sanity_check(config) + except ValueError as e: + logger.fatal(e.args[0]) + sys.exit(1) + + hyperparameters = config["hyperparameters"] + num_tests = math.prod([len(hyp) for hyp in hyperparameters.values()]) + logger.info("- %d hyperparameters", len(hyperparameters)) + logger.info("- %d total combinations", num_tests) + + if args.system == "mpi": + runner = MPI(config) + elif args.system == "rstream": + runner = RStream(config) + elif args.system == "rstream2": + runner = RStream2(config) + elif args.system == "flink": + runner = Flink(config) + elif args.system == "timely": + runner = TimelyDataflow(config) + else: + raise ValueError("%s is not supported yet" % args.system) + + test_hosts(config) + + logger.info("Preparation step") + runner.prepare(args) + + for hyp_index, hyp in enumerate(hyperparameter_generator(hyperparameters)): + logger.info("-" * 80) + logger.info("Current hyperparameters (%d / %d):" % (hyp_index + 1, num_tests)) + for name, value in hyp.items(): + logger.info(" - %s: %s", name, value) + logger.info("Running on:") + for host in config["known_hosts"][: hyp["num_hosts"]]: + logger.info(" - %s", host) + logger.info("Setting up...") + runner.setup_hyp(hyp) + + # Check completion for warmup + complete = True + for run_index in range(config["num_runs"]): + metadata = { + "system": args.system, + "run": run_index, + "experiment": args.experiment, + **hyp, + } + file_name = run_file_name(hyp) + results_dir = replace_vars(config["results_dir"], metadata) + + file_name += "-run-" + str(run_index) + ".json" + results_file = results_dir + "/" + file_name + if not os.path.exists(results_file): + complete = False + break + + if (not complete or args.overwrite) and config["warmup"]: + logger.info("Running a warmup run to cache the input") + try: + result = runner.run(args, hyp, None) + logger.info("Done in %.3f. 
Est: %.3f", result["time"], result["time"] * config["num_runs"]) + except: + logger.exception("Execution failed!", exc_info=True) + + # Run experiments + for run_index in range(config["num_runs"]): + metadata = { + "system": args.system, + "run": run_index, + "experiment": args.experiment, + **hyp, + } + file_name = run_file_name(hyp) + results_dir = replace_vars(config["results_dir"], metadata) + + file_name += "-run-" + str(run_index) + ".json" + results_file = results_dir + "/" + file_name + logger.info( + "Run %d / %d: %s", run_index + 1, config["num_runs"], results_file + ) + basedir = os.path.dirname(results_file) + os.makedirs(basedir, exist_ok=True) + if os.path.exists(results_file): + if args.overwrite: + logger.warning("Overwriting previous results!") + else: + logger.warning("Results already present, skipping!") + continue + + try: + result = runner.run(args, hyp, run_index) + result = { + "hosts": config["known_hosts"][: hyp["num_hosts"]], + "result": result, + **metadata, + } + except: + logger.exception("Execution failed!", exc_info=True) + else: + with open(results_file, "w") as f: + json.dump(result, f, indent=4) + f.write("\n") diff --git a/benchmarks/example.config.yaml b/benchmarks/example.config.yaml new file mode 100644 index 0000000..72c2ecc --- /dev/null +++ b/benchmarks/example.config.yaml @@ -0,0 +1,39 @@ +# The list of known workers +known_hosts: + - StreamWorker1 + - StreamWorker2 + - StreamWorker3 + - StreamWorker4 + +# Name of the directory where to place the execution results +results_dir: "./results/{experiment}/{system}" +# Directory where to put temporary files locally +temp_dir: ./temp +# Directory to use for compiling the binaries in the first worker +compilation_dir: /tmp/compilation +# Directory where flink is installed (with bin/ and conf/ inside) +# It should be the same in the master node as well as in the workers +flink_base_path: /home/ubuntu/flink + +# Whether to do an extra run (for each hyperparameter configuration) ignoring +# the results +warmup: true +# Number of runs from which data is collected +num_runs: 5 + +# List of hyper parameters to try. All the combinations of values will be tried. +hyperparameters: + # Number of hosts to run the benchmark on + num_hosts: [4, 3, 2, 1] + # When using RStream this is the number of "slots" in the hostfile (sources + # and sinks are added automatically). + # In Flink and MPI it's the number of slots assigned to each worker. + procs_per_host: [8] + + # You can add more hyperparameters here! + + # Example: an MPI/OpenMP implementation may spawn "threads_per_proc" threads + # for each process, taking this parameter as command line argument. You can + # use, for example, `-T {threads_per_proc}` as extra arguments to tell the + # program how many threads it is allowed to use. 
+ # threads_per_proc: [4, 8] diff --git a/benchmarks/gen_csv.py b/benchmarks/gen_csv.py new file mode 100755 index 0000000..3a7a278 --- /dev/null +++ b/benchmarks/gen_csv.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 + +import argparse +import csv +import glob +import json +import logging +import re +import sys + +import coloredlogs + +logger = logging.getLogger("gen_csv") + +regex = re.compile(r"(?:(?P(?:timens|events)):)?(?P[a-zA-Z0-9-_]+):(?P[\d.]+)") + + +# def parse_lines(content): +# result = {} +# for line in content.splitlines(): +# match = regex.search(line) +# if not match: +# continue +# typ = match.group("type") +# name = match.group("name") +# dur = match.group("amount") +# if typ == "events": +# continue +# result[name] = dur +# return result + + +def extract_dict(path): + with open(path) as f: + content = json.load(f) + result = { + name: value + for name, value in content.items() + if name not in ["hosts", "result"] + } + result.update({"time": content["result"]["time"]}) + # result.update(parse_lines(content["result"]["stdout"])) + # result.update(parse_lines(content["result"]["stderr"])) + return result + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--verbose", help="Verbose output", action="store_true") + parser.add_argument("results_dir", help="Directory where the results are stored", nargs="+") + parser.add_argument( + "-o", + help="CSV file where to write the results. - for stdout", + dest="output_file", + default="-", + nargs="?", + ) + + args = parser.parse_args() + + coloredlogs.install( + "INFO" if args.verbose else "WARNING", + milliseconds=True, + logger=logger, + fmt="%(asctime)s,%(msecs)03d %(levelname)s %(message)s", + ) + + files = [] + for path in args.results_dir: + files += list(glob.glob(path + "/**/*.json", recursive=True)) + logger.info("%d JSON found", len(files)) + + if not files: + logger.fatal("No JSON found!") + sys.exit(1) + + result = [] + columns = [] + + for path in sorted(files): + logger.info("Processing %s", path) + file_data = extract_dict(path) + for key in file_data.keys(): + if key not in columns: + columns.append(key) + result += [file_data] + + if args.output_file == "-": + output_file = sys.stdout + else: + output_file = open(args.output_file, "w") + writer = csv.DictWriter(output_file, columns) + writer.writeheader() + writer.writerows(result) + output_file.close() diff --git a/benchmarks/latency/.gitignore b/benchmarks/latency/.gitignore new file mode 100644 index 0000000..03314f7 --- /dev/null +++ b/benchmarks/latency/.gitignore @@ -0,0 +1 @@ +Cargo.lock diff --git a/benchmarks/latency/Cargo.toml b/benchmarks/latency/Cargo.toml new file mode 100644 index 0000000..fe7822a --- /dev/null +++ b/benchmarks/latency/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "latency" +version = "0.1.0" +edition = "2018" + +[lib] +proc-macro = true + +[dependencies] +noir = { path = "../../.." 
} + +[[bin]] +path = "src/main.rs" +name = "latency" \ No newline at end of file diff --git a/benchmarks/latency/src/lib.rs b/benchmarks/latency/src/lib.rs new file mode 100644 index 0000000..303e4f6 --- /dev/null +++ b/benchmarks/latency/src/lib.rs @@ -0,0 +1,17 @@ +extern crate proc_macro; +use proc_macro::TokenStream; + +#[proc_macro] +pub fn repeat(args: TokenStream) -> TokenStream { + let args = args.to_string(); + let (num, what) = args.split_once(",").unwrap(); + let num: usize = num.parse().unwrap(); + + let (base, what) = what.split_once(",").unwrap(); + let mut res = base.to_string(); + for _ in 0..num { + res += "."; + res += what; + } + res.parse().unwrap() +} diff --git a/benchmarks/latency/src/main.rs b/benchmarks/latency/src/main.rs new file mode 100644 index 0000000..a6dde26 --- /dev/null +++ b/benchmarks/latency/src/main.rs @@ -0,0 +1,88 @@ +use std::time::{Duration, SystemTime}; + +use noir::operator::source::ParallelIteratorSource; +use noir::{BatchMode, EnvironmentConfig, StreamEnvironment}; + +use latency::repeat; + +fn main() { + let (config, args) = EnvironmentConfig::from_args(); + if args.len() != 5 { + panic!( + "\n\nUsage: num_items items_per_sec batch_size batch_timeout num_threads\n\ + - num_items: number of items to put in the stream\n\ + - items_per_sec: number of items to generate per second (0 = as fast as possible)\n\ + - batch_size: the size of the BatchMode::Fixed or BatchMode::Adaptive\n\ + - batch_timeout: 0 => BatchMode::Fixed, n > 0 => BatchMode::Adaptive (in ms)\n\ + - num_threads: number of generating threads\n\n" + ); + } + + let num_items: usize = args[0].parse().expect("invalid num_items"); + let items_per_sec: u64 = args[1].parse().expect("invalid items_per_sec"); + let batch_size: usize = args[2].parse().expect("invalid batch_size"); + let batch_timeout: u64 = args[3].parse().expect("invalid batch_timeout"); + let num_threads: usize = args[4].parse().expect("invalid num_threads"); + + assert!(num_items >= 1, "num_items must be at least 1"); + assert!(num_threads >= 1, "num_threads must be at least 1"); + + let batch_mode = if batch_timeout == 0 { + BatchMode::fixed(batch_size) + } else { + BatchMode::adaptive(batch_size, Duration::from_millis(batch_timeout)) + }; + + let mut env = StreamEnvironment::new(config); + env.spawn_remote_workers(); + + let source = ParallelIteratorSource::new(move |id, _num_replica| { + let iter = if id < num_threads { + let to_generate = num_items / num_threads; + (to_generate * id)..(to_generate * (id + 1)) + } else { + 0..0 + }; + iter.map(move |i| { + if items_per_sec > 0 { + std::thread::sleep(Duration::from_micros( + 1_000_000 * num_threads as u64 / items_per_sec, + )); + } + i + }) + }); + let stream = env.stream(source).batch_mode(BatchMode::fixed(1)); + + // first shuffle: move the items to a predictable host + let stream = stream + .group_by(|_| 0) + .map(|(_, i)| (0, i, SystemTime::now())) + .batch_mode(batch_mode) + .drop_key(); + + // n-1 shuffles to accumulate latency + let stream = repeat!( + 4, + stream, + map(|(i, n, t)| (i + 1, n, t)) + .group_by(|&(i, _, _)| i) + .drop_key() + ); + + // final shuffle back to the first host + let stream = stream + .map(|(i, n, t)| (i + 1, n, t)) + .group_by(|_| 0) + .drop_key(); + + // compute the time durations; time are accurate because it's the same host (and we ignore clock + // skews) + stream.for_each(|(i, n, start)| { + let duration = start.elapsed().expect("Clock skewed"); + // num steps,item index,latency + eprintln!("{},{},{}", i, n, duration.as_nanos()); 
+ }); + + env.execute(); +} diff --git a/benchmarks/make_plot.py b/benchmarks/make_plot.py new file mode 100755 index 0000000..51ae6c5 --- /dev/null +++ b/benchmarks/make_plot.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 + +import argparse + +import matplotlib.pyplot as plt + +from aggregate_csv import get_systems, parse_stdin + +SYSTEM_NAMES = { + "mpi": "MPI", + "rstream1": "RStream", + "rstream2": "Noir", + "flink": "Flink", + "timely": "Timely", +} + +SYSTEM_COLORS = { + "mpi": "blue", + "rstream1": "red", + "rstream2": "black", + "flink": "orange", + "timely": "green", +} + + +def get_ids(systems): + ids = set() + for system in systems: + ids |= system.row_ids() + ids = list(sorted(ids)) + cores = [a[0] * a[1] for a in ids] + assert len(cores) == len(set(cores)), "Number of cores is not unique" + return ids, cores + + +def make_time_plot(args, systems): + ids, cores = get_ids(systems) + for system in systems: + exp = system.get_experiments() + for experiment in exp: + x = [] + times = [] + errors = [] + for num_cores, id in zip(cores, ids): + data = system.experiments[experiment].get_data(id) + if data is not None: + x.append(num_cores) + times.append(data[0]) + errors.append(data[1]) + + color = SYSTEM_COLORS[system.system] + label = SYSTEM_NAMES[system.system] + linestyle = None + if len(exp) > 1: + if experiment == args.base_experiment: + label = SYSTEM_NAMES[system.system] + elif experiment == args.extra_experiment: + label = f"{label} ({args.extra_experiment_name})" + linestyle = "--" + else: + raise RuntimeError( + f"Experiment {experiment} not passed to --extra-experiment" + ) + plt.errorbar( + x, + times, + yerr=errors, + capsize=3, + label=label, + color=color, + linestyle=linestyle, + ) + + plt.gca().set_ylim(bottom=0) + plt.xlabel("Number of cores") + plt.xticks(cores) + plt.grid() + plt.ylabel("Execution time (s)") + + +def make_scaling_plot(args, systems): + ids, cores = get_ids(systems) + for system in systems: + exp = system.get_experiments() + for experiment in exp: + baseline_t = None + baseline_c = None + for num_cores, id in zip(cores, ids): + data = system.experiments[experiment].get_data(id) + if data is not None: + baseline_t = data[0] + baseline_c = num_cores + break + + x = [] + scale = [] + error = [] + for num_cores, id in zip(cores, ids): + data = system.experiments[experiment].get_data(id) + if data is not None: + s = (1 / data[0]) / ((1 / baseline_t) / baseline_c) / cores[0] + e = s - ( + (1 / (data[0] - data[1])) + / ((1 / baseline_t) / baseline_c) + / cores[0] + ) + x.append(num_cores) + scale.append(s) + error.append(e) + + color = SYSTEM_COLORS[system.system] + label = SYSTEM_NAMES[system.system] + linestyle = None + if len(exp) > 1: + if experiment == args.base_experiment: + label = SYSTEM_NAMES[system.system] + elif experiment == args.extra_experiment: + label = f"{label} ({args.extra_experiment_name})" + linestyle = "--" + else: + raise RuntimeError( + f"Experiment {experiment} not passed to --extra-experiment" + ) + plt.errorbar( + x, + scale, + yerr=error, + capsize=3, + label=label, + color=color, + linestyle=linestyle, + ) + if args.ideal: + ideal = [c / cores[0] for c in cores] + plt.plot( + cores, + ideal, + color="black", + linestyle="--", + linewidth=0.5, + label="ideal", + ) + + plt.xlabel("Number of cores") + plt.xticks(cores) + plt.grid() + plt.ylabel(f"Speedup") + + +def main(args): + systems = get_systems() + parse_stdin(systems) + + experiments = set() + for system in systems: + experiments |= system.get_experiments() + experiments = 
list(sorted(experiments)) + + if args.title: + title = args.title + else: + title = " ".join(experiments) + + plt.rc("font", size=14) + plt.rc("axes", titlesize=16) + plt.rc("axes", labelsize=16) + + if args.variant == "time": + make_time_plot(args, systems) + elif args.variant == "scaling": + make_scaling_plot(args, systems) + else: + raise ValueError(f"Unknown variant: {args.variant}") + + plt.title(title) + plt.legend() + if args.output: + plt.tight_layout() + plt.savefig(args.output) + else: + plt.show() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Aggregate the output of gen_csv.py and produce a plot" + ) + parser.add_argument("variant", choices=["time", "scaling"]) + parser.add_argument("--output", "-o", help="Where to save the plot") + parser.add_argument("--title", "-t", help="Title of the plot") + parser.add_argument( + "--size", + "-s", + help="Size of the dataset (for thoughput)", + type=int, + ) + parser.add_argument("--unit", "-u", help="Unit of the size") + parser.add_argument("--ideal", help="Display the ideal line", action="store_true") + parser.add_argument("--base-experiment", help="Name of the base experiment") + parser.add_argument("--extra-experiment", help="Name of the extra experiment") + parser.add_argument( + "--extra-experiment-name", help="Name to give to the extra experiment" + ) + args = parser.parse_args() + main(args) diff --git a/benchmarks/plots/Makefile b/benchmarks/plots/Makefile new file mode 100644 index 0000000..d2069b4 --- /dev/null +++ b/benchmarks/plots/Makefile @@ -0,0 +1,92 @@ +TARGET = target +RESULTS = ../results + +EXPERIMENTS = \ + car-accidents \ + connected-components \ + enum-triangles \ + kmeans \ + kmeans-noflink \ + pagerank \ + transitive-closure \ + wordcount-gutenberg wordcount-randomwords \ + wordcount-assoc-gutenberg \ + wordcount-windowed-gutenberg wordcount-windowed-randomwords + +DEP_car-accidents := $(RESULTS)/car-accidents* +TITLE_car-accidents := "Car Accidents" +BASE_car-accidents := car-accidents +EXTRA_car-accidents := car-accidents-shared +EXTRANAME_car-accidents := shared + +DEP_connected-components := $(RESULTS)/connected-components +TITLE_connected-components := "Connected Components" +BASE_connected-components := connected-components + +DEP_enum-triangles := $(RESULTS)/enum-triangles/{flink,rstream2} $(RESULTS)/enum-triangles-new +TITLE_enum-triangles := "Enum Triangles" +BASE_enum-triangles := enum-triangles + +DEP_kmeans := $(RESULTS)/kmeans-30c-15it-200M +TITLE_kmeans := "K-Means (k = 30, h = 15, n = 10M)" +BASE_kmeans := kmeans + +DEP_kmeans-noflink := $(RESULTS)/kmeans-30c-15it-200M/{mpi,rstream*} +TITLE_kmeans-noflink := "K-Means (k = 30, h = 15, n = 10M)" +BASE_kmeans-noflink := kmeans + +DEP_pagerank := $(RESULTS)/pagerank +TITLE_pagerank := "PageRank" +BASE_pagerank := pagerank + +DEP_transitive-closure := $(RESULTS)/transitive-closure +TITLE_transitive-closure := "Transitive Closure" +BASE_transitive-closure := transitive-closure + +DEP_wordcount-gutenberg := $(RESULTS)/wordcount/{rstream2,timely} $(RESULTS)/wordcount-fixed-new +TITLE_wordcount-gutenberg := "Non-associative Wordcount (Gutenberg)" +BASE_wordcount-gutenberg := wordcount + +DEP_wordcount-randomwords := $(RESULTS)/wordcount-randomwords +TITLE_wordcount-randomwords := "Non-associative Wordcount (Randomwords)" +BASE_wordcount-randomwords := wordcount + +DEP_wordcount-assoc-gutenberg := $(RESULTS)/wordcount-assoc +TITLE_wordcount-assoc-gutenberg := "Associative Wordcount (Gutenberg)" 
+BASE_wordcount-assoc-gutenberg := wordcount-assoc + +DEP_wordcount-windowed-gutenberg := $(RESULTS)/wordcount-windowed/rstream2 $(RESULTS)/wordcount-windowed/mpi $(RESULTS)/wordcount-windowed/flink $(RESULTS)/wordcount-windowed-new +TITLE_wordcount-windowed-gutenberg := "Windowed Wordcount (Gutenberg)" +BASE_wordcount-windowed-gutenberg := wordcount + +DEP_wordcount-windowed-randomwords := $(RESULTS)/wordcount-windowed-randomwords/rstream2 $(RESULTS)/wordcount-windowed-randomwords/mpi $(RESULTS)/wordcount-windowed-randomwords/flink $(RESULTS)/wordcount-windowed-randomwords-fixed +TITLE_wordcount-windowed-randomwords := "Windowed Wordcount (Randomwords)" +BASE_wordcount-windowed-randomwords := wordcount + +PDF = $(EXPERIMENTS:%=$(TARGET)/%/time.pdf) $(EXPERIMENTS:%=$(TARGET)/%/scaling.pdf) + +all: $(PDF) + +$(TARGET)/%/: + mkdir -p $@ + +$(TARGET)/%/time.pdf: $(TARGET)/%/ + ../gen_csv.py $(DEP_$*) | ../make_plot.py time -o $@ \ + -t $(TITLE_$*) \ + --base-experiment "$(BASE_$*)" \ + --extra-experiment "$(EXTRA_$*)" \ + --extra-experiment-name "$(EXTRANAME_$*)" + +$(TARGET)/%/scaling.pdf: $(TARGET)/%/ + ../gen_csv.py $(DEP_$*) | ../make_plot.py scaling -o $@ \ + --ideal \ + -t $(TITLE_$*) \ + --base-experiment "$(BASE_$*)" \ + --extra-experiment "$(EXTRA_$*)" \ + --extra-experiment-name "$(EXTRANAME_$*)" + +clean: + rm -rf $(TARGET) + +.PHONY: all clean +.PRECIOUS: $(TARGET)/%/ diff --git a/datasets/gen_words.py b/datasets/gen_words.py new file mode 100755 index 0000000..8dce5a6 --- /dev/null +++ b/datasets/gen_words.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +import argparse +import collections +import random +import string +import sys + +ALPHABET = string.ascii_lowercase + + +def main(filesize, num_words, word_length, words_per_line): + words = "".join(random.choices(ALPHABET, k=num_words + word_length)) + word_distribution = collections.Counter() + + def generate_word(): + start = random.randint(0, num_words - 1) + length = word_length + word = words[start:start + length] + word_distribution.update([word]) + return word + + def gen_line(): + words = [generate_word() for _ in range(words_per_line)] + return " ".join(words) + + size = 0 + lines = 0 + while size < filesize * 1024 * 1024: + line = gen_line() + size += len(line) + 1 + lines += 1 + print(line) + + freq_histogram = collections.Counter(word_distribution.values()) + + print(f"{lines} lines", file=sys.stderr) + print(f"{len(word_distribution)} unique words", file=sys.stderr) + print("Most frequent words", word_distribution.most_common(5), file=sys.stderr) + + max_freq = freq_histogram.most_common(1)[0][1] + print("word frequency vs number of words with that frequency", file=sys.stderr) + for freq, amount in sorted(freq_histogram.items()): + length = int(amount / max_freq * 30) + print(f"{freq:2} ({amount:8} times) ", "*" * length, file=sys.stderr) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument("filesize", help="dimension of the output in MiB", type=int) + parser.add_argument("--num-words", help="average number of unique words to generate", type=int, default=100_000) + parser.add_argument("--word-length", help="number of characters per word", type=int, default=8) + parser.add_argument("--words-per-line", help="average number of words per line", type=int, default=8) + + args = parser.parse_args() + main(args.filesize, args.num_words, args.word_length, args.words_per_line) diff --git a/index.html b/index.html new file mode 100644 index 0000000..d34d378 --- /dev/null +++ b/index.html @@ 
-0,0 +1,19 @@ + + + + + + + + + Noir + + +

Noir

+ + + + diff --git a/visualizer/css/style.css b/visualizer/css/style.css new file mode 100644 index 0000000..cbb11d5 --- /dev/null +++ b/visualizer/css/style.css @@ -0,0 +1,55 @@ +body, html { + min-width: 100%; + min-height: 100%; + height: 100%; + width: 100%; + margin: 0; + padding: 0; + display: block; +} +#main { + height: 100%; + width: 100%; + display: grid; + grid-template-rows: 40px auto 200px; + grid-template-columns: auto 400px; + grid-template-areas: + "toolbar toolbar" + "network details" + "graph graph"; +} + +#toolbar { + grid-area: toolbar; + padding: 5px; + border-bottom: 1px solid; +} +#network { + grid-area: network; + font-size: 0; + overflow: hidden; +} +#details { + grid-area: details; + padding: 0 20px; + border-left: 1px solid; + overflow: auto; +} +#graph { + grid-area: graph; + border-top: 1px solid; +} + +.svg-container { + width: 100%; + height: 100%; +} +.svg-content { + width: 100%; + height: 100%; +} +.grid line { + stroke: lightgrey; + stroke-opacity: 0.7; + shape-rendering: crispEdges; +} \ No newline at end of file diff --git a/visualizer/index.html b/visualizer/index.html new file mode 100644 index 0000000..21c9f63 --- /dev/null +++ b/visualizer/index.html @@ -0,0 +1,50 @@ + + + + + + + Noir visualizer + + + +
+
+ Select a file: + + + + + + + + + + + + + + +
+
+
+
+
+
+
+

Details

+
+
+
+
+
+
+
+
+ + + + + + + diff --git a/visualizer/js/data.js b/visualizer/js/data.js new file mode 100644 index 0000000..2ac7a39 --- /dev/null +++ b/visualizer/js/data.js @@ -0,0 +1,602 @@ +const detailsContent = $("#details-content"); + +const drawJobGraph = (structures, profiler) => { + resetGraph(); + resetNetwork(); + const [nodes, links] = buildJobGraph(structures, profiler); + drawNetwork(nodes, links); +}; + +const drawExecutionGraph = (structures, profiler) => { + resetGraph(); + resetNetwork(); + const [nodes, links] = buildExecutionGraph(structures, profiler); + drawNetwork(nodes, links); +}; + +const formatBytes = (bytes) => { + const fmt = d3.format('.1f'); + if (bytes < 1024) return `${fmt(bytes)}B`; + if (bytes < 1024*1024) return `${fmt(bytes / 1024)}KiB`; + if (bytes < 1024*1024*1024) return `${fmt(bytes / 1024 / 1024)}MiB`; + return `${fmt(bytes / 1024 / 1024 / 1024)}GiB`; +}; + +const formatNumber = (num) => { + return num.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ","); +} + +const makeLink = (text, onclick) => { + return $("") + .attr("href", "#") + .on("click", (e) => { + e.preventDefault(); + onclick(); + }) + .text(text); +} + +const buildJobGraph = (structures, profiler) => { + + const drawOperatorDetails = (block_id, operator, replicas, linkMetrics) => { + detailsContent.html(""); + detailsContent.append( + $("

") + .append($("").text("Operator: ")) + .append($("").text(operator.title)) + ); + if (operator.subtitle) { + detailsContent.append( + $("

") + .append($("").text(operator.subtitle)) + ); + } + + const hostCounts = {}; + for (const {host_id} of replicas) { + if (!(host_id in hostCounts)) hostCounts[host_id] = 0; + hostCounts[host_id] += 1; + } + detailsContent.append($("

").append($("").text("Replicated at:"))); + const replicasList = $("

    "); + for (const [host_id, count] of Object.entries(hostCounts)) { + replicasList.append( + $("
  • ") + .append($("").text(`Host${host_id}`)) + .append(` × ${count}`)); + } + detailsContent.append(replicasList); + + detailsContent.append( + $("

    ") + .append($("").text("Produces: ")) + .append($("").text(operator.out_type)) + ); + if (operator.connections.length > 0) { + const list = $("

      "); + for (const connection of operator.connections) { + const to_block_id = connection.to_block_id; + const li = $("
    • ") + .append("Block " + to_block_id + " sending ") + .append($("").text(connection.data_type)) + .append(" with strategy ") + .append($("").text(connection.strategy)); + const key = ChannelMetric.blockPairKey(block_id, connection.to_block_id); + if (key in linkMetrics.items_out) { + const drawMessages = () => { + drawProfilerGraph(linkMetrics.items_out[key].series, `Items/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatNumber(v)); + }; + const drawNetworkMessages = () => { + drawProfilerGraph(linkMetrics.net_messages_out[key].series, `Network messages/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(linkMetrics.net_bytes_out[key].series, `Network bytes/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + const total = linkMetrics.items_out[key].series.total; + li.append(": ") + .append(makeLink(`${formatNumber(total)} items sent`, () => drawMessages())); + + drawMessages(); + if (key in linkMetrics.net_messages_out) { + const numMex = linkMetrics.net_messages_out[key].series.total; + const bytes = linkMetrics.net_bytes_out[key].series.total; + li + .append(" (in ") + .append(makeLink(`${numMex} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes())) + .append(")"); + + } + } + list.append(li); + } + detailsContent.append($("

      ") + .append($("").text("Connects to: ")) + .append(list)); + } + if (operator.receivers.length > 0) { + const list = $("

        "); + for (const receiver of operator.receivers) { + const from_block_id = receiver.previous_block_id; + const li = $("
      • ") + .append("Block " + from_block_id + " receiving ") + .append($("").text(receiver.data_type)); + const key = ChannelMetric.blockPairKey(receiver.previous_block_id, block_id); + if (key in linkMetrics.items_in) { + const drawMessages = () => { + drawProfilerGraph(linkMetrics.items_in[key].series, `Items/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries, (v) => formatNumber(v)); + }; + const drawNetworkMessages = () => { + drawProfilerGraph(linkMetrics.net_messages_in[key].series, `Network messages/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(linkMetrics.net_bytes_in[key].series, `Network bytes/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + const total = linkMetrics.items_in[key].series.total; + li.append(": ") + .append(makeLink(`${formatNumber(total)} items received`, () => drawMessages())); + + drawMessages(); + if (key in linkMetrics.net_messages_in) { + const numMex = linkMetrics.net_messages_in[key].series.total; + const bytes = linkMetrics.net_bytes_in[key].series.total; + li + .append(" (in ") + .append(makeLink(`${numMex} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes())) + .append(")"); + + } + } + list.append(li); + } + detailsContent.append($("

        ") + .append($("").text("Receives data from: ")) + .append(list)); + } + }; + + const drawLinkDetails = (from_block_id, connection, linkMetrics) => { + const to_block_id = connection.to_block_id; + detailsContent.html(""); + detailsContent.append( + $("

        ") + .append($("").text("Connection: ")) + .append($("").text(`${from_block_id} → ${to_block_id}`)) + ); + detailsContent.append( + $("

        ") + .append($("").text("Data type: ")) + .append($("").text(connection.data_type)) + ); + detailsContent.append( + $("

        ") + .append($("").text("Strategy: ")) + .append($("").text(connection.strategy)) + ); + + const metricsKey = ChannelMetric.blockPairKey(from_block_id, to_block_id); + if (metricsKey in linkMetrics.net_messages_in) { + const message = linkMetrics.net_messages_in[metricsKey].series.total; + const bytes = linkMetrics.net_bytes_in[metricsKey].series.total; + const drawNetworkMessages = () => { + drawProfilerGraph(linkMetrics.net_messages_in[metricsKey].series, `Network messages/s in ${from_block_id} → ${to_block_id}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(linkMetrics.net_bytes_in[metricsKey].series, `Network bytes/s in ${from_block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + detailsContent.append($("

        ") + .append($("").text("Traffic: ")) + .append(makeLink(`${message} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes())) + .append(` (${formatBytes(bytes/message)}/message)`)); + drawNetworkBytes(); + } + } + + const linkMetrics = { + items_in: profiler.channel_metrics.items_in.groupByBlockId(), + items_out: profiler.channel_metrics.items_out.groupByBlockId(), + net_messages_in: profiler.channel_metrics.net_messages_in.groupByBlockId(), + net_messages_out: profiler.channel_metrics.net_messages_out.groupByBlockId(), + net_bytes_in: profiler.channel_metrics.net_bytes_in.groupByBlockId(), + net_bytes_out: profiler.channel_metrics.net_bytes_out.groupByBlockId(), + }; + + const byBlockId = {}; + const blockReplicas = {}; + for (const entry of structures) { + const [coord, structure] = entry; + const block_id = coord["block_id"]; + byBlockId[block_id] = structure; + if (!(block_id in blockReplicas)) blockReplicas[block_id] = []; + blockReplicas[block_id].push(coord); + } + const nodes = []; + const links = []; + const receivers = {}; + + const operatorId = (block_id, index) => { + return 100000 + block_id * 1000 + index; + }; + + const maxChannelBytes = Math.max(...Object.values(linkMetrics.net_bytes_in).map((d) => d.series.total)); + const linkWidth = (from_block_id, to_block_id) => { + const key = ChannelMetric.blockPairKey(from_block_id, to_block_id); + const minWidth = 1; + const maxWidth = 3; + const metric = linkMetrics.net_bytes_in[key]; + if (!metric) return minWidth; + const value = metric.series.total; + return minWidth + (maxWidth - minWidth) * (value / maxChannelBytes); + } + + for (const [block_id, structure] of Object.entries(byBlockId)) { + const block = { + id: block_id, + data: { + text: "Block " + block_id + }, + children: structure.operators.map((operator, index) => { + return { + id: operatorId(block_id, index), + data: { + text: operator["title"], + onclick: () => drawOperatorDetails(block_id, operator, blockReplicas[block_id], linkMetrics) + } + }; + }) + }; + nodes.push(block); + structure.operators.map((operator, index) => { + if (index < structure.operators.length - 1) { + links.push({ + source: operatorId(block_id, index), + target: operatorId(block_id, index+1), + data: { + text: operator.out_type, + } + }) + } + for (const receiver of operator.receivers) { + const prev_block_id = receiver.previous_block_id; + if (!(prev_block_id in receivers)) receivers[prev_block_id] = {}; + receivers[prev_block_id][block_id] = index; + } + }); + } + for (const [block_id, structure] of Object.entries(byBlockId)) { + structure.operators.map((operator, index) => { + for (const connection of operator.connections) { + const receiverIndex = receivers[block_id][connection.to_block_id]; + const source = operatorId(block_id, index); + const target = operatorId(connection.to_block_id, receiverIndex); + links.push({ + source, + target, + data: { + type: "solid", + text: connection.data_type, + onclick: () => drawLinkDetails(block_id, connection, linkMetrics), + width: linkWidth(block_id, connection.to_block_id), + } + }) + } + }); + } + + return [nodes, links]; +}; + +const buildExecutionGraph = (structures, profiler) => { + + const formatCoord = (coord) => { + return `Host${coord.host_id} Block${coord.block_id} Replica${coord.replica_id}`; + } + + const drawOperatorDetails = (coord, index, operator) => { + detailsContent.html(""); + detailsContent.append( + $("

        ") + .append($("").text("Operator: ")) + .append($("").text(operator.title)) + ); + if (operator.subtitle) { + detailsContent.append( + $("

        ") + .append($("").text(operator.subtitle)) + ); + } + + detailsContent.append( + $("

        ") + .append($("").text("At: ")) + .append($("").text(formatCoord(coord))) + ); + + detailsContent.append( + $("

        ") + .append($("").text("Produces: ")) + .append($("").text(operator.out_type)) + ); + const id = operatorId(coord.host_id, coord.block_id, coord.replica_id, index); + const conn = connections[id]; + if (conn) { + const list = $("

          "); + for (const {to, connection} of conn) { + const li = $("
        • ") + .append($("").text(formatCoord(to))) + .append(" sending ") + .append($("").text(connection.data_type)) + .append(" with strategy ") + .append($("").text(connection.strategy)); + const key = ChannelMetric.coordPairKey(coord, to); + if (key in profiler.channel_metrics.items_out.data) { + const messages = profiler.channel_metrics.items_out.data[key]; + const net_messages = profiler.channel_metrics.net_messages_out.data[key]; + const net_bytes = profiler.channel_metrics.net_bytes_out.data[key]; + const drawMessages = () => { + drawProfilerGraph(messages.series, `Items/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries, (v) => formatNumber(v)); + }; + const drawNetworkMessages = () => { + drawProfilerGraph(net_messages.series, `Network messages/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(net_bytes.series, `Network bytes/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + const total = messages.series.total; + li.append(": ") + .append(makeLink(`${formatNumber(total)} items sent`, () => drawMessages())); + + drawMessages(); + if (net_messages) { + li + .append(" (in ") + .append(makeLink(`${net_messages.series.total} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(net_bytes.series.total)}`, () => drawNetworkBytes())) + .append(")"); + + } + } + list.append(li); + } + detailsContent.append($("

          ") + .append($("").text("Connects to: ")) + .append(list)); + } + const recv_conn = recv_connections[id]; + if (recv_conn) { + const list = $("

            "); + for (const {from, connection} of recv_conn) { + const li = $("
          • ") + .append($("").text(formatCoord(from))) + .append(" receiving ") + .append($("").text(connection.data_type)); + const key = ChannelMetric.coordPairKey(from, coord); + if (key in profiler.channel_metrics.items_in.data) { + const messages = profiler.channel_metrics.items_in.data[key]; + const net_messages = profiler.channel_metrics.net_messages_in.data[key]; + const net_bytes = profiler.channel_metrics.net_bytes_in.data[key]; + const drawMessages = () => { + drawProfilerGraph(messages.series, `Items/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries, (v) => formatNumber(v)); + }; + const drawNetworkMessages = () => { + drawProfilerGraph(net_messages.series, `Network messages/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(net_bytes.series, `Network bytes/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + li.append(": ") + .append(makeLink(`${formatNumber(messages.series.total)} items received`, () => drawMessages())); + + drawMessages(); + if (net_messages) { + li + .append(" (in ") + .append(makeLink(`${net_messages.series.total} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(net_bytes.series.total)}`, () => drawNetworkBytes())) + .append(")"); + + } + } + list.append(li); + } + detailsContent.append($("

            ") + .append($("").text("Receives data from: ")) + .append(list)); + } + }; + + const drawLinkDetails = (from, to, connection) => { + detailsContent.html(""); + const coordPair = `${formatCoord(from)} → ${formatCoord(to)}`; + detailsContent.append( + $("

            ") + .append($("").text("Connection: ")) + .append($("").text(coordPair)) + ); + detailsContent.append( + $("

            ") + .append($("").text("Data type: ")) + .append($("").text(connection.data_type)) + ); + detailsContent.append( + $("

            ") + .append($("").text("Strategy: ")) + .append($("").text(connection.strategy)) + ); + + const metricsKey = ChannelMetric.coordPairKey(from, to); + if (metricsKey in profiler.channel_metrics.net_messages_in.data) { + const messages = profiler.channel_metrics.net_messages_in.data[metricsKey].series; + const bytes = profiler.channel_metrics.net_bytes_in.data[metricsKey].series; + const drawNetworkMessages = () => { + drawProfilerGraph(messages, `Network messages/s in ${coordPair}`, profiler.iteration_boundaries); + }; + const drawNetworkBytes = () => { + drawProfilerGraph(bytes, `Network bytes/s in ${coordPair}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); + }; + detailsContent.append($("

            ") + .append($("").text("Traffic: ")) + .append(makeLink(`${messages.total} messages`, () => drawNetworkMessages())) + .append(", for a ") + .append(makeLink(`total of ${formatBytes(bytes.total)}`, () => drawNetworkBytes())) + .append(` (${formatBytes(bytes.total/messages.total)}/message)`)); + drawNetworkBytes(); + } + } + + const byHostId = {}; + for (const entry of structures) { + const [coord, structure] = entry; + const host_id = coord["host_id"]; + const block_id = coord["block_id"]; + const replica_id = coord["replica_id"]; + if (!(host_id in byHostId)) byHostId[host_id] = {}; + if (!(block_id in byHostId[host_id])) byHostId[host_id][block_id] = []; + byHostId[host_id][block_id][replica_id] = structure; + } + + const nodes = []; + const links = []; + const receivers = {}; + const connections = {}; + const recv_connections = {}; + + const hostId = (host_id) => { + return `h${host_id}`; + }; + const blockId = (host_id, block_id, replica_id) => { + return `h${host_id}b${block_id}r${replica_id}`; + }; + const operatorId = (host_id, block_id, replica_id, index) => { + return `h${host_id}b${block_id}r${replica_id}o${index}`; + }; + + const maxChannelBytes = Math.max(...Object.values(profiler.channel_metrics.net_bytes_in.data).map((d) => d.series.total)); + const linkWidth = (from, to) => { + const key = ChannelMetric.coordPairKey(from, to); + const minWidth = 1; + const maxWidth = 3; + const metric = profiler.channel_metrics.net_bytes_in.data[key]; + if (!metric) return minWidth; + const value = metric.series.total; + return minWidth + (maxWidth - minWidth) * (value / maxChannelBytes); + } + + const buildOperatorNode = (host_id, block_id, replica_id, operator_index, structure) => { + for (const receiver of structure.receivers) { + const prev_block_id = receiver.previous_block_id; + if (!(prev_block_id in receivers)) receivers[prev_block_id] = {}; + if (!(block_id in receivers[prev_block_id])) receivers[prev_block_id][block_id] = []; + receivers[prev_block_id][block_id].push([host_id, block_id, replica_id, operator_index]); + } + return { + id: operatorId(host_id, block_id, replica_id, operator_index), + data: { + text: structure["title"], + onclick: () => drawOperatorDetails({host_id, block_id, replica_id}, operator_index, structure) + } + } + }; + + const buildBlockNode = (host_id, block_id, structures) => { + return structures.map((replica, replica_id) => { + const node = { + id: blockId(host_id, block_id, replica_id), + data: { + text: `Block ${block_id} / Replica ${replica_id}`, + }, + children: replica["operators"].map((operator, index) => buildOperatorNode(host_id, block_id, replica_id, index, operator)) + }; + // in-block connections + replica.operators.map((operator, index) => { + if (index < replica.operators.length - 1) { + links.push({ + source: operatorId(host_id, block_id, replica_id, index), + target: operatorId(host_id, block_id, replica_id, index+1), + data: { + text: operator.out_type, + } + }) + } + }); + return node; + }); + }; + + const buildHostNode = (host_id, blocks) => { + const node = { + id: hostId(host_id), + data: { + text: `Host ${host_id}` + }, + children: Object.entries(blocks).flatMap(([block_id, structures]) => { + return buildBlockNode(host_id, block_id, structures); + }), + }; + nodes.push(node); + }; + + for (const [host_id, blocks] of Object.entries(byHostId)) { + buildHostNode(host_id, blocks); + } + + for (const [host_id, blocks] of Object.entries(byHostId)) { + for (const [block_id, replicas] of Object.entries(blocks)) { + 
replicas.map((replica, replica_id) => { + replica.operators.map((operator, index) => { + for (const connection of operator.connections) { + const {to_block_id, data_type, strategy} = connection; + const recv = receivers[block_id][to_block_id]; + const addLink = (to) => { + const fromCoord = {host_id, block_id, replica_id} + const fromId = operatorId(host_id, block_id, replica_id, index); + const toCoord = {host_id: to[0], block_id: to[1], replica_id: to[2]}; + const toId = operatorId(...to); + if (!(fromId in connections)) connections[fromId] = []; + connections[fromId].push({to: toCoord, connection}); + if (!(toId in recv_connections)) recv_connections[toId] = []; + recv_connections[toId].push({from: fromCoord, connection}); + links.push({ + source: fromId, + target: toId, + data: { + text: data_type, + width: linkWidth(fromCoord, toCoord), + onclick: () => drawLinkDetails(fromCoord, toCoord, connection), + } + }); + }; + + if (strategy === "OnlyOne") { + if (recv.length === 1) { + addLink(recv[0]); + } else { + for (const r of recv) { + const [host_id2, block_id2, replica_id2, operator_id2] = r; + if (host_id === host_id2 && replica_id === replica_id2) { + addLink(r); + } + } + } + } else if (strategy === "GroupBy" || strategy === "Random") { + for (const r of recv) { + addLink(r); + } + } else { + throw Error("Invalid strategy: " + strategy); + } + } + }); + }); + } + } + + return [nodes, links]; +}; \ No newline at end of file diff --git a/visualizer/js/graph.js b/visualizer/js/graph.js new file mode 100644 index 0000000..9046cf3 --- /dev/null +++ b/visualizer/js/graph.js @@ -0,0 +1,117 @@ +const BUCKET_RESOLUTION = 20; +const BUCKET_MERGE_FACTOR = 5; + +const graphContainerId = "graph-content"; + +const resetGraph = () => { + d3.select("#" + graphContainerId).select("svg").remove(); +}; + +const drawProfilerGraph = (series, title, iteration_boundaries, yFormat) => { + if (!yFormat) yFormat = (d) => d; + + const iterToIndex = {}; + let maxIterTime = 0; + Object.entries(iteration_boundaries.data).map(([block_id, times], index) => { + iterToIndex[block_id] = index; + maxIterTime = Math.max(maxIterTime, times[times.length - 1]); + }); + + const container = document.getElementById(graphContainerId); + const svgWidth = container.clientWidth; + const svgHeight = container.clientHeight; + const [left, right, top, bottom] = [60, 10, 20, 30]; + + resetGraph(); + const svg = d3.select("#" + graphContainerId) + .append("svg") + .attr("width", svgWidth) + .attr("height", svgHeight); + + const root = svg.append("g"); + + const rawData = series.asList(); + const data = []; + const scaleFactor = 1000 / BUCKET_RESOLUTION / BUCKET_MERGE_FACTOR; + let bucketTime = 0; + let bucketValue = 0; + for (let i = 0; i < rawData.length; i++) { + const [time, value] = rawData[i]; + if (time < bucketTime + BUCKET_RESOLUTION * BUCKET_MERGE_FACTOR) { + bucketValue += value; + } else { + if (bucketValue > 0) + data.push([bucketTime, bucketValue * scaleFactor]); + bucketTime = time; + bucketValue = value; + } + } + data.push([bucketTime, bucketValue * scaleFactor]); + + const x = d3.scaleLinear() + .domain([0, Math.max(d3.max(data, (d) => d[0]), maxIterTime)]) + .range([left, svgWidth-right]); + // x-axis + root + .append("g") + .attr("transform", `translate(0,${svgHeight - bottom})`) + .call(d3.axisBottom(x).ticks(5)); + // vertical grid + root + .append("g") + .attr("transform", `translate(0,${svgHeight - bottom})`) + .attr("class", "grid") + .call(d3 + .axisBottom(x) + .tickSize(-(svgHeight - bottom - top)) + 
.tickFormat("") + .ticks(5)); + for (const [block_id, times] of Object.entries(iteration_boundaries.data)) { + const color = d3.schemeCategory20[iterToIndex[block_id]]; + for (const time of times) { + const xPos = x(time); + root.append("line") + .attr("stroke", color) + .attr("x1", xPos) + .attr("x2", xPos) + .attr("y1", top) + .attr("y2", svgHeight-bottom); + } + } + + const y = d3.scaleLinear() + .domain([0, d3.max(data, (d) => d[1])]) + .range([svgHeight - bottom, top]); + // y-axis + root + .append("g") + .attr("transform", `translate(${left},0)`) + .call(d3.axisLeft(y).ticks(10).tickFormat(yFormat)); + // horizontal grid + root + .append("g") + .attr("transform", `translate(${left},0)`) + .attr("class", "grid") + .call(d3 + .axisLeft(y) + .tickSize(-(svgWidth - left - right)) + .tickFormat("") + .ticks(10)); + + root.append("path") + .datum(data) + .attr("fill", "none") + .attr("stroke", "blue") + .attr("stroke-width", 1.5) + .attr("d", d3.line() + .x((d) => x(d[0])) + .y((d) => y(d[1]))) + + root + .append("text") + .attr("text-anchor", "middle") + .attr("alignment-baseline", "middle") + .attr("x", svgWidth / 2) + .attr("y", top / 2) + .text(title); +} \ No newline at end of file diff --git a/visualizer/js/network.js b/visualizer/js/network.js new file mode 100644 index 0000000..b20c6fb --- /dev/null +++ b/visualizer/js/network.js @@ -0,0 +1,519 @@ +const constant = (x) => () => x; + +function rectCollide() { + var nodes, sizes, masses + var size = constant([0, 0]) + var strength = 1 + var iterations = 1 + + function force() { + var node, size, mass, xi, yi + var i = -1 + while (++i < iterations) { iterate() } + + function iterate() { + var j = -1 + var tree = d3.quadtree(nodes, xCenter, yCenter).visitAfter(prepare) + + while (++j < nodes.length) { + node = nodes[j] + size = sizes[j] + mass = masses[j] + xi = xCenter(node) + yi = yCenter(node) + + tree.visit(apply) + } + } + + function apply(quad, x0, y0, x1, y1) { + var data = quad.data + var xSize = (size[0] + quad.size[0]) / 2 + var ySize = (size[1] + quad.size[1]) / 2 + if (data) { + if (data.index <= node.index) { return } + + var x = xi - xCenter(data) + var y = yi - yCenter(data) + var xd = Math.abs(x) - xSize + var yd = Math.abs(y) - ySize + + if (xd < 0 && yd < 0) { + var l = Math.sqrt(x * x + y * y) + var m = masses[data.index] / (mass + masses[data.index]) + + if (Math.abs(xd) < Math.abs(yd)) { + node.vx -= (x *= xd / l * strength) * m + data.vx += x * (1 - m) + } else { + node.vy -= (y *= yd / l * strength) * m + data.vy += y * (1 - m) + } + } + } + + return x0 > xi + xSize || y0 > yi + ySize || + x1 < xi - xSize || y1 < yi - ySize + } + + function prepare(quad) { + if (quad.data) { + quad.size = sizes[quad.data.index] + } else { + quad.size = [0, 0] + var i = -1 + while (++i < 4) { + if (quad[i] && quad[i].size) { + quad.size[0] = Math.max(quad.size[0], quad[i].size[0]) + quad.size[1] = Math.max(quad.size[1], quad[i].size[1]) + } + } + } + } + } + + function xCenter(d) { return d.x + d.vx + sizes[d.index][0] / 2 } + function yCenter(d) { return d.y + d.vy + sizes[d.index][1] / 2 } + + force.initialize = function (_) { + sizes = (nodes = _).map(size) + masses = sizes.map(function (d) { return d[0] * d[1] }) + } + + force.size = function (_) { + return (arguments.length + ? (size = typeof _ === 'function' ? _ : constant(_), force) + : size) + } + + force.strength = function (_) { + return (arguments.length ? (strength = +_, force) : strength) + } + + force.iterations = function (_) { + return (arguments.length ? 
(iterations = +_, force) : iterations) + } + + return force +} + +let gravityEnabled = true; +let gravityCallbacks = []; + +const changeGravity = (enabled) => { + gravityEnabled = enabled; + for (const callback of gravityCallbacks) callback(); +}; + +const resetNetwork = () => { + gravityCallbacks = []; +}; + +const drawNetwork = (nodes, links) => { + const nodeById = {}; + + const root = { + id: -1, + data: {}, + children: nodes, + parent: null, + }; + + const dfs = (node, depth) => { + nodeById[node.id] = node; + node.depth = depth; + if (node.children) { + node.links = []; + for (const child of node.children) { + child.parent = node; + dfs(child, depth + 1); + } + } + }; + + dfs(root, 0); + + // put each link in the lowest-common-ancestor of the 2 nodes + const groupLinks = (links) => { + for (const link of links) { + let source = nodeById[link.source]; + let target = nodeById[link.target]; + + while (source.depth > target.depth) source = source.parent; + while (target.depth > source.depth) target = target.parent; + + // link cannot point to a children + if (source.id === target.id) { + throw new Error("Invalid link from " + link.source + " to " + link.target); + } + + while (source.parent.id !== target.parent.id) { + source = source.parent; + target = target.parent; + } + + const parent = source.parent; + parent.links.push({ + source: source.id, + target: target.id, + link: link, + }); + } + }; + + groupLinks(links); + + const assignInitialPositions = (node) => { + if (!node.children) return; + const adj = {}; + for (const link of node.links) { + if (!(link.source in adj)) adj[link.source] = []; + adj[link.source].push(link.target); + } + const visited = {}; + const order = []; + const dfs = (node) => { + if (node in visited) return; + visited[node] = true; + if (node in adj) { + for (const next of adj[node]) { + dfs(next); + } + } + order.push(node); + } + const ranks = {}; + for (const child of node.children) { + dfs(child.id); + } + for (const child of node.children) { + visited[child.id] = false; + ranks[child.id] = 0; + } + for (let i = order.length - 1; i >= 0; i--) { + const id = order[i]; + if (id in adj) { + for (const next of adj[id]) { + if (visited[next]) continue; + ranks[next] = Math.max(ranks[next], ranks[id] + 1); + } + } + visited[id] = true; + } + + const rankFreq = {}; + const rankSpacingX = 400; + const rankSpacingY = 200; + const initialPosition = {}; + for (let i = order.length - 1; i >= 0; i--) { + const id = order[i]; + const rank = ranks[id]; + const y = rankSpacingY * rank; + if (!(rank in rankFreq)) rankFreq[rank] = 0; + const x = rankSpacingX * rankFreq[rank]; + rankFreq[rank]++; + initialPosition[id] = [x, y]; + } + + for (const child of node.children) { + const [x, y] = initialPosition[child.id]; + child.x = x; + child.y = y; + + assignInitialPositions(child); + } + }; + + assignInitialPositions(root); + + const contentId = "network-content"; + + const container = document.getElementById(contentId); + const svgWidth = container.clientWidth; + const svgHeight = container.clientHeight; + const nodeWidth = 100; + const nodeHeight = 40; + + d3.select("#" + contentId).select("svg").remove(); + const svg = d3.select("#" + contentId) + .append("svg") + .attr("width", svgWidth) + .attr("height", svgHeight); + + const defs = svg.append("defs"); + defs + .append("marker") + .attr("id", "arrowhead") + .attr("markerWidth", "10") + .attr("markerHeight", "7") + .attr("refX", "7.14") + .attr("refY", "3.5") + .attr("orient", "auto") + .append("polygon") + .attr("points", 
"0 0, 10 3.5, 0 7"); + + const nodeSize = (node) => { + if (!node.children) return [ + node.x, node.y, node.width, node.height, + ]; + + const padding = 10; + let left = 1e9; + let right = -1e9; + let top = 1e9; + let bottom = -1e9; + for (const child of node.children) { + const childLeft = child.x - child.width / 2; + const childRight = childLeft + child.width; + const childTop = child.y - child.height / 2; + const childBottom = childTop + child.height; + + if (childLeft < left) left = childLeft; + if (childRight > right) right = childRight; + if (childTop < top) top = childTop; + if (childBottom > bottom) bottom = childBottom; + } + const width = right - left + 2 * padding; + const height = bottom - top + 2 * padding; + return [left - padding, top - padding, width, height]; + }; + + function pointOnRect(x, y, minX, minY, maxX, maxY) { + const midX = (minX + maxX) / 2; + const midY = (minY + maxY) / 2; + // if (midX - x == 0) -> m == ±Inf -> minYx/maxYx == x (because value / ±Inf = ±0) + const m = (midY - y) / (midX - x); + + if (x <= midX) { // check "left" side + const minXy = m * (minX - x) + y; + if (minY <= minXy && minXy <= maxY) + return [minX, minXy]; + } + + if (x >= midX) { // check "right" side + const maxXy = m * (maxX - x) + y; + if (minY <= maxXy && maxXy <= maxY) + return [maxX, maxXy]; + } + + if (y <= midY) { // check "top" side + const minYx = (minY - y) / m + x; + if (minX <= minYx && minYx <= maxX) + return [minYx, minY]; + } + + if (y >= midY) { // check "bottom" side + const maxYx = (maxY - y) / m + x; + if (minX <= maxYx && maxYx <= maxX) + return [maxYx, maxY]; + } + + // edge case when finding midpoint intersection: m = 0/0 = NaN + if (x === midX && y === midY) return [x, y]; + } + + const drawNode = (node, parent) => { + if (!node.children) { + node.width = nodeWidth; + node.height = nodeHeight; + const group = parent.append("g"); + group + .append("rect") + .style("fill", "#dddddd") + .style("stroke", "blue") + .attr("x", -nodeWidth/2) + .attr("y", -nodeHeight/2) + .attr("width", nodeWidth) + .attr("height", nodeHeight); + if (node.data && node.data.text) { + group + .append("text") + .attr("text-anchor", "middle") + .attr("dominant-baseline", "middle") + .attr("font-size", "22") + .text(node.data.text) + } + if (node.data && node.data.onclick) { + group.on("click", () => node.data.onclick()); + group.style("cursor", "pointer"); + } + return; + } + + const innerRect = parent + .append("rect") + .attr("class", "node" + node.id) + .style("fill", "white") + .style("stroke", "blue"); + + const innerNodes = parent + .selectAll("g") + .data(node.children, (d) => d.id) + .enter() + .append("g") + .attr("id", (d) => "node" + d.id); + + const innerText = parent + .append("text") + .attr("text-anchor", "start") + .attr("dominant-baseline", "middle") + .attr("font-size", "22") + .text(node.data.text); + + innerNodes.each((inner, i, innerNodeElements) => { + const innerNodeElement = d3.select(innerNodeElements[i]); + drawNode(node.children[i], innerNodeElement); + }); + + const links = node.links.map((link) => { + const {type, text, width, onclick} = link.link.data; + const lineElem = parent + .append("line") + .attr("class", link.link.source + "_" + link.link.target) + .attr("stroke", "black") + .attr("stroke-width", width || 1) + .attr("marker-end", "url(#arrowhead)"); + const textElem = parent + .append("text") + .attr("text-anchor", "start") + .attr("dominant-baseline", "middle") + .attr("font-size", "22") + .text(text || ""); + if (type === "dashed") { + 
lineElem.attr("stroke-dasharray", "4") + } + if (onclick) { + lineElem + .style("cursor", "pointer") + .on("click", () => onclick()); + textElem + .style("cursor", "pointer") + .on("click", () => onclick()); + } + return [lineElem, textElem]; + }); + + const tick = () => { + innerNodes + .attr("transform", (d) => "translate(" + d.x + "," + d.y + ")"); + + if (node.parent) { + const [x, y, width, height] = nodeSize(node); + node.width = width; + node.height = height; + innerRect + .attr("x", x) + .attr("y", y) + .attr("width", width) + .attr("height", height); + innerText + .attr("x", x+20) + .attr("y", y-20) + .attr("width", width) + .attr("height", height); + } + + links.forEach(([line, text], i) => { + const link = node.links[i]; + const getCoord = (child) => { + if (!child || child.id === node.id) { + return [0, 0]; + } + const [x, y] = getCoord(child.parent); + return [x+child.x, y+child.y]; + }; + const source = nodeById[link.link.source]; + const target = nodeById[link.link.target]; + const [x1, y1] = getCoord(source); + const [x2, y2] = getCoord(target); + + const rect1 = [x1-source.width/2, y1-source.height/2, x1+source.width/2, y1+source.height/2]; + const rect2 = [x2-target.width/2, y2-target.height/2, x2+target.width/2, y2+target.height/2]; + const [px1, py1] = pointOnRect(x2, y2, ...rect1); + const [px2, py2] = pointOnRect(x1, y1, ...rect2); + + line + .attr("x1", px1) + .attr("y1", py1) + .attr("x2", px2) + .attr("y2", py2); + text + .attr("x", (px1 + px2) / 2) + .attr("y", (py1 + py2) / 2); + }); + }; + + const collisionForce = rectCollide() + .size((d) => { + const [,, w, h] = nodeSize(d); + return [w, h]; + }); + const simulation = d3.forceSimulation() + .force("link", d3.forceLink().id((d) => d.id).strength(0.001)) + .force("charge", d3.forceManyBody().strength(-30)) + .force("collision", collisionForce) + .force("center", d3.forceCenter(0, 0)) + .alphaMin(0.1); + + simulation.nodes(node.children).on("tick", tick); + simulation.force("link").links(node.links); + simulation.alpha(1).restart(); + + gravityCallbacks.push(() => { + if (gravityEnabled) { + for (const n of node.children) { + node.fx = null; + node.fy = null; + simulation.alpha(1).restart(); + } + } else { + for (const n of node.children) { + node.fx = node.x; + node.fy = node.y; + } + } + }); + + const drag = () => { + return d3.drag() + .on("start", (d) => { + if (!d3.event.active) simulation.alpha(1).restart(); + d.fx = d.x; + d.fy = d.y; + }) + .on("drag", (d) => { + d.fx = d3.event.x; + d.fy = d3.event.y; + }) + .on("end", (d) => { + if (!d3.event.active) simulation.alphaTarget(0); + if (gravityEnabled) { + d.fx = null; + d.fy = null; + } + }); + } + + innerNodes.call(drag()); + }; + + const rootElem = svg.append("g"); + const contentElem = rootElem + .append("g") + .attr("transform", "translate(" + svgWidth/2 + "," + svgHeight/2 + ")"); + drawNode(root, contentElem); + + const zoom = d3.zoom() + .scaleExtent([0.1, 10]) + .on("zoom", () => rootElem.attr("transform", d3.event.transform)); + svg.call(zoom); + + const resize = () => { + const width = container.clientWidth; + const height = container.clientHeight; + svg + .attr("width", width) + .attr("height", height); + } + window.addEventListener("resize", resize); +} \ No newline at end of file diff --git a/visualizer/js/profiler.js b/visualizer/js/profiler.js new file mode 100644 index 0000000..4c6b737 --- /dev/null +++ b/visualizer/js/profiler.js @@ -0,0 +1,118 @@ +class TimeSeries { + constructor() { + this.data = {}; + this.total = 0; + } + + add(bucket, 
value) { + if (!(bucket in this.data)) this.data[bucket] = 0; + this.data[bucket] += value; + this.total += value; + } + + asList() { + const res = Object.entries(this.data); + return res + .sort(([a,], [b,]) => a - b) + .map(([t, v]) => [+t, +v]); + } + + static merge(a, b) { + const result = new TimeSeries(); + for (const [bucket, value] of Object.entries(a.data)) { + result.data[bucket] = value; + } + for (const [bucket, value] of Object.entries(b.data)) { + if (!(bucket in result.data)) result.data[bucket] = 0; + result.data[bucket] += value; + } + result.total = a.total + b.total; + return result; + } +} + +class ChannelMetric { + constructor() { + this.data = {}; + } + + add(from, to, bucket, value) { + const key = ChannelMetric.coordPairKey(from, to); + if (!(key in this.data)) { + this.data[key] = { + from, + to, + series: new TimeSeries() + }; + } + this.data[key].series.add(bucket, value); + } + + groupByBlockId() { + const byBlockId = {}; + for (const {from, to, series} of Object.values(this.data)) { + const key = ChannelMetric.blockPairKey(from.block_id, to.block_id); + if (!(key in byBlockId)) byBlockId[key] = {from, to, series: new TimeSeries()}; + byBlockId[key].series = TimeSeries.merge(byBlockId[key].series, series); + } + return byBlockId; + } + + static blockPairKey(from, to) { + return `${from}:${to}`; + } + + static coordPairKey(from, to) { + return `${this.coordKey(from)}|${this.coordKey(to)}`; + } + + static coordKey(coord) { + return `${coord.host_id}:${coord.block_id}:${coord.replica_id}`; + } +} + +class IterationBoundaries { + constructor() { + this.data = {}; + } + + add(block_id, time) { + if (!(block_id in this.data)) this.data[block_id] = []; + this.data[block_id].push(time); + } +} + +class Profiler { + constructor(data) { + this.channel_metrics = { + items_in: new ChannelMetric(), + items_out: new ChannelMetric(), + net_messages_in: new ChannelMetric(), + net_messages_out: new ChannelMetric(), + net_bytes_in: new ChannelMetric(), + net_bytes_out: new ChannelMetric(), + }; + this.iteration_boundaries = new IterationBoundaries(); + for (const profiler of data) { + for (const {metrics, start_ms} of profiler.buckets) { + for (const {from, to, value} of metrics.items_in) { + this.channel_metrics.items_in.add(from, to, start_ms, value); + } + for (const {from, to, value} of metrics.items_out) { + this.channel_metrics.items_out.add(from, to, start_ms, value); + } + for (const {from, to, value} of metrics.net_messages_in) { + this.channel_metrics.net_messages_in.add(from, to, start_ms, value[0]); + this.channel_metrics.net_bytes_in.add(from, to, start_ms, value[1]); + } + for (const {from, to, value} of metrics.net_messages_out) { + this.channel_metrics.net_messages_out.add(from, to, start_ms, value[0]); + this.channel_metrics.net_bytes_out.add(from, to, start_ms, value[1]); + } + for (const [block_id, time] of metrics.iteration_boundaries) { + this.iteration_boundaries.add(block_id, time); + } + } + } + } +} \ No newline at end of file diff --git a/visualizer/js/script.js b/visualizer/js/script.js new file mode 100644 index 0000000..fba2900 --- /dev/null +++ b/visualizer/js/script.js @@ -0,0 +1,67 @@ +const inputSelector = document.getElementById("file-picker"); +const jobGraphRadio = document.getElementById("job-graph"); +const executionGraphRadio = document.getElementById("execution-graph"); +const gravitySelect = document.getElementById("gravity"); +const redrawButton = document.getElementById("redraw"); + +window.graphMode = "job"; +window.profiler = null; + 
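+// Render the currently selected view: the job-level graph or the per-replica execution graph.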
+const drawGraph = (structures, profiler) => { + if (window.graphMode === "job") { + drawJobGraph(structures, profiler); + } else { + drawExecutionGraph(structures, profiler); + } +} + +inputSelector.addEventListener("change", (event) => { + const files = event.target.files; + if (files.length !== 1) { + return; + } + const file = files[0]; + const reader = new FileReader(); + reader.addEventListener("load", (event) => { + const content = event.target.result; + let json; + try { + json = JSON.parse(content); + } catch (e) { + alert("Malformed JSON data: " + e); + } + window.structures = json["structures"]; + window.profilers = json["profilers"]; + if (!window.structures || !window.profilers) { + alert("Invalid JSON data: structures or profilers missing"); + return; + } + window.profiler = new Profiler(window.profilers); + drawGraph(window.structures, window.profiler); + }); + reader.readAsText(file); +}); + +jobGraphRadio.addEventListener("change", () => { + window.graphMode = "job"; + if (window.profiler) { + drawGraph(window.structures, window.profiler); + } +}); + +executionGraphRadio.addEventListener("change", () => { + window.graphMode = "execution"; + if (window.profiler) { + drawGraph(window.structures, window.profiler); + } +}); + +gravitySelect.addEventListener("change", () => { + changeGravity(!gravityEnabled); +}); + +redrawButton.addEventListener("click", () => { + if (window.profiler) { + drawGraph(window.structures, window.profiler); + } +}); \ No newline at end of file