diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 50e69312..555c2ded 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -28,7 +28,7 @@ jobs: uses: actions-rs/clippy-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - args: --all-targets --tests --all -- -D warnings + args: --all-targets --all -- -D warnings test: name: Test diff --git a/README.md b/README.md index af65dd6e..03a88232 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,10 @@ fn main() { let (config, args) = EnvironmentConfig::from_args(); let mut env = StreamEnvironment::new(config); env.spawn_remote_workers(); - let source = FileSource::new(&args[0]); - // Create file source + let result = env - .stream(source) + // Open and read file line by line in parallel + .stream_file(&args[0]) // Split into words .flat_map(|line| tokenize(&line)) // Partition @@ -32,10 +32,11 @@ fn main() { .fold(0, |count, _word| *count += 1) // Collect result .collect_vec(); + env.execute(); // Start execution (blocking) if let Some(result) = result.get() { // Print word counts - println!("{:?}", result); + result.into_iter().for_each(|(word, count)| println!("{word}: {count}")); } } @@ -45,4 +46,6 @@ fn tokenize(s: &str) -> Vec<String> { } // Execute on 6 local hosts `cargo run -- -l 6 input.txt` -``` \ No newline at end of file +``` + +# Documentation is WIP \ No newline at end of file diff --git a/examples/rolling_top_words.rs b/examples/rolling_top_words.rs index f3a1f647..b038527e 100644 --- a/examples/rolling_top_words.rs +++ b/examples/rolling_top_words.rs @@ -170,7 +170,7 @@ fn main() { env.spawn_remote_workers(); let source = ParallelIteratorSource::new(|id, num_replicas| { - TopicSource::new(id as u64, num_replicas as u64) + TopicSource::new(id, num_replicas) }); env.stream(source) // add a timestamp for each item (using the one generated by the source) and add a watermark diff --git a/examples/rolling_top_words_e2e.rs b/examples/rolling_top_words_e2e.rs index ff470f46..ef2e8d77 100644 --- a/examples/rolling_top_words_e2e.rs +++ b/examples/rolling_top_words_e2e.rs @@ -117,7 +117,7 @@ fn main() { env.spawn_remote_workers(); let source = ParallelIteratorSource::new(move |id, num_replicas| { - TopicSource::new(id as u64, num_replicas as u64, limit) + TopicSource::new(id, num_replicas, limit) }); env.stream(source) // add a timestamp for each item (using the one generated by the source) and add a watermark diff --git a/src/operator/source/file.rs b/src/operator/source/file.rs index 13b07c45..f4f25ded 100644 --- a/src/operator/source/file.rs +++ b/src/operator/source/file.rs @@ -3,7 +3,6 @@ use std::fs::File; use std::io::BufRead; use std::io::Seek; use std::io::{BufReader, SeekFrom}; -use std::path::Path; use std::path::PathBuf; use crate::block::{BlockStructure, OperatorKind, OperatorStructure}; @@ -169,7 +168,7 @@ impl Clone for FileSource { impl crate::StreamEnvironment { /// Convenience method, creates a `FileSource` and makes a stream using `StreamEnvironment::stream` - pub fn stream_file(&mut self, path: &Path) -> Stream<String, FileSource> { + pub fn stream_file<P: Into<PathBuf>>(&mut self, path: P) -> Stream<String, FileSource> { let source = FileSource::new(path); self.stream(source) } diff --git a/tools/benchmarks/.gitignore b/tools/benchmarks/.gitignore deleted file mode 100644 index 928b993b..00000000 --- a/tools/benchmarks/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -config.yaml -results* -temp -plots/target -__pycache__ diff --git a/tools/benchmarks/README.md b/tools/benchmarks/README.md deleted file mode 100644 index e6267937..00000000 ---
a/tools/benchmarks/README.md +++ /dev/null @@ -1,101 +0,0 @@ -# Tools for running the benchmarks - -`benchmark.py` is able to automate most of the steps required to run a benchmark and collect the results. -In particular, it automatically does the following: - -- Testing the connection to the workers, measuring ping and collecting info about their CPUs -- Compiling the program on a worker node, ensuring the correct native optimizations -- Syncing the executable to all the worker nodes -- Setting up the host files -- Running a warmup execution to cache the dataset in RAM -- Running the benchmark N times, collecting the outputs in handy JSON files - -This script supports MPI projects compiled with CMake, RStream examples, and Flink projects built with Maven. - -Where `benchmark.py` really shines is in automating the benchmark while varying hyperparameters: -in the configuration file you can define a set of hyperparameters and their possible values; the script will enumerate all the possible combinations and run the benchmarks on them. - -The two required hyperparameters are: - -- `num_hosts`: the number of hosts to use (the values should not be greater than the number of known hosts) -- `procs_per_host`: the number of processes (slots) for each host - - When using RStream it's the number of slots in the hostfile (ignoring source and sink, they're added automatically) - - When using MPI or Flink it's the number of slots for that worker - -You can use `--verbose` to see all the details of what's happening under the hood. - -## Example Usage - -All the commands should be executed inside `tools/`; running them elsewhere requires changing the arguments. - -Before running `benchmark.py`, copy `example.config.yaml` to `config.yaml` and tune the parameters (e.g. setting the list of known hosts, hyperparameters, ...). - -Try running `./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 1 -d ~/rust_stream/data/gutenberg.txt` against this configuration file: -```yaml -known_hosts: - - localhost -results_file: "./results/{experiment}/{system}/{num_hosts}-hosts-{procs_per_host}-procs-{run}.json" -# [...] -warmup: true -num_runs: 5 -hyperparameters: - num_hosts: [1] - procs_per_host: [1, 2, 4, 8] -``` - -This is an example of the content of one results file (`num_hosts-1-procs_per_host-1-run-0.json`): -```json -{ - "hosts": [ - "localhost" - ], - "result": { - "stdout": "total:1673354595\ncsv:0\nnetwork:397\n", - "stderr": "[ 0/ 0] has interval 0 - 104857600\n[ 0/ 0] has interval 0 - 104857600 -- done\n" - }, - "system": "mpi", - "run": 0, - "experiment": "experiment_name", - "num_hosts": 1, - "procs_per_host": 1 -} -``` - -### Running an MPI benchmark - -```bash -./benchmark.py experiment_name mpi ../mpi/wordcount/ main -- -T 8 -d ~/rust_stream/data/gutenberg.txt -``` - -- `experiment_name` is used to keep track of which test you are performing; it is used to organize the results in the `results/` folder. -- `mpi` tells the script you are running an MPI benchmark. -- `../mpi/wordcount/` is the path where the benchmark is stored (on the master node). There should be a `CMakeLists.txt` inside. -- `main` is the name of the executable produced by compiling the benchmark. -- `--` everything that follows is passed as arguments to the executable run with `mpirun`. -- `-T 8` tells the benchmark to use 8 threads. You can also add hyperparameters in `config.yaml` and use them in the command-line arguments by wrapping them in curly braces (e.g. `-T {threads}`). - -### Running an RStream example - -```bash -./benchmark.py experiment_name rstream ..
wordcount_p -- ~/rust_stream/data/gutenberg.txt -``` - -- `experiment_name` is used to keep track of which test you are performing; it is used to organize the results in the `results/` folder. -- `rstream` tells the script you are running an RStream benchmark. -- `..` is the path where the RStream project is cloned (`..` since we are inside `tools/`). -- `wordcount_p` is the name of the example to run. -- `--` everything that follows is passed as arguments to the executable. - -### Running a Flink example - -Remember to set `flink_base_path` inside `config.yaml` to the correct path and to copy Flink to all the hosts, including the workers. - -```bash -./benchmark.py experiment_name flink ../flink/WordCount wordCount-1.0.jar -- -input ~/rust_stream/data/gutenberg.txt -``` - -- `experiment_name` is used to keep track of which test you are performing; it is used to organize the results in the `results/` folder. -- `flink` tells the script you are running a Flink benchmark. -- `../flink/WordCount` is the path where the Flink project is stored. There should be a `pom.xml` inside. -- `wordCount-1.0.jar` is the name of the jar file produced by `mvn package`. -- `--` everything that follows is passed as arguments to the executable. \ No newline at end of file diff --git a/tools/benchmarks/aggregate_csv.py b/tools/benchmarks/aggregate_csv.py deleted file mode 100755 index 9bfa07bc..00000000 --- a/tools/benchmarks/aggregate_csv.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import csv -import sys -from typing import Dict - - -class Experiment: - def __init__(self, name): - self.name = name - self.times = {} - - def add(self, row, column_name): - hosts = int(row["num_hosts"]) - cores = int(row["procs_per_host"]) - time = float(row[column_name]) / 10 ** 9 - self.times.setdefault((hosts, cores), []).append(time) - - def row_ids(self): - return set(self.times.keys()) - - def get_data(self, row_id): - if row_id not in self.times: - return None - data = self.times[row_id] - delta = (max(data) - min(data)) / 2 - avg = sum(data) / len(data) - return avg, delta - - def get_row(self, row_id, format): - if row_id not in self.times: - if format in {"human", "latex"}: - return [""] - else: - return ["", ""] - avg, delta = self.get_data(row_id) - if format == "human": - return [f"{avg:.2f}s (± {delta:.2f}s)"] - elif format == "latex": - return [f"\sipm{{{avg:.2f}}}{{{delta:.2f}}}"] - else: - return [str(avg), str(delta)] - - -class System: - def __init__(self, system, time_column_name): - self.system = system - self.time_column_name = time_column_name - self.experiments = {} # type: Dict[str, Experiment] - - def add(self, row): - exp = row["experiment"] - if exp not in self.experiments: - self.experiments[exp] = Experiment(exp) - self.experiments[exp].add(row, self.time_column_name) - - def header(self, experiment, single_experiment, format): - if experiment not in self.experiments: - return [] - - if single_experiment: - if format in {"human", "latex"}: - return [self.system] - else: - return [f"{self.system} (s)", f"{self.system} (± s)"] - else: - if format in {"human", "latex"}: - return [f"{experiment} ({self.system})"] - else: - return [f"{experiment} ({self.system}) ({h}s)" for h in ["", "± "]] - - def get_row(self, experiment, row_id, format): - if experiment not in self.experiments: - return [] - return self.experiments[experiment].get_row(row_id, format) - - def get_experiments(self): - return set(self.experiments.keys()) - - def row_ids(self): - ids = set() - for exp in
self.experiments.values(): - ids |= exp.row_ids() - return ids - - -class RStream1(System): - def __init__(self): - System.__init__(self, "rstream1", "Run") - - -class RStream2(System): - def __init__(self): - System.__init__(self, "rstream2", "max-remote-execution") - - -class Flink(System): - def __init__(self): - System.__init__(self, "flink", "total") - - -class MPI(System): - def __init__(self): - System.__init__(self, "mpi", "total") - - -class Timely(System): - def __init__(self): - System.__init__(self, "timely", "total") - - -def get_systems(): - return [RStream1(), RStream2(), Flink(), MPI(), Timely()] - - -def parse_stdin(systems): - for row in csv.DictReader(sys.stdin): - system = row["system"] - if system == "rstream": - systems[0].add(row) - elif system == "rstream2": - systems[1].add(row) - elif system == "flink": - systems[2].add(row) - elif system == "mpi": - systems[3].add(row) - elif system == "timely": - systems[4].add(row) - else: - raise ValueError("Unsupported system: " + system) - - -def main(args): - systems = get_systems() - parse_stdin(systems) - - experiments = set() - for system in systems: - experiments |= system.get_experiments() - experiments = list(sorted(experiments)) - single_experiment = len(experiments) == 1 - - headers = ["hosts", "cores"] - for experiment in experiments: - for system in systems: - headers += system.header(experiment, single_experiment, args.format) - - ids = set() - for system in systems: - ids |= system.row_ids() - ids = list(sorted(ids)) - - rows = [] - for row_id in ids: - row = [str(row_id[0]), str(row_id[0] * row_id[1])] - for experiment in experiments: - for system in systems: - row += system.get_row(experiment, row_id, args.format) - rows += [row] - - if args.format in {"human", "csv"}: - writer = csv.writer(sys.stdout) - writer.writerow(headers) - writer.writerows(rows) - else: - print(" & ".join(headers), end="\\\\\n") - print("\\midrule") - for row in rows: - print(" & ".join(row), end="\\\\\n") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Aggregate the output of gen_csv.py") - parser.add_argument( - "--format", - "-f", - choices=["human", "csv", "latex"], - default="human", - help="Output format", - ) - args = parser.parse_args() - main(args) diff --git a/tools/benchmarks/benchmark.py b/tools/benchmarks/benchmark.py deleted file mode 100755 index 1924f3cf..00000000 --- a/tools/benchmarks/benchmark.py +++ /dev/null @@ -1,745 +0,0 @@ -#!/usr/bin/env python3 - -import abc -import argparse -import json -import logging -import math -import os.path -import shlex -import subprocess -import sys -import tempfile - -import coloredlogs -import paramiko -import ruamel.yaml - -logger = logging.getLogger("benchmark") -verbosity = 0 - - -class Benchmark(abc.ABC): - @abc.abstractmethod - def prepare(self, args): - """Prepare the solution (e.g. compiling it if needed)""" - - def setup_hyp(self, hyperparameters): - """Setup the system for the selected hyperparameters, for example - raising up the cluster if needed. 
- """ - - @abc.abstractmethod - def run(self, args, hyperparameters, run_index): - """Run the benchmark of the prepared solution""" - - -class MPI(Benchmark): - exec_path = "/tmp/mpibin" - mpi_extra_args = [ - "-oversubscribe", - "-mca", - "btl_base_warn_component_unused", - "0", - "--map-by", - # "NUMA:PE=4", - "hwthread", - # "node", - "-display-map", - "--mca", - "mpi_yield_when_idle", - "1", - ] - - def __init__(self, config): - self.config = config - - def prepare(self, args): - logger.info("Preparing MPI") - compiler_host = self.config["known_hosts"][0] - source = args.directory - if not source.endswith("/"): - source += "/" - - logger.info("Copying source files from %s to %s", source, compiler_host) - compilation_dir = self.config["compilation_dir"] + "/" + args.experiment - sync_on(compiler_host, source, compilation_dir) - - logger.info("Compiling using cmake") - run_on(compiler_host, "mkdir", "-p", "%s/build" % compilation_dir) - run_on( - compiler_host, - "cd %s/build && cmake .. -DCMAKE_BUILD_TYPE=Release" % compilation_dir, - shell=True, - ) - run_on(compiler_host, "cd %s/build && make -j 8" % compilation_dir, shell=True) - run_on( - compiler_host, - "cd %s/build && ls -lah && file -E ./%s" % (compilation_dir, args.bin_name), - shell=True, - ) - logger.info( - "Executable compiled at %s/build/%s on %s" - % (compilation_dir, args.bin_name, compiler_host) - ) - - logger.info("Copying executable to this machine") - local_path = "%s/%s/mpi/%s" % ( - self.config["temp_dir"], - args.experiment, - args.bin_name, - ) - base = os.path.dirname(local_path) - os.makedirs(base, exist_ok=True) - sync_from( - compiler_host, "%s/build/%s" % (compilation_dir, args.bin_name), local_path - ) - - logger.info("Copying executable to all the hosts") - for host in self.config["known_hosts"]: - sync_on(host, local_path, self.exec_path) - - def run(self, args, hyperparameters, run_index): - num_hosts = hyperparameters["num_hosts"] - procs_per_host = hyperparameters["procs_per_host"] - with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: - hostfile_content = "" - for host in self.config["known_hosts"][:num_hosts]: - hostfile_content += "%s slots=%d\n" % (host, procs_per_host) - hostfile.write(hostfile_content) - hostfile.flush() - if verbosity > 1: - logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) - - num_proc = num_hosts * procs_per_host - extra_args = map( - lambda arg: replace_vars(arg, hyperparameters), - args.extra_args, - ) - cmd = [ - "mpirun", - # "-x", - # "LD_PRELOAD=/usr/bin/libmimalloc.so", - "-np", - str(num_proc), - "-hostfile", - hostfile.name, - *self.mpi_extra_args, - self.exec_path, - *extra_args, - ] - return run_benchmark(cmd) - - -class RStream(Benchmark): - exec_path = "/tmp/rstreambin" - compilation_suffix = "rstream" - - def __init__(self, config): - self.config = config - - def prepare(self, args): - logger.info("Preparing RStream") - compiler_host = self.config["known_hosts"][0] - source = args.directory - if not source.endswith("/"): - source += "/" - - logger.info("Copying source files from %s to %s", source, compiler_host) - compilation_dir = self.config["compilation_dir"] + "/" + self.compilation_suffix - run_on(compiler_host, "mkdir -p %s" % compilation_dir, shell=True) - sync_on(compiler_host, source, compilation_dir) - - logger.info("Compiling using cargo") - run_on( - compiler_host, - "cd %s && cargo build --release --example %s" - % (compilation_dir, args.example), - shell=True, - ) - remote_path = 
"%s/target/release/examples/%s" % (compilation_dir, args.example) - logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) - - logger.info("Copying executable to this machine") - self.local_path = "%s/%s/rstream/%s" % ( - self.config["temp_dir"], - args.experiment, - args.example, - ) - base = os.path.dirname(self.local_path) - os.makedirs(base, exist_ok=True) - sync_from(compiler_host, remote_path, self.local_path) - - logger.info("Copying executable to all the hosts") - for host in self.config["known_hosts"]: - sync_on(host, self.local_path, self.exec_path) - - def run(self, args, hyperparameters, run_index): - num_hosts = hyperparameters["num_hosts"] - procs_per_host = hyperparameters["procs_per_host"] - with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: - hostfile_content = "" - for i, host in enumerate(self.config["known_hosts"][:num_hosts]): - slots = procs_per_host * args.num_steps - if i == 0: - # source process - slots += 1 - if i == num_hosts - 1: - # sink process - slots += 1 - hostfile_content += "- hostname: %s\n" % host - hostfile_content += " slots: %d\n" % slots - hostfile.write(hostfile_content) - hostfile.flush() - logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) - - extra_args = map( - lambda arg: replace_vars(arg, hyperparameters), - args.extra_args, - ) - cmd = [ - "cargo", - "run", - "--release", - "--", - "--hostfile", - hostfile.name, - self.exec_path, - *extra_args, - ] - return run_benchmark(cmd, cwd=args.directory) - - -class RStream2(RStream): - compilation_suffix = "rstream2" - - def run(self, args, hyperparameters, run_index): - num_hosts = hyperparameters["num_hosts"] - procs_per_host = hyperparameters["procs_per_host"] - with tempfile.NamedTemporaryFile( - "w", dir=self.config["temp_dir"] - ) as configfile: - config_content = "hosts:\n" - for i, host in enumerate(self.config["known_hosts"][:num_hosts]): - config_content += " - address: %s\n" % host - config_content += " base_port: 10000\n" - config_content += " num_cores: %d\n" % procs_per_host - configfile.write(config_content) - configfile.flush() - logger.debug("Config at %s:\n%s", configfile.name, config_content) - - extra_args = map( - lambda arg: replace_vars(arg, hyperparameters), - args.extra_args, - ) - cmd = [ - self.local_path, - "--remote", - configfile.name, - *extra_args, - ] - return run_benchmark(cmd) - - -class TimelyDataflow(Benchmark): - exec_path = "/tmp/timelybin" - hostfile_path = "/tmp/timely.hosts" - compilation_suffix = "timely" - port_number = 2101 - - def __init__(self, config): - self.config = config - - def prepare(self, args): - logger.info("Preparing Timely Dataflow") - compiler_host = self.config["known_hosts"][0] - source = args.directory - if not source.endswith("/"): - source += "/" - - logger.info("Copying source files from %s to %s", source, compiler_host) - compilation_dir = self.config["compilation_dir"] + "/" + self.compilation_suffix - run_on(compiler_host, "mkdir -p %s" % compilation_dir, shell=True) - sync_on(compiler_host, source, compilation_dir) - - logger.info("Compiling using cargo") - run_on( - compiler_host, - "cd %s && cargo build --release --bin %s" % (compilation_dir, args.bin), - shell=True, - ) - remote_path = "%s/target/release/%s" % (compilation_dir, args.bin) - logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) - - logger.info("Copying executable to this machine") - self.local_path = "%s/%s/timely/%s" % ( - self.config["temp_dir"], - args.experiment, - 
args.bin, - ) - base = os.path.dirname(self.local_path) - os.makedirs(base, exist_ok=True) - sync_from(compiler_host, remote_path, self.local_path) - - logger.info("Copying executable to all the hosts") - for host in self.config["known_hosts"]: - sync_on(host, self.local_path, self.exec_path) - - def run(self, args, hyperparameters, run_index): - num_hosts = hyperparameters["num_hosts"] - hosts = self.config["known_hosts"][:num_hosts] - procs_per_host = hyperparameters["procs_per_host"] - with tempfile.NamedTemporaryFile("w", dir=self.config["temp_dir"]) as hostfile: - hostfile_content = "" - for host in hosts: - hostfile_content += "%s:%d\n" % (host, self.port_number) - hostfile.write(hostfile_content) - hostfile.flush() - logger.debug("Hostfile at %s:\n%s", hostfile.name, hostfile_content) - - logger.info("Copying hostfile to all the hosts") - for host in hosts: - sync_on(host, hostfile.name, self.hostfile_path) - - extra_args = " ".join( - map( - lambda arg: replace_vars(arg, hyperparameters), - args.extra_args, - ) - ) - - bash = "" - for i, host in enumerate(hosts): - bash += f"ssh {host} {self.exec_path} {extra_args} -w {procs_per_host} -n {num_hosts} -p {i} -h {self.hostfile_path} &\n" - bash += f"wait\n" - - cmd = ["bash", "-c", bash] - return run_benchmark(cmd, cwd=args.directory) - - -class Flink(Benchmark): - def __init__(self, config): - self.config = config - self.flink = self.config["flink_base_path"] - - def prepare(self, args): - logger.info("Preparing Flink") - compiler_host = self.config["known_hosts"][0] - source = args.directory - if not source.endswith("/"): - source += "/" - - logger.info("Copying source files from %s to %s", source, compiler_host) - compilation_dir = self.config["compilation_dir"] - sync_on(compiler_host, source, compilation_dir) - - logger.info("Compiling using mvn") - run_on( - compiler_host, - "cd %s && mvn package" % compilation_dir, - shell=True, - ) - run_on( - compiler_host, - "cd %s/target && ls -lah && file -E ./%s" - % (compilation_dir, args.jar_name), - shell=True, - ) - remote_path = "%s/target/%s" % (compilation_dir, args.jar_name) - logger.info("Executable compiled at %s on %s" % (remote_path, compiler_host)) - - logger.info("Copying executable to this machine") - local_path = "%s/%s/flink/%s" % ( - self.config["temp_dir"], - args.experiment, - args.jar_name, - ) - base = os.path.dirname(local_path) - os.makedirs(base, exist_ok=True) - sync_from(compiler_host, remote_path, local_path) - - def setup_hyp(self, hyperparameters): - logger.info("Stopping the cluster") - subprocess.run([self.flink + "/bin/stop-cluster.sh"], check=True) - - logger.info("Setting up ./conf/workers file") - workers = self.flink + "/conf/workers" - num_hosts = hyperparameters["num_hosts"] - with open(workers, "w") as f: - for host in self.config["known_hosts"][:num_hosts]: - f.write(host + "\n") - - logger.info("Starting the cluster") - subprocess.run([self.flink + "/bin/start-cluster.sh"], check=True) - - def run(self, args, hyperparameters, run_index): - num_hosts = hyperparameters["num_hosts"] - procs_per_host = hyperparameters["procs_per_host"] - parallelism = num_hosts * procs_per_host - local_path = "%s/%s/flink/%s" % ( - self.config["temp_dir"], - args.experiment, - args.jar_name, - ) - extra_args = map( - lambda arg: replace_vars(arg, hyperparameters), - args.extra_args, - ) - cmd = [ - self.flink + "/bin/flink", - "run", - "-p", - str(parallelism), - local_path, - *extra_args, - ] - return run_benchmark(cmd) - - -def run_benchmark(cmd, cwd=None): - 
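"""Run the given command locally, capturing its output; raise RuntimeError on a non-zero exit, otherwise return the decoded stdout and stderr.""" -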
logger.debug("Command: %s", shlex.join(cmd)) - out = subprocess.run(cmd, capture_output=True, cwd=cwd) - stdout = out.stdout.decode() - stderr = out.stderr.decode() - if out.returncode != 0: - raise RuntimeError( - "benchmark failed!\nExit code: %d\nStdout:\n%s\nStderr:\n%s" - % (out.returncode, stdout, stderr) - ) - return { - "stdout": stdout, - "stderr": stderr, - } - - -def run_on(host, *cmd, shell=False): - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - client.connect(host) - - if shell: - assert len(cmd) == 1 - cmd = "bash -l -c %s" % shlex.quote(cmd[0]) - else: - cmd = shlex.join(cmd) - logger.debug("Remote on %s: %s", host, cmd) - stdin, stdout, stderr = client.exec_command(cmd) - exit_code = stdout.channel.recv_exit_status() - stdout = stdout.read().decode() - stderr = stderr.read().decode() - if exit_code != 0: - raise RuntimeError( - "Remote command failed on host %s: %s\nStdout:\n%s\nStderr:\n%s" - % (host, cmd, stdout, stderr) - ) - return stdout + stderr - - -def sync_on(host, local_path, remote_path): - remote = host + ":" + remote_path - logger.debug("rsync: %s -> %s", local_path, remote) - subprocess.run( - [ - "rsync", - "-a", - "--copy-links", - "--exclude", - "target", - "--exclude", - "build", - "--exclude", - "data", - local_path, - remote, - ], - check=True, - ) - - -def sync_from(host, remote_path, local_path): - remote = host + ":" + remote_path - logger.debug("rsync: %s -> %s", remote, local_path) - subprocess.run( - ["rsync", "-a", "--exclude", "target", remote, local_path], - check=True, - ) - - -def ping_host(host): - proc = subprocess.run( - "ping %s -c 5 -i 0.2" % host, shell=True, capture_output=True, check=True - ) - stdout = proc.stdout.decode().splitlines() - times = [] - for line in stdout: - if "time=" in line: - time = float(line.split("time=")[1].split(" ")[0]) - times += [time] - return sum(times) / len(times) - - -def test_hosts(config): - logger.info("Testing hosts connection:") - for host in config["known_hosts"]: - model_name = run_on(host, 'lscpu | grep "Model name"', shell=True) - model_name = model_name.split(":")[1].strip() - ping = ping_host(host) - logger.info(" - %s: %s (%.3fms)", host, model_name, ping) - - -def hyperparameter_generator(hyperparameters): - """Yield all the possible combinations of hyperparameters""" - if isinstance(hyperparameters, dict): - yield from hyperparameter_generator(list(hyperparameters.items())) - return - if not hyperparameters: - yield {} - return - head, *rest = hyperparameters - name, values = head - for value in values: - for next_hyp in hyperparameter_generator(rest): - yield {name: value, **next_hyp} - - -def sanity_check(config): - hyperparameters = config["hyperparameters"] - for name, values in hyperparameters.items(): - if not isinstance(values, list): - raise ValueError("Invalid hyperparameter: %s. It should be a list" % (name)) - if not values: - raise ValueError( - "Invalid hyperparameter: %s. An empty list voids all the " - "combinations; comment it out if you don't want to use that hyperparameter." - % (name) - ) - num_hosts = len(config["known_hosts"]) - max_hosts = max(hyperparameters["num_hosts"]) - if max_hosts > num_hosts: - raise ValueError( - "Invalid hyperparameter: num_hosts.
Cannot use %d hosts, %d known" - % (max_hosts, num_hosts) - ) - if len(config["known_hosts"]) != len(set(config["known_hosts"])): - raise ValueError( - "known_hosts contains duplicated host names, this is not supported by MPI" - ) - - -def replace_vars(pattern, vars): - for name, value in vars.items(): - pattern = pattern.replace("{" + name + "}", str(value)) - return pattern - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "experiment", - help="Name of the experiment, used for naming the results file", - ) - parser.add_argument( - "--config", - help="Path to the config.yaml file", - default="config.yaml", - ) - parser.add_argument( - "--overwrite", - help="Overwrite the results instead of skipping", - default=False, - action="store_true", - ) - parser.add_argument( - "--verbose", - "-v", - help="Log more output", - default=0, - action="count", - ) - - subparsers = parser.add_subparsers( - required=True, help="System to test", dest="system" - ) - - mpi = subparsers.add_parser("mpi") - mpi.add_argument( - "directory", - help="Local path to the folder with the MPI project", - ) - mpi.add_argument( - "bin_name", - help="Name of the binary file produced by cmake", - ) - mpi.add_argument( - "extra_args", - help="Additional arguments to pass to the executable", - nargs="*", - ) - - rstream = subparsers.add_parser("rstream") - rstream.add_argument( - "directory", - help="Local path to the folder with the RStream repo", - ) - rstream.add_argument( - "example", - help="Name of the example to compile and run", - ) - rstream.add_argument( - "num_steps", - help="Number of steps of the pipeline: the number of processes to spawn is adjusted based on this.", - type=int, - ) - rstream.add_argument( - "extra_args", - help="Additional arguments to pass to the executable", - nargs="*", - ) - - rstream2 = subparsers.add_parser("rstream2") - rstream2.add_argument( - "directory", - help="Local path to the folder with the RStream2 repo", - ) - rstream2.add_argument( - "example", - help="Name of the example to compile and run", - ) - rstream2.add_argument( - "extra_args", - help="Additional arguments to pass to the executable", - nargs="*", - ) - - flink = subparsers.add_parser("flink") - flink.add_argument( - "directory", - help="Local path to the folder with the flink project", - ) - flink.add_argument( - "jar_name", - help="Name of the jar generated by 'mvn package'", - ) - flink.add_argument( - "extra_args", - help="Additional arguments to pass to the executable", - nargs="*", - ) - - timely = subparsers.add_parser("timely") - timely.add_argument( - "directory", - help="Local path to the folder with the Timely benchmark crate", - ) - timely.add_argument( - "bin", - help="Name of the bin to compile and run", - ) - timely.add_argument( - "extra_args", - help="Additional arguments to pass to the executable", - nargs="*", - ) - - args = parser.parse_args() - - verbosity = args.verbose - coloredlogs.install( - level="DEBUG" if args.verbose > 0 else "INFO", - milliseconds=True, - logger=logger, - fmt="%(asctime)s,%(msecs)03d %(levelname)s %(message)s", - ) - - logger.info("Using configuration file: %s", args.config) - with open(args.config) as f: - config = ruamel.yaml.safe_load(f) - - try: - sanity_check(config) - except ValueError as e: - logger.fatal(e.args[0]) - sys.exit(1) - - hyperparameters = config["hyperparameters"] - num_tests = math.prod([len(hyp) for hyp in hyperparameters.values()]) - logger.info("- %d hyperparameters", len(hyperparameters)) - logger.info("- 
%d total combinations", num_tests) - - if args.system == "mpi": - runner = MPI(config) - elif args.system == "rstream": - runner = RStream(config) - elif args.system == "rstream2": - runner = RStream2(config) - elif args.system == "flink": - runner = Flink(config) - elif args.system == "timely": - runner = TimelyDataflow(config) - else: - raise ValueError("%s is not supported yet" % args.system) - - test_hosts(config) - - logger.info("Preparation step") - runner.prepare(args) - - for hyp_index, hyp in enumerate(hyperparameter_generator(hyperparameters)): - logger.info("-" * 80) - logger.info("Current hyperparameters (%d / %d):" % (hyp_index + 1, num_tests)) - for name, value in hyp.items(): - logger.info(" - %s: %s", name, value) - logger.info("Running on:") - for host in config["known_hosts"][: hyp["num_hosts"]]: - logger.info(" - %s", host) - logger.info("Setting up...") - runner.setup_hyp(hyp) - - if config["warmup"]: - logger.info("Running a warmup run for caching the input") - try: - result = runner.run(args, hyp, None) - except: - logger.exception("Execution failed!", exc_info=True) - - for run_index in range(config["num_runs"]): - metadata = { - "system": args.system, - "run": run_index, - "experiment": args.experiment, - **hyp, - } - results_dir = replace_vars(config["results_dir"], metadata) - file_name = "-".join( - [ - "{}-{}".format(name, value).replace("/", "_") - for name, value in hyp.items() - ] - ) - file_name += "-run-" + str(run_index) + ".json" - results_file = results_dir + "/" + file_name - logger.info( - "Run %d / %d: %s", run_index + 1, config["num_runs"], results_file - ) - basedir = os.path.dirname(results_file) - os.makedirs(basedir, exist_ok=True) - if os.path.exists(results_file): - if args.overwrite: - logger.warning("Overwriting previous results!") - else: - logger.warning("Results already present, skipping!") - continue - try: - result = runner.run(args, hyp, run_index) - result = { - "hosts": config["known_hosts"][: hyp["num_hosts"]], - "result": result, - **metadata, - } - except: - logger.exception("Execution failed!", exc_info=True) - else: - with open(results_file, "w") as f: - json.dump(result, f, indent=4) - f.write("\n") diff --git a/tools/benchmarks/example.config.yaml b/tools/benchmarks/example.config.yaml deleted file mode 100644 index 72c2eccf..00000000 --- a/tools/benchmarks/example.config.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# The list of known workers -known_hosts: - - StreamWorker1 - - StreamWorker2 - - StreamWorker3 - - StreamWorker4 - -# Name of the directory where to place the execution results -results_dir: "./results/{experiment}/{system}" -# Directory where to put temporary files locally -temp_dir: ./temp -# Directory to use for compiling the binaries on the first worker -compilation_dir: /tmp/compilation -# Directory where Flink is installed (with bin/ and conf/ inside) -# It should be the same on the master node as well as on the workers -flink_base_path: /home/ubuntu/flink - -# Whether to do an extra run (for each hyperparameter configuration) ignoring -# the results -warmup: true -# Number of runs from which data is collected -num_runs: 5 - -# List of hyperparameters to try. All the combinations of values will be tried. -hyperparameters: - # Number of hosts to run the benchmark on - num_hosts: [4, 3, 2, 1] - # When using RStream this is the number of "slots" in the hostfile (sources - # and sinks are added automatically). - # In Flink and MPI it's the number of slots assigned to each worker.
- procs_per_host: [8] - - # You can add more hyperparameters here! - - # Example: an MPI/OpenMP implementation may spawn "threads_per_proc" threads - # for each process, taking this parameter as command line argument. You can - # use, for example, `-T {threads_per_proc}` as extra arguments to tell the - # program how many threads it is allowed to use. - # threads_per_proc: [4, 8] diff --git a/tools/benchmarks/gen_csv.py b/tools/benchmarks/gen_csv.py deleted file mode 100755 index 4f1943ea..00000000 --- a/tools/benchmarks/gen_csv.py +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import csv -import glob -import json -import logging -import re -import sys - -import coloredlogs - -logger = logging.getLogger("gen_csv") - -regex = re.compile(r"(?:(?P<type>(?:timens|events)):)?(?P<name>[a-zA-Z0-9-_]+):(?P<amount>[\d.]+)") - - -def parse_lines(content): - result = {} - for line in content.splitlines(): - match = regex.search(line) - if not match: - continue - typ = match.group("type") - name = match.group("name") - dur = match.group("amount") - if typ == "events": - continue - result[name] = dur - return result - - -def extract_dict(path): - with open(path) as f: - content = json.load(f) - result = { - name: value - for name, value in content.items() - if name not in ["hosts", "result"] - } - result.update(parse_lines(content["result"]["stdout"])) - result.update(parse_lines(content["result"]["stderr"])) - return result - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--verbose", help="Verbose output", action="store_true") - parser.add_argument("results_dir", help="Directory where the results are stored", nargs="+") - parser.add_argument( - "-o", - help="CSV file where to write the results. - for stdout", - dest="output_file", - default="-", - nargs="?", - ) - - args = parser.parse_args() - - coloredlogs.install( - "INFO" if args.verbose else "WARNING", - milliseconds=True, - logger=logger, - fmt="%(asctime)s,%(msecs)03d %(levelname)s %(message)s", - ) - - files = [] - for path in args.results_dir: - files += list(glob.glob(path + "/**/*.json", recursive=True)) - logger.info("%d JSON found", len(files)) - - if not files: - logger.fatal("No JSON found!") - sys.exit(1) - - result = [] - columns = [] - - for path in sorted(files): - logger.info("Processing %s", path) - file_data = extract_dict(path) - for key in file_data.keys(): - if key not in columns: - columns.append(key) - result += [file_data] - - if args.output_file == "-": - output_file = sys.stdout - else: - output_file = open(args.output_file, "w") - writer = csv.DictWriter(output_file, columns) - writer.writeheader() - writer.writerows(result) - output_file.close() diff --git a/tools/benchmarks/latency/.gitignore b/tools/benchmarks/latency/.gitignore deleted file mode 100644 index 03314f77..00000000 --- a/tools/benchmarks/latency/.gitignore +++ /dev/null @@ -1 +0,0 @@ -Cargo.lock diff --git a/tools/benchmarks/latency/Cargo.toml b/tools/benchmarks/latency/Cargo.toml deleted file mode 100644 index fe7822a0..00000000 --- a/tools/benchmarks/latency/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "latency" -version = "0.1.0" -edition = "2018" - -[lib] -proc-macro = true - -[dependencies] -noir = { path = "../../.."
} - -[[bin]] -path = "src/main.rs" -name = "latency" \ No newline at end of file diff --git a/tools/benchmarks/latency/src/lib.rs b/tools/benchmarks/latency/src/lib.rs deleted file mode 100644 index 303e4f60..00000000 --- a/tools/benchmarks/latency/src/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -extern crate proc_macro; -use proc_macro::TokenStream; - -#[proc_macro] -pub fn repeat(args: TokenStream) -> TokenStream { - let args = args.to_string(); - let (num, what) = args.split_once(",").unwrap(); - let num: usize = num.parse().unwrap(); - - let (base, what) = what.split_once(",").unwrap(); - let mut res = base.to_string(); - for _ in 0..num { - res += "."; - res += what; - } - res.parse().unwrap() -} diff --git a/tools/benchmarks/latency/src/main.rs b/tools/benchmarks/latency/src/main.rs deleted file mode 100644 index a6dde26a..00000000 --- a/tools/benchmarks/latency/src/main.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::time::{Duration, SystemTime}; - -use noir::operator::source::ParallelIteratorSource; - -use noir::{BatchMode, EnvironmentConfig, StreamEnvironment}; - -use latency::repeat; - -fn main() { - let (config, args) = EnvironmentConfig::from_args(); - if args.len() != 5 { - panic!( - "\n\nUsage: num_items items_per_sec batch_size batch_timeout num_threads\n\ - - num_items: number of items to put in the stream\n\ - - items_per_sec: number of items to generate per second (0 = as fast as possible)\n\ - - batch_size: the size of the BatchMode::Fixed or BatchMode::Adaptive\n\ - - batch_timeout: 0 => BatchMode::Fixed, n > 0 => BatchMode::Adaptive (in ms)\n\ - - num_threads: number of generating threads\n\n" - ); - } - - let num_items: usize = args[0].parse().expect("invalid num_items"); - let items_per_sec: u64 = args[1].parse().expect("invalid items_per_sec"); - let batch_size: usize = args[2].parse().expect("invalid batch_size"); - let batch_timeout: u64 = args[3].parse().expect("invalid batch_timeout"); - let num_threads: usize = args[4].parse().expect("invalid num_threads"); - - assert!(num_items >= 1, "num_items must be at least 1"); - assert!(num_threads >= 1, "num_threads must be at least 1"); - - let batch_mode = if batch_timeout == 0 { - BatchMode::fixed(batch_size) - } else { - BatchMode::adaptive(batch_size, Duration::from_millis(batch_timeout)) - }; - - let mut env = StreamEnvironment::new(config); - env.spawn_remote_workers(); - - let source = ParallelIteratorSource::new(move |id, _num_replica| { - let iter = if id < num_threads { - let to_generate = num_items / num_threads; - (to_generate * id)..(to_generate * (id + 1)) - } else { - 0..0 - }; - iter.map(move |i| { - if items_per_sec > 0 { - std::thread::sleep(Duration::from_micros( - 1_000_000 * num_threads as u64 / items_per_sec, - )); - } - i - }) - }); - let stream = env.stream(source).batch_mode(BatchMode::fixed(1)); - - // first shuffle: move the items to a predictable host - let stream = stream - .group_by(|_| 0) - .map(|(_, i)| (0, i, SystemTime::now())) - .batch_mode(batch_mode) - .drop_key(); - - // n-1 shuffles to accumulate latency - let stream = repeat!( - 4, - stream, - map(|(i, n, t)| (i + 1, n, t)) - .group_by(|&(i, _, _)| i) - .drop_key() - ); - - // final shuffle back to the first host - let stream = stream - .map(|(i, n, t)| (i + 1, n, t)) - .group_by(|_| 0) - .drop_key(); - - // compute the time durations; times are accurate because it's the same host (and we ignore clock - skews) - stream.for_each(|(i, n, start)| { - let duration = start.elapsed().expect("Clock skewed"); - // num steps,item index,latency -
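// each record is therefore: the number of shuffle steps the item has traversed, - // the item's index, and its end-to-end latency in nanoseconds -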
eprintln!("{},{},{}", i, n, duration.as_nanos()); - }); - - env.execute(); -} diff --git a/tools/benchmarks/make_plot.py b/tools/benchmarks/make_plot.py deleted file mode 100755 index f7a81ad8..00000000 --- a/tools/benchmarks/make_plot.py +++ /dev/null @@ -1,236 +0,0 @@ -#!/usr/bin/env python3 - -import argparse - -import matplotlib.pyplot as plt - -from aggregate_csv import get_systems, parse_stdin - -SYSTEM_NAMES = { - "mpi": "MPI", - "rstream1": "RStream", - "rstream2": "Noir", - "flink": "Flink", - "timely": "Timely", -} - -SYSTEM_COLORS = { - "mpi": "blue", - "rstream1": "red", - "rstream2": "black", - "flink": "orange", - "timely": "green", -} - - -def get_ids(systems): - ids = set() - for system in systems: - ids |= system.row_ids() - ids = list(sorted(ids)) - cores = [a[0] * a[1] for a in ids] - assert len(cores) == len(set(cores)), "Number of cores is not unique" - return ids, cores - - -def make_time_plot(args, systems): - ids, cores = get_ids(systems) - lines = dict(l.rsplit(":", 1) for l in args.lines) - for system in systems: - exp = system.get_experiments() - for experiment in exp: - x = [] - times = [] - errors = [] - for num_cores, id in zip(cores, ids): - data = system.experiments[experiment].get_data(id) - if data is not None: - x.append(num_cores) - times.append(data[0]) - errors.append(data[1]) - - color = SYSTEM_COLORS[system.system] - label = SYSTEM_NAMES[system.system] - nlines = lines.get(f"{system.system}:{experiment}", None) - linestyle = None - if len(exp) > 1: - if experiment == args.base_experiment: - label = SYSTEM_NAMES[system.system] - elif experiment == args.extra_experiment: - label = f"{label} ({args.extra_experiment_name})" - linestyle = "--" - else: - raise RuntimeError( - f"Experiment {experiment} not passed to --extra-experiment" - ) - if nlines is not None: - label += f" [{nlines}]" - p = plt.errorbar( - x, - times, - yerr=errors, - capsize=3, - label=label, - color=color, - linestyle=linestyle, - ) - p.id = f"{system.system}:{experiment}" - - plt.gca().set_ylim(bottom=0) - plt.xlabel("Number of cores") - plt.xticks(cores) - plt.grid() - plt.ylabel("Execution time (s)") - - -def make_scaling_plot(args, systems): - ids, cores = get_ids(systems) - for system in systems: - exp = system.get_experiments() - for experiment in exp: - baseline_t = None - baseline_c = None - for num_cores, id in zip(cores, ids): - data = system.experiments[experiment].get_data(id) - if data is not None: - baseline_t = data[0] - baseline_c = num_cores - break - - x = [] - scale = [] - error = [] - for num_cores, id in zip(cores, ids): - data = system.experiments[experiment].get_data(id) - if data is not None: - s = (1 / data[0]) / ((1 / baseline_t) / baseline_c) / cores[0] - e = s - ( - (1 / (data[0] - data[1])) - / ((1 / baseline_t) / baseline_c) - / cores[0] - ) - x.append(num_cores) - scale.append(s) - error.append(e) - - color = SYSTEM_COLORS[system.system] - label = SYSTEM_NAMES[system.system] - linestyle = None - if len(exp) > 1: - if experiment == args.base_experiment: - label = SYSTEM_NAMES[system.system] - elif experiment == args.extra_experiment: - label = f"{label} ({args.extra_experiment_name})" - linestyle = "--" - else: - raise RuntimeError( - f"Experiment {experiment} not passed to --extra-experiment" - ) - plt.errorbar( - x, - scale, - yerr=error, - capsize=3, - label=label, - color=color, - linestyle=linestyle, - ) - if args.ideal: - ideal = [c / cores[0] for c in cores] - plt.plot( - cores, - ideal, - color="black", - linestyle="--", - linewidth=0.5, - 
label="ideal", - ) - - plt.xlabel("Number of cores") - plt.xticks(cores) - plt.grid() - plt.ylabel(f"Speedup") - - -def main(args): - systems = get_systems() - parse_stdin(systems) - - experiments = set() - for system in systems: - experiments |= system.get_experiments() - experiments = list(sorted(experiments)) - - if args.title: - title = args.title - else: - title = " ".join(experiments) - - plt.rc("font", size=14) - plt.rc("axes", titlesize=16) - plt.rc("axes", labelsize=16) - - if args.variant == "time": - make_time_plot(args, systems) - elif args.variant == "scaling": - make_scaling_plot(args, systems) - else: - raise ValueError(f"Unknown variant: {args.variant}") - - plt.title(title) - if not args.order: - plt.legend() - else: - handles, labels = plt.gca().get_legend_handles_labels() - reorder = [args.order.index(h.id) for h in handles] - reorder_rev = [0 for _ in reorder] - for i, r in enumerate(reorder): - reorder_rev[r] = i - plt.legend([handles[i] for i in reorder_rev], [labels[i] for i in reorder_rev]) - - # for h in handles: - # print(h.id) - # print(handles) - # print(labels) - if args.output: - plt.tight_layout() - plt.savefig(args.output) - else: - plt.show() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Aggregate the output of gen_csv.py and produce a plot" - ) - parser.add_argument("variant", choices=["time", "scaling"]) - parser.add_argument("--output", "-o", help="Where to save the plot") - parser.add_argument("--title", "-t", help="Title of the plot") - parser.add_argument( - "--size", - "-s", - help="Size of the dataset (for throughput)", - type=int, - ) - parser.add_argument("--unit", "-u", help="Unit of the size") - parser.add_argument("--ideal", help="Display the ideal line", action="store_true") - parser.add_argument("--base-experiment", help="Name of the base experiment") - parser.add_argument("--extra-experiment", help="Name of the extra experiment") - parser.add_argument( - "--extra-experiment-name", - help="Name to give to the extra experiment", - ) - parser.add_argument( - "--lines", - help="<system>:<experiment>:<lines> lines of code for experiment in system", - nargs="+", - default=[], - ) - parser.add_argument( - "--order", - help="<system>:<experiment> in the order of the legend", - nargs="+", - default=[], - ) - args = parser.parse_args() - main(args) diff --git a/tools/benchmarks/plots/Makefile b/tools/benchmarks/plots/Makefile deleted file mode 100644 index d2069b47..00000000 --- a/tools/benchmarks/plots/Makefile +++ /dev/null @@ -1,92 +0,0 @@ -TARGET = target -RESULTS = ../results - -EXPERIMENTS = \ - car-accidents \ - connected-components \ - enum-triangles \ - kmeans \ - kmeans-noflink \ - pagerank \ - transitive-closure \ - wordcount-gutenberg wordcount-randomwords \ - wordcount-assoc-gutenberg \ - wordcount-windowed-gutenberg wordcount-windowed-randomwords - -DEP_car-accidents := $(RESULTS)/car-accidents* -TITLE_car-accidents := "Car Accidents" -BASE_car-accidents := car-accidents -EXTRA_car-accidents := car-accidents-shared -EXTRANAME_car-accidents := shared - -DEP_connected-components := $(RESULTS)/connected-components -TITLE_connected-components := "Connected Components" -BASE_connected-components := connected-components - -DEP_enum-triangles := $(RESULTS)/enum-triangles/{flink,rstream2} $(RESULTS)/enum-triangles-new -TITLE_enum-triangles := "Enum Triangles" -BASE_enum-triangles := enum-triangles - -DEP_kmeans := $(RESULTS)/kmeans-30c-15it-200M -TITLE_kmeans := "K-Means (k = 30, h = 15, n = 10M)" -BASE_kmeans := kmeans - -DEP_kmeans-noflink :=
$(RESULTS)/kmeans-30c-15it-200M/{mpi,rstream*} -TITLE_kmeans-noflink := "K-Means (k = 30, h = 15, n = 10M)" -BASE_kmeans-noflink := kmeans - -DEP_pagerank := $(RESULTS)/pagerank -TITLE_pagerank := "PageRank" -BASE_pagerank := pagerank - -DEP_transitive-closure := $(RESULTS)/transitive-closure -TITLE_transitive-closure := "Transitive Closure" -BASE_transitive-closure := transitive-closure - -DEP_wordcount-gutenberg := $(RESULTS)/wordcount/{rstream2,timely} $(RESULTS)/wordcount-fixed-new -TITLE_wordcount-gutenberg := "Non-associative Wordcount (Gutenberg)" -BASE_wordcount-gutenberg := wordcount - -DEP_wordcount-randomwords := $(RESULTS)/wordcount-randomwords -TITLE_wordcount-randomwords := "Non-associative Wordcount (Randomwords)" -BASE_wordcount-randomwords := wordcount - -DEP_wordcount-assoc-gutenberg := $(RESULTS)/wordcount-assoc -TITLE_wordcount-assoc-gutenberg := "Associative Wordcount (Gutenberg)" -BASE_wordcount-assoc-gutenberg := wordcount-assoc - -DEP_wordcount-windowed-gutenberg := $(RESULTS)/wordcount-windowed/rstream2 $(RESULTS)/wordcount-windowed/mpi $(RESULTS)/wordcount-windowed/flink $(RESULTS)/wordcount-windowed-new -TITLE_wordcount-windowed-gutenberg := "Windowed Wordcount (Gutenberg)" -BASE_wordcount-windowed-gutenberg := wordcount - -DEP_wordcount-windowed-randomwords := $(RESULTS)/wordcount-windowed-randomwords/rstream2 $(RESULTS)/wordcount-windowed-randomwords/mpi $(RESULTS)/wordcount-windowed-randomwords/flink $(RESULTS)/wordcount-windowed-randomwords-fixed -TITLE_wordcount-windowed-randomwords := "Windowed Wordcount (Randomwords)" -BASE_wordcount-windowed-randomwords := wordcount - -PDF = $(EXPERIMENTS:%=$(TARGET)/%/time.pdf) $(EXPERIMENTS:%=$(TARGET)/%/scaling.pdf) - -all: $(PDF) - -$(TARGET)/%/: - mkdir -p $@ - -$(TARGET)/%/time.pdf: $(TARGET)/%/ - ../gen_csv.py $(DEP_$*) | ../make_plot.py time -o $@ \ - -t $(TITLE_$*) \ - --base-experiment "$(BASE_$*)" \ - --extra-experiment "$(EXTRA_$*)" \ - --extra-experiment-name "$(EXTRANAME_$*)" - -$(TARGET)/%/scaling.pdf: $(TARGET)/%/ - ../gen_csv.py $(DEP_$*) | ../make_plot.py scaling -o $@ \ - --ideal \ - -t $(TITLE_$*) \ - --base-experiment "$(BASE_$*)" \ - --extra-experiment "$(EXTRA_$*)" \ - --extra-experiment-name "$(EXTRANAME_$*)" - -clean: - rm -rf $(TARGET) - -.PHONY: all clean -.PRECIOUS: $(TARGET)/%/ diff --git a/tools/datasets/gen_words.py b/tools/datasets/gen_words.py deleted file mode 100755 index 8dce5a63..00000000 --- a/tools/datasets/gen_words.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import collections -import random -import string -import sys - -ALPHABET = string.ascii_lowercase - - -def main(filesize, num_words, word_length, words_per_line): - words = "".join(random.choices(ALPHABET, k=num_words + word_length)) - word_distribution = collections.Counter() - - def generate_word(): - start = random.randint(0, num_words - 1) - length = word_length - word = words[start:start + length] - word_distribution.update([word]) - return word - - def gen_line(): - words = [generate_word() for _ in range(words_per_line)] - return " ".join(words) - - size = 0 - lines = 0 - while size < filesize * 1024 * 1024: - line = gen_line() - size += len(line) + 1 - lines += 1 - print(line) - - freq_histogram = collections.Counter(word_distribution.values()) - - print(f"{lines} lines", file=sys.stderr) - print(f"{len(word_distribution)} unique words", file=sys.stderr) - print("Most frequent words", word_distribution.most_common(5), file=sys.stderr) - - max_freq = freq_histogram.most_common(1)[0][1] - 
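# ASCII histogram on stderr: one row per word frequency, with a bar of '*' scaled - # to the count of the most common frequency (at most 30 characters) -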
print("word frequency vs number of words with that frequency", file=sys.stderr) - for freq, amount in sorted(freq_histogram.items()): - length = int(amount / max_freq * 30) - print(f"{freq:2} ({amount:8} times) ", "*" * length, file=sys.stderr) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument("filesize", help="dimension of the output in MiB", type=int) - parser.add_argument("--num-words", help="average number of unique words to generate", type=int, default=100_000) - parser.add_argument("--word-length", help="number of characters per word", type=int, default=8) - parser.add_argument("--words-per-line", help="average number of words per line", type=int, default=8) - - args = parser.parse_args() - main(args.filesize, args.num_words, args.word_length, args.words_per_line) diff --git a/tools/index.html b/tools/index.html deleted file mode 100644 index d34d3783..00000000 --- a/tools/index.html +++ /dev/null @@ -1,19 +0,0 @@
[19 lines of HTML whose markup was stripped in extraction: a stub page titled "Noir"]
diff --git a/tools/run_tests.sh b/tools/run_tests.sh deleted file mode 100755 index f8ea3f4a..00000000 --- a/tools/run_tests.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env bash - -# This is a wrapper that sets some environment variables triggering more in-depth tests - -export RSTREAM_TEST_LOCAL_CORES=1,2,4,8 -export RSTREAM_TEST_REMOTE_HOSTS=1,2,4 -export RSTREAM_TEST_REMOTE_CORES=1,2,4,8 -export RSTREAM_TEST_TIMEOUT=10 - -cargo test "$@" \ No newline at end of file diff --git a/tools/visualizer/css/style.css b/tools/visualizer/css/style.css deleted file mode 100644 index cbb11d51..00000000 --- a/tools/visualizer/css/style.css +++ /dev/null @@ -1,55 +0,0 @@ -body, html { - min-width: 100%; - min-height: 100%; - height: 100%; - width: 100%; - margin: 0; - padding: 0; - display: block; -} -#main { - height: 100%; - width: 100%; - display: grid; - grid-template-rows: 40px auto 200px; - grid-template-columns: auto 400px; - grid-template-areas: - "toolbar toolbar" - "network details" - "graph graph"; -} - -#toolbar { - grid-area: toolbar; - padding: 5px; - border-bottom: 1px solid; -} -#network { - grid-area: network; - font-size: 0; - overflow: hidden; -} -#details { - grid-area: details; - padding: 0 20px; - border-left: 1px solid; - overflow: auto; -} -#graph { - grid-area: graph; - border-top: 1px solid; -} - -.svg-container { - width: 100%; - height: 100%; -} -.svg-content { - width: 100%; - height: 100%; -} -.grid line { - stroke: lightgrey; - stroke-opacity: 0.7; - shape-rendering: crispEdges; -} \ No newline at end of file diff --git a/tools/visualizer/index.html b/tools/visualizer/index.html deleted file mode 100644 index 21c9f639..00000000 --- a/tools/visualizer/index.html +++ /dev/null @@ -1,50 +0,0 @@
[50 lines of HTML whose markup was stripped in extraction: the "Noir visualizer" page, with a toolbar ("Select a file:" picker), the network view, a "Details" side panel, and the graph area laid out by the grid in style.css above]
diff --git a/tools/visualizer/js/data.js b/tools/visualizer/js/data.js
deleted file mode 100644
index 2ac7a396..00000000
--- a/tools/visualizer/js/data.js
+++ /dev/null
@@ -1,602 +0,0 @@
-const detailsContent = $("#details-content");
-
-const drawJobGraph = (structures, profiler) => {
-    resetGraph();
-    resetNetwork();
-    const [nodes, links] = buildJobGraph(structures, profiler);
-    drawNetwork(nodes, links);
-};
-
-const drawExecutionGraph = (structures, profiler) => {
-    resetGraph();
-    resetNetwork();
-    const [nodes, links] = buildExecutionGraph(structures, profiler);
-    drawNetwork(nodes, links);
-};
-
-const formatBytes = (bytes) => {
-    const fmt = d3.format('.1f');
-    if (bytes < 1024) return `${fmt(bytes)}B`;
-    if (bytes < 1024*1024) return `${fmt(bytes / 1024)}KiB`;
-    if (bytes < 1024*1024*1024) return `${fmt(bytes / 1024 / 1024)}MiB`;
-    return `${fmt(bytes / 1024 / 1024 / 1024)}GiB`;
-};
-
-const formatNumber = (num) => {
-    return num.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ",");
-}
-
-const makeLink = (text, onclick) => {
-    return $("<a>")
-        .attr("href", "#")
-        .on("click", (e) => {
-            e.preventDefault();
-            onclick();
-        })
-        .text(text);
-}
-
-const buildJobGraph = (structures, profiler) => {
-
-    const drawOperatorDetails = (block_id, operator, replicas, linkMetrics) => {
-        detailsContent.html("");
-        detailsContent.append(
-            $("<div>")
-                .append($("<b>").text("Operator: "))
-                .append($("<code>").text(operator.title))
-        );
-        if (operator.subtitle) {
-            detailsContent.append(
-                $("<div>")
-                    .append($("<i>").text(operator.subtitle))
-            );
-        }
-
-        const hostCounts = {};
-        for (const {host_id} of replicas) {
-            if (!(host_id in hostCounts)) hostCounts[host_id] = 0;
-            hostCounts[host_id] += 1;
-        }
-        detailsContent.append($("<div>").append($("<b>").text("Replicated at:")));
-        const replicasList = $("<ul>");
-        for (const [host_id, count] of Object.entries(hostCounts)) {
-            replicasList.append(
-                $("<li>")
-                    .append($("<b>").text(`Host${host_id}`))
-                    .append(` × ${count}`));
-        }
-        detailsContent.append(replicasList);
-
-        detailsContent.append(
-            $("<div>")
-                .append($("<b>").text("Produces: "))
-                .append($("<code>").text(operator.out_type))
-        );
-        if (operator.connections.length > 0) {
-            const list = $("<ul>");
-            for (const connection of operator.connections) {
-                const to_block_id = connection.to_block_id;
-                const li = $("<li>")
-                    .append("Block " + to_block_id + " sending ")
-                    .append($("<code>").text(connection.data_type))
-                    .append(" with strategy ")
-                    .append($("<code>").text(connection.strategy));
-                const key = ChannelMetric.blockPairKey(block_id, connection.to_block_id);
-                if (key in linkMetrics.items_out) {
-                    const drawMessages = () => {
-                        drawProfilerGraph(linkMetrics.items_out[key].series, `Items/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatNumber(v));
-                    };
-                    const drawNetworkMessages = () => {
-                        drawProfilerGraph(linkMetrics.net_messages_out[key].series, `Network messages/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries);
-                    };
-                    const drawNetworkBytes = () => {
-                        drawProfilerGraph(linkMetrics.net_bytes_out[key].series, `Network bytes/s in ${block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s");
-                    };
-                    const total = linkMetrics.items_out[key].series.total;
-                    li.append(": ")
-                        .append(makeLink(`${formatNumber(total)} items sent`, () => drawMessages()));
-
-                    drawMessages();
-                    if (key in linkMetrics.net_messages_out) {
-                        const numMex = linkMetrics.net_messages_out[key].series.total;
-                        const bytes = linkMetrics.net_bytes_out[key].series.total;
-                        li
-                            .append(" (in ")
-                            .append(makeLink(`${numMex} messages`, () => drawNetworkMessages()))
-                            .append(", for a ")
-                            .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes()))
-                            .append(")");
-
-                    }
-                }
-                list.append(li);
-            }
-            detailsContent.append($("<div>")
-                .append($("<b>").text("Connects to: "))
-                .append(list));
-        }
-        if (operator.receivers.length > 0) {
-            const list = $("<ul>");
-            for (const receiver of operator.receivers) {
-                const from_block_id = receiver.previous_block_id;
-                const li = $("<li>")
-                    .append("Block " + from_block_id + " receiving ")
-                    .append($("<code>").text(receiver.data_type));
-                const key = ChannelMetric.blockPairKey(receiver.previous_block_id, block_id);
-                if (key in linkMetrics.items_in) {
-                    const drawMessages = () => {
-                        drawProfilerGraph(linkMetrics.items_in[key].series, `Items/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries, (v) => formatNumber(v));
-                    };
-                    const drawNetworkMessages = () => {
-                        drawProfilerGraph(linkMetrics.net_messages_in[key].series, `Network messages/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries);
-                    };
-                    const drawNetworkBytes = () => {
-                        drawProfilerGraph(linkMetrics.net_bytes_in[key].series, `Network bytes/s in ${from_block_id} → ${block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s");
-                    };
-                    const total = linkMetrics.items_in[key].series.total;
-                    li.append(": ")
-                        .append(makeLink(`${formatNumber(total)} items received`, () => drawMessages()));
-
-                    drawMessages();
-                    if (key in linkMetrics.net_messages_in) {
-                        const numMex = linkMetrics.net_messages_in[key].series.total;
-                        const bytes = linkMetrics.net_bytes_in[key].series.total;
-                        li
-                            .append(" (in ")
-                            .append(makeLink(`${numMex} messages`, () => drawNetworkMessages()))
-                            .append(", for a ")
-                            .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes()))
-                            .append(")");
-
-                    }
-                }
-                list.append(li);
-            }
-            detailsContent.append($("<div>")
-                .append($("<b>").text("Receives data from: "))
-                .append(list));
-        }
-    };
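(Reference for the lookups above: the metric maps are keyed by `ChannelMetric.blockPairKey` and `coordPairKey`, defined in `js/profiler.js` later in this diff. The key shapes, shown on invented coordinates:)

```js
// Key formats produced by ChannelMetric (see js/profiler.js below).
ChannelMetric.blockPairKey(0, 2);
// => "0:2"
ChannelMetric.coordPairKey(
    {host_id: 0, block_id: 0, replica_id: 1},
    {host_id: 1, block_id: 2, replica_id: 0}
);
// => "0:0:1|1:2:0"
```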
        ") - .append($("").text("Connection: ")) - .append($("").text(`${from_block_id} → ${to_block_id}`)) - ); - detailsContent.append( - $("

        ") - .append($("").text("Data type: ")) - .append($("").text(connection.data_type)) - ); - detailsContent.append( - $("

        ") - .append($("").text("Strategy: ")) - .append($("").text(connection.strategy)) - ); - - const metricsKey = ChannelMetric.blockPairKey(from_block_id, to_block_id); - if (metricsKey in linkMetrics.net_messages_in) { - const message = linkMetrics.net_messages_in[metricsKey].series.total; - const bytes = linkMetrics.net_bytes_in[metricsKey].series.total; - const drawNetworkMessages = () => { - drawProfilerGraph(linkMetrics.net_messages_in[metricsKey].series, `Network messages/s in ${from_block_id} → ${to_block_id}`, profiler.iteration_boundaries); - }; - const drawNetworkBytes = () => { - drawProfilerGraph(linkMetrics.net_bytes_in[metricsKey].series, `Network bytes/s in ${from_block_id} → ${to_block_id}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); - }; - detailsContent.append($("

        ") - .append($("").text("Traffic: ")) - .append(makeLink(`${message} messages`, () => drawNetworkMessages())) - .append(", for a ") - .append(makeLink(`total of ${formatBytes(bytes)}`, () => drawNetworkBytes())) - .append(` (${formatBytes(bytes/message)}/message)`)); - drawNetworkBytes(); - } - } - - const linkMetrics = { - items_in: profiler.channel_metrics.items_in.groupByBlockId(), - items_out: profiler.channel_metrics.items_out.groupByBlockId(), - net_messages_in: profiler.channel_metrics.net_messages_in.groupByBlockId(), - net_messages_out: profiler.channel_metrics.net_messages_out.groupByBlockId(), - net_bytes_in: profiler.channel_metrics.net_bytes_in.groupByBlockId(), - net_bytes_out: profiler.channel_metrics.net_bytes_out.groupByBlockId(), - }; - - const byBlockId = {}; - const blockReplicas = {}; - for (const entry of structures) { - const [coord, structure] = entry; - const block_id = coord["block_id"]; - byBlockId[block_id] = structure; - if (!(block_id in blockReplicas)) blockReplicas[block_id] = []; - blockReplicas[block_id].push(coord); - } - const nodes = []; - const links = []; - const receivers = {}; - - const operatorId = (block_id, index) => { - return 100000 + block_id * 1000 + index; - }; - - const maxChannelBytes = Math.max(...Object.values(linkMetrics.net_bytes_in).map((d) => d.series.total)); - const linkWidth = (from_block_id, to_block_id) => { - const key = ChannelMetric.blockPairKey(from_block_id, to_block_id); - const minWidth = 1; - const maxWidth = 3; - const metric = linkMetrics.net_bytes_in[key]; - if (!metric) return minWidth; - const value = metric.series.total; - return minWidth + (maxWidth - minWidth) * (value / maxChannelBytes); - } - - for (const [block_id, structure] of Object.entries(byBlockId)) { - const block = { - id: block_id, - data: { - text: "Block " + block_id - }, - children: structure.operators.map((operator, index) => { - return { - id: operatorId(block_id, index), - data: { - text: operator["title"], - onclick: () => drawOperatorDetails(block_id, operator, blockReplicas[block_id], linkMetrics) - } - }; - }) - }; - nodes.push(block); - structure.operators.map((operator, index) => { - if (index < structure.operators.length - 1) { - links.push({ - source: operatorId(block_id, index), - target: operatorId(block_id, index+1), - data: { - text: operator.out_type, - } - }) - } - for (const receiver of operator.receivers) { - const prev_block_id = receiver.previous_block_id; - if (!(prev_block_id in receivers)) receivers[prev_block_id] = {}; - receivers[prev_block_id][block_id] = index; - } - }); - } - for (const [block_id, structure] of Object.entries(byBlockId)) { - structure.operators.map((operator, index) => { - for (const connection of operator.connections) { - const receiverIndex = receivers[block_id][connection.to_block_id]; - const source = operatorId(block_id, index); - const target = operatorId(connection.to_block_id, receiverIndex); - links.push({ - source, - target, - data: { - type: "solid", - text: connection.data_type, - onclick: () => drawLinkDetails(block_id, connection, linkMetrics), - width: linkWidth(block_id, connection.to_block_id), - } - }) - } - }); - } - - return [nodes, links]; -}; - -const buildExecutionGraph = (structures, profiler) => { - - const formatCoord = (coord) => { - return `Host${coord.host_id} Block${coord.block_id} Replica${coord.replica_id}`; - } - - const drawOperatorDetails = (coord, index, operator) => { - detailsContent.html(""); - detailsContent.append( - $("

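Two conventions in `buildJobGraph` above are easy to miss: operator node ids are synthesized as `100000 + block_id * 1000 + index`, and edge widths interpolate linearly between 1 and 3 pixels according to the channel's share of the busiest channel's total bytes. A standalone check of the arithmetic (simplified to take the byte totals directly):

```js
// Node ids in the job graph: operator 2 of block 3 gets id 103002.
const operatorId = (block_id, index) => 100000 + block_id * 1000 + index;
console.assert(operatorId(3, 2) === 103002);

// linkWidth reduced to its formula: lerp between 1px and 3px by value/max.
const linkWidth = (value, maxChannelBytes) => 1 + (3 - 1) * (value / maxChannelBytes);
console.assert(linkWidth(512 * 1024, 1024 * 1024) === 2); // half the max traffic: 2px
```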
        ") - .append($("").text("Operator: ")) - .append($("").text(operator.title)) - ); - if (operator.subtitle) { - detailsContent.append( - $("

        ") - .append($("").text(operator.subtitle)) - ); - } - - detailsContent.append( - $("

        ") - .append($("").text("At: ")) - .append($("").text(formatCoord(coord))) - ); - - detailsContent.append( - $("

        ") - .append($("").text("Produces: ")) - .append($("").text(operator.out_type)) - ); - const id = operatorId(coord.host_id, coord.block_id, coord.replica_id, index); - const conn = connections[id]; - if (conn) { - const list = $("

          "); - for (const {to, connection} of conn) { - const li = $("
        • ") - .append($("").text(formatCoord(to))) - .append(" sending ") - .append($("").text(connection.data_type)) - .append(" with strategy ") - .append($("").text(connection.strategy)); - const key = ChannelMetric.coordPairKey(coord, to); - if (key in profiler.channel_metrics.items_out.data) { - const messages = profiler.channel_metrics.items_out.data[key]; - const net_messages = profiler.channel_metrics.net_messages_out.data[key]; - const net_bytes = profiler.channel_metrics.net_bytes_out.data[key]; - const drawMessages = () => { - drawProfilerGraph(messages.series, `Items/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries, (v) => formatNumber(v)); - }; - const drawNetworkMessages = () => { - drawProfilerGraph(net_messages.series, `Network messages/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries); - }; - const drawNetworkBytes = () => { - drawProfilerGraph(net_bytes.series, `Network bytes/s in ${formatCoord(coord)} → ${formatCoord(to)}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); - }; - const total = messages.series.total; - li.append(": ") - .append(makeLink(`${formatNumber(total)} items sent`, () => drawMessages())); - - drawMessages(); - if (net_messages) { - li - .append(" (in ") - .append(makeLink(`${net_messages.series.total} messages`, () => drawNetworkMessages())) - .append(", for a ") - .append(makeLink(`total of ${formatBytes(net_bytes.series.total)}`, () => drawNetworkBytes())) - .append(")"); - - } - } - list.append(li); - } - detailsContent.append($("

          ") - .append($("").text("Connects to: ")) - .append(list)); - } - const recv_conn = recv_connections[id]; - if (recv_conn) { - const list = $("

            "); - for (const {from, connection} of recv_conn) { - const li = $("
          • ") - .append($("").text(formatCoord(from))) - .append(" receiving ") - .append($("").text(connection.data_type)); - const key = ChannelMetric.coordPairKey(from, coord); - if (key in profiler.channel_metrics.items_in.data) { - const messages = profiler.channel_metrics.items_in.data[key]; - const net_messages = profiler.channel_metrics.net_messages_in.data[key]; - const net_bytes = profiler.channel_metrics.net_bytes_in.data[key]; - const drawMessages = () => { - drawProfilerGraph(messages.series, `Items/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries, (v) => formatNumber(v)); - }; - const drawNetworkMessages = () => { - drawProfilerGraph(net_messages.series, `Network messages/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries); - }; - const drawNetworkBytes = () => { - drawProfilerGraph(net_bytes.series, `Network bytes/s in ${formatCoord(from)} → ${formatCoord(coord)}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); - }; - li.append(": ") - .append(makeLink(`${formatNumber(messages.series.total)} items received`, () => drawMessages())); - - drawMessages(); - if (net_messages) { - li - .append(" (in ") - .append(makeLink(`${net_messages.series.total} messages`, () => drawNetworkMessages())) - .append(", for a ") - .append(makeLink(`total of ${formatBytes(net_bytes.series.total)}`, () => drawNetworkBytes())) - .append(")"); - - } - } - list.append(li); - } - detailsContent.append($("

            ") - .append($("").text("Receives data from: ")) - .append(list)); - } - }; - - const drawLinkDetails = (from, to, connection) => { - detailsContent.html(""); - const coordPair = `${formatCoord(from)} → ${formatCoord(to)}`; - detailsContent.append( - $("

            ") - .append($("").text("Connection: ")) - .append($("").text(coordPair)) - ); - detailsContent.append( - $("

            ") - .append($("").text("Data type: ")) - .append($("").text(connection.data_type)) - ); - detailsContent.append( - $("

            ") - .append($("").text("Strategy: ")) - .append($("").text(connection.strategy)) - ); - - const metricsKey = ChannelMetric.coordPairKey(from, to); - if (metricsKey in profiler.channel_metrics.net_messages_in.data) { - const messages = profiler.channel_metrics.net_messages_in.data[metricsKey].series; - const bytes = profiler.channel_metrics.net_bytes_in.data[metricsKey].series; - const drawNetworkMessages = () => { - drawProfilerGraph(messages, `Network messages/s in ${coordPair}`, profiler.iteration_boundaries); - }; - const drawNetworkBytes = () => { - drawProfilerGraph(bytes, `Network bytes/s in ${coordPair}`, profiler.iteration_boundaries, (v) => formatBytes(v)+"/s"); - }; - detailsContent.append($("

            ") - .append($("").text("Traffic: ")) - .append(makeLink(`${messages.total} messages`, () => drawNetworkMessages())) - .append(", for a ") - .append(makeLink(`total of ${formatBytes(bytes.total)}`, () => drawNetworkBytes())) - .append(` (${formatBytes(bytes.total/messages.total)}/message)`)); - drawNetworkBytes(); - } - } - - const byHostId = {}; - for (const entry of structures) { - const [coord, structure] = entry; - const host_id = coord["host_id"]; - const block_id = coord["block_id"]; - const replica_id = coord["replica_id"]; - if (!(host_id in byHostId)) byHostId[host_id] = {}; - if (!(block_id in byHostId[host_id])) byHostId[host_id][block_id] = []; - byHostId[host_id][block_id][replica_id] = structure; - } - - const nodes = []; - const links = []; - const receivers = {}; - const connections = {}; - const recv_connections = {}; - - const hostId = (host_id) => { - return `h${host_id}`; - }; - const blockId = (host_id, block_id, replica_id) => { - return `h${host_id}b${block_id}r${replica_id}`; - }; - const operatorId = (host_id, block_id, replica_id, index) => { - return `h${host_id}b${block_id}r${replica_id}o${index}`; - }; - - const maxChannelBytes = Math.max(...Object.values(profiler.channel_metrics.net_bytes_in.data).map((d) => d.series.total)); - const linkWidth = (from, to) => { - const key = ChannelMetric.coordPairKey(from, to); - const minWidth = 1; - const maxWidth = 3; - const metric = profiler.channel_metrics.net_bytes_in.data[key]; - if (!metric) return minWidth; - const value = metric.series.total; - return minWidth + (maxWidth - minWidth) * (value / maxChannelBytes); - } - - const buildOperatorNode = (host_id, block_id, replica_id, operator_index, structure) => { - for (const receiver of structure.receivers) { - const prev_block_id = receiver.previous_block_id; - if (!(prev_block_id in receivers)) receivers[prev_block_id] = {}; - if (!(block_id in receivers[prev_block_id])) receivers[prev_block_id][block_id] = []; - receivers[prev_block_id][block_id].push([host_id, block_id, replica_id, operator_index]); - } - return { - id: operatorId(host_id, block_id, replica_id, operator_index), - data: { - text: structure["title"], - onclick: () => drawOperatorDetails({host_id, block_id, replica_id}, operator_index, structure) - } - } - }; - - const buildBlockNode = (host_id, block_id, structures) => { - return structures.map((replica, replica_id) => { - const node = { - id: blockId(host_id, block_id, replica_id), - data: { - text: `Block ${block_id} / Replica ${replica_id}`, - }, - children: replica["operators"].map((operator, index) => buildOperatorNode(host_id, block_id, replica_id, index, operator)) - }; - // in-block connections - replica.operators.map((operator, index) => { - if (index < replica.operators.length - 1) { - links.push({ - source: operatorId(host_id, block_id, replica_id, index), - target: operatorId(host_id, block_id, replica_id, index+1), - data: { - text: operator.out_type, - } - }) - } - }); - return node; - }); - }; - - const buildHostNode = (host_id, blocks) => { - const node = { - id: hostId(host_id), - data: { - text: `Host ${host_id}` - }, - children: Object.entries(blocks).flatMap(([block_id, structures]) => { - return buildBlockNode(host_id, block_id, structures); - }), - }; - nodes.push(node); - }; - - for (const [host_id, blocks] of Object.entries(byHostId)) { - buildHostNode(host_id, blocks); - } - - for (const [host_id, blocks] of Object.entries(byHostId)) { - for (const [block_id, replicas] of Object.entries(blocks)) { - 
replicas.map((replica, replica_id) => { - replica.operators.map((operator, index) => { - for (const connection of operator.connections) { - const {to_block_id, data_type, strategy} = connection; - const recv = receivers[block_id][to_block_id]; - const addLink = (to) => { - const fromCoord = {host_id, block_id, replica_id} - const fromId = operatorId(host_id, block_id, replica_id, index); - const toCoord = {host_id: to[0], block_id: to[1], replica_id: to[2]}; - const toId = operatorId(...to); - if (!(fromId in connections)) connections[fromId] = []; - connections[fromId].push({to: toCoord, connection}); - if (!(toId in recv_connections)) recv_connections[toId] = []; - recv_connections[toId].push({from: fromCoord, connection}); - links.push({ - source: fromId, - target: toId, - data: { - text: data_type, - width: linkWidth(fromCoord, toCoord), - onclick: () => drawLinkDetails(fromCoord, toCoord, connection), - } - }); - }; - - if (strategy === "OnlyOne") { - if (recv.length === 1) { - addLink(recv[0]); - } else { - for (const r of recv) { - const [host_id2, block_id2, replica_id2, operator_id2] = r; - if (host_id === host_id2 && replica_id === replica_id2) { - addLink(r); - } - } - } - } else if (strategy === "GroupBy" || strategy === "Random") { - for (const r of recv) { - addLink(r); - } - } else { - throw Error("Invalid strategy: " + strategy); - } - } - }); - }); - } - } - - return [nodes, links]; -}; \ No newline at end of file diff --git a/tools/visualizer/js/graph.js b/tools/visualizer/js/graph.js deleted file mode 100644 index 9046cf3a..00000000 --- a/tools/visualizer/js/graph.js +++ /dev/null @@ -1,117 +0,0 @@ -const BUCKET_RESOLUTION = 20; -const BUCKET_MERGE_FACTOR = 5; - -const graphContainerId = "graph-content"; - -const resetGraph = () => { - d3.select("#" + graphContainerId).select("svg").remove(); -}; - -const drawProfilerGraph = (series, title, iteration_boundaries, yFormat) => { - if (!yFormat) yFormat = (d) => d; - - const iterToIndex = {}; - let maxIterTime = 0; - Object.entries(iteration_boundaries.data).map(([block_id, times], index) => { - iterToIndex[block_id] = index; - maxIterTime = Math.max(maxIterTime, times[times.length - 1]); - }); - - const container = document.getElementById(graphContainerId); - const svgWidth = container.clientWidth; - const svgHeight = container.clientHeight; - const [left, right, top, bottom] = [60, 10, 20, 30]; - - resetGraph(); - const svg = d3.select("#" + graphContainerId) - .append("svg") - .attr("width", svgWidth) - .attr("height", svgHeight); - - const root = svg.append("g"); - - const rawData = series.asList(); - const data = []; - const scaleFactor = 1000 / BUCKET_RESOLUTION / BUCKET_MERGE_FACTOR; - let bucketTime = 0; - let bucketValue = 0; - for (let i = 0; i < rawData.length; i++) { - const [time, value] = rawData[i]; - if (time < bucketTime + BUCKET_RESOLUTION * BUCKET_MERGE_FACTOR) { - bucketValue += value; - } else { - if (bucketValue > 0) - data.push([bucketTime, bucketValue * scaleFactor]); - bucketTime = time; - bucketValue = value; - } - } - data.push([bucketTime, bucketValue * scaleFactor]); - - const x = d3.scaleLinear() - .domain([0, Math.max(d3.max(data, (d) => d[0]), maxIterTime)]) - .range([left, svgWidth-right]); - // x-axis - root - .append("g") - .attr("transform", `translate(0,${svgHeight - bottom})`) - .call(d3.axisBottom(x).ticks(5)); - // vertical grid - root - .append("g") - .attr("transform", `translate(0,${svgHeight - bottom})`) - .attr("class", "grid") - .call(d3 - .axisBottom(x) - .tickSize(-(svgHeight - 
bottom - top)) - .tickFormat("") - .ticks(5)); - for (const [block_id, times] of Object.entries(iteration_boundaries.data)) { - const color = d3.schemeCategory20[iterToIndex[block_id]]; - for (const time of times) { - const xPos = x(time); - root.append("line") - .attr("stroke", color) - .attr("x1", xPos) - .attr("x2", xPos) - .attr("y1", top) - .attr("y2", svgHeight-bottom); - } - } - - const y = d3.scaleLinear() - .domain([0, d3.max(data, (d) => d[1])]) - .range([svgHeight - bottom, top]); - // y-axis - root - .append("g") - .attr("transform", `translate(${left},0)`) - .call(d3.axisLeft(y).ticks(10).tickFormat(yFormat)); - // horizontal grid - root - .append("g") - .attr("transform", `translate(${left},0)`) - .attr("class", "grid") - .call(d3 - .axisLeft(y) - .tickSize(-(svgWidth - left - right)) - .tickFormat("") - .ticks(10)); - - root.append("path") - .datum(data) - .attr("fill", "none") - .attr("stroke", "blue") - .attr("stroke-width", 1.5) - .attr("d", d3.line() - .x((d) => x(d[0])) - .y((d) => y(d[1]))) - - root - .append("text") - .attr("text-anchor", "middle") - .attr("alignment-baseline", "middle") - .attr("x", svgWidth / 2) - .attr("y", top / 2) - .text(title); -} \ No newline at end of file diff --git a/tools/visualizer/js/network.js b/tools/visualizer/js/network.js deleted file mode 100644 index b20c6fb7..00000000 --- a/tools/visualizer/js/network.js +++ /dev/null @@ -1,519 +0,0 @@ -const constant = (x) => () => x; - -function rectCollide() { - var nodes, sizes, masses - var size = constant([0, 0]) - var strength = 1 - var iterations = 1 - - function force() { - var node, size, mass, xi, yi - var i = -1 - while (++i < iterations) { iterate() } - - function iterate() { - var j = -1 - var tree = d3.quadtree(nodes, xCenter, yCenter).visitAfter(prepare) - - while (++j < nodes.length) { - node = nodes[j] - size = sizes[j] - mass = masses[j] - xi = xCenter(node) - yi = yCenter(node) - - tree.visit(apply) - } - } - - function apply(quad, x0, y0, x1, y1) { - var data = quad.data - var xSize = (size[0] + quad.size[0]) / 2 - var ySize = (size[1] + quad.size[1]) / 2 - if (data) { - if (data.index <= node.index) { return } - - var x = xi - xCenter(data) - var y = yi - yCenter(data) - var xd = Math.abs(x) - xSize - var yd = Math.abs(y) - ySize - - if (xd < 0 && yd < 0) { - var l = Math.sqrt(x * x + y * y) - var m = masses[data.index] / (mass + masses[data.index]) - - if (Math.abs(xd) < Math.abs(yd)) { - node.vx -= (x *= xd / l * strength) * m - data.vx += x * (1 - m) - } else { - node.vy -= (y *= yd / l * strength) * m - data.vy += y * (1 - m) - } - } - } - - return x0 > xi + xSize || y0 > yi + ySize || - x1 < xi - xSize || y1 < yi - ySize - } - - function prepare(quad) { - if (quad.data) { - quad.size = sizes[quad.data.index] - } else { - quad.size = [0, 0] - var i = -1 - while (++i < 4) { - if (quad[i] && quad[i].size) { - quad.size[0] = Math.max(quad.size[0], quad[i].size[0]) - quad.size[1] = Math.max(quad.size[1], quad[i].size[1]) - } - } - } - } - } - - function xCenter(d) { return d.x + d.vx + sizes[d.index][0] / 2 } - function yCenter(d) { return d.y + d.vy + sizes[d.index][1] / 2 } - - force.initialize = function (_) { - sizes = (nodes = _).map(size) - masses = sizes.map(function (d) { return d[0] * d[1] }) - } - - force.size = function (_) { - return (arguments.length - ? (size = typeof _ === 'function' ? _ : constant(_), force) - : size) - } - - force.strength = function (_) { - return (arguments.length ? 
(strength = +_, force) : strength) - } - - force.iterations = function (_) { - return (arguments.length ? (iterations = +_, force) : iterations) - } - - return force -} - -let gravityEnabled = true; -let gravityCallbacks = []; - -const changeGravity = (enabled) => { - gravityEnabled = enabled; - for (const callback of gravityCallbacks) callback(); -}; - -const resetNetwork = () => { - gravityCallbacks = []; -}; - -const drawNetwork = (nodes, links) => { - const nodeById = {}; - - const root = { - id: -1, - data: {}, - children: nodes, - parent: null, - }; - - const dfs = (node, depth) => { - nodeById[node.id] = node; - node.depth = depth; - if (node.children) { - node.links = []; - for (const child of node.children) { - child.parent = node; - dfs(child, depth + 1); - } - } - }; - - dfs(root, 0); - - // put each link in the lowest-common-ancestor of the 2 nodes - const groupLinks = (links) => { - for (const link of links) { - let source = nodeById[link.source]; - let target = nodeById[link.target]; - - while (source.depth > target.depth) source = source.parent; - while (target.depth > source.depth) target = target.parent; - - // link cannot point to a children - if (source.id === target.id) { - throw new Error("Invalid link from " + link.source + " to " + link.target); - } - - while (source.parent.id !== target.parent.id) { - source = source.parent; - target = target.parent; - } - - const parent = source.parent; - parent.links.push({ - source: source.id, - target: target.id, - link: link, - }); - } - }; - - groupLinks(links); - - const assignInitialPositions = (node) => { - if (!node.children) return; - const adj = {}; - for (const link of node.links) { - if (!(link.source in adj)) adj[link.source] = []; - adj[link.source].push(link.target); - } - const visited = {}; - const order = []; - const dfs = (node) => { - if (node in visited) return; - visited[node] = true; - if (node in adj) { - for (const next of adj[node]) { - dfs(next); - } - } - order.push(node); - } - const ranks = {}; - for (const child of node.children) { - dfs(child.id); - } - for (const child of node.children) { - visited[child.id] = false; - ranks[child.id] = 0; - } - for (let i = order.length - 1; i >= 0; i--) { - const id = order[i]; - if (id in adj) { - for (const next of adj[id]) { - if (visited[next]) continue; - ranks[next] = Math.max(ranks[next], ranks[id] + 1); - } - } - visited[id] = true; - } - - const rankFreq = {}; - const rankSpacingX = 400; - const rankSpacingY = 200; - const initialPosition = {}; - for (let i = order.length - 1; i >= 0; i--) { - const id = order[i]; - const rank = ranks[id]; - const y = rankSpacingY * rank; - if (!(rank in rankFreq)) rankFreq[rank] = 0; - const x = rankSpacingX * rankFreq[rank]; - rankFreq[rank]++; - initialPosition[id] = [x, y]; - } - - for (const child of node.children) { - const [x, y] = initialPosition[child.id]; - child.x = x; - child.y = y; - - assignInitialPositions(child); - } - }; - - assignInitialPositions(root); - - const contentId = "network-content"; - - const container = document.getElementById(contentId); - const svgWidth = container.clientWidth; - const svgHeight = container.clientHeight; - const nodeWidth = 100; - const nodeHeight = 40; - - d3.select("#" + contentId).select("svg").remove(); - const svg = d3.select("#" + contentId) - .append("svg") - .attr("width", svgWidth) - .attr("height", svgHeight); - - const defs = svg.append("defs"); - defs - .append("marker") - .attr("id", "arrowhead") - .attr("markerWidth", "10") - .attr("markerHeight", "7") - 
.attr("refX", "7.14") - .attr("refY", "3.5") - .attr("orient", "auto") - .append("polygon") - .attr("points", "0 0, 10 3.5, 0 7"); - - const nodeSize = (node) => { - if (!node.children) return [ - node.x, node.y, node.width, node.height, - ]; - - const padding = 10; - let left = 1e9; - let right = -1e9; - let top = 1e9; - let bottom = -1e9; - for (const child of node.children) { - const childLeft = child.x - child.width / 2; - const childRight = childLeft + child.width; - const childTop = child.y - child.height / 2; - const childBottom = childTop + child.height; - - if (childLeft < left) left = childLeft; - if (childRight > right) right = childRight; - if (childTop < top) top = childTop; - if (childBottom > bottom) bottom = childBottom; - } - const width = right - left + 2 * padding; - const height = bottom - top + 2 * padding; - return [left - padding, top - padding, width, height]; - }; - - function pointOnRect(x, y, minX, minY, maxX, maxY) { - const midX = (minX + maxX) / 2; - const midY = (minY + maxY) / 2; - // if (midX - x == 0) -> m == ±Inf -> minYx/maxYx == x (because value / ±Inf = ±0) - const m = (midY - y) / (midX - x); - - if (x <= midX) { // check "left" side - const minXy = m * (minX - x) + y; - if (minY <= minXy && minXy <= maxY) - return [minX, minXy]; - } - - if (x >= midX) { // check "right" side - const maxXy = m * (maxX - x) + y; - if (minY <= maxXy && maxXy <= maxY) - return [maxX, maxXy]; - } - - if (y <= midY) { // check "top" side - const minYx = (minY - y) / m + x; - if (minX <= minYx && minYx <= maxX) - return [minYx, minY]; - } - - if (y >= midY) { // check "bottom" side - const maxYx = (maxY - y) / m + x; - if (minX <= maxYx && maxYx <= maxX) - return [maxYx, maxY]; - } - - // edge case when finding midpoint intersection: m = 0/0 = NaN - if (x === midX && y === midY) return [x, y]; - } - - const drawNode = (node, parent) => { - if (!node.children) { - node.width = nodeWidth; - node.height = nodeHeight; - const group = parent.append("g"); - group - .append("rect") - .style("fill", "#dddddd") - .style("stroke", "blue") - .attr("x", -nodeWidth/2) - .attr("y", -nodeHeight/2) - .attr("width", nodeWidth) - .attr("height", nodeHeight); - if (node.data && node.data.text) { - group - .append("text") - .attr("text-anchor", "middle") - .attr("dominant-baseline", "middle") - .attr("font-size", "22") - .text(node.data.text) - } - if (node.data && node.data.onclick) { - group.on("click", () => node.data.onclick()); - group.style("cursor", "pointer"); - } - return; - } - - const innerRect = parent - .append("rect") - .attr("class", "node" + node.id) - .style("fill", "white") - .style("stroke", "blue"); - - const innerNodes = parent - .selectAll("g") - .data(node.children, (d) => d.id) - .enter() - .append("g") - .attr("id", (d) => "node" + d.id); - - const innerText = parent - .append("text") - .attr("text-anchor", "start") - .attr("dominant-baseline", "middle") - .attr("font-size", "22") - .text(node.data.text); - - innerNodes.each((inner, i, innerNodeElements) => { - const innerNodeElement = d3.select(innerNodeElements[i]); - drawNode(node.children[i], innerNodeElement); - }); - - const links = node.links.map((link) => { - const {type, text, width, onclick} = link.link.data; - const lineElem = parent - .append("line") - .attr("class", link.link.source + "_" + link.link.target) - .attr("stroke", "black") - .attr("stroke-width", width || 1) - .attr("marker-end", "url(#arrowhead)"); - const textElem = parent - .append("text") - .attr("text-anchor", "start") - 
.attr("dominant-baseline", "middle") - .attr("font-size", "22") - .text(text || ""); - if (type === "dashed") { - lineElem.attr("stroke-dasharray", "4") - } - if (onclick) { - lineElem - .style("cursor", "pointer") - .on("click", () => onclick()); - textElem - .style("cursor", "pointer") - .on("click", () => onclick()); - } - return [lineElem, textElem]; - }); - - const tick = () => { - innerNodes - .attr("transform", (d) => "translate(" + d.x + "," + d.y + ")"); - - if (node.parent) { - const [x, y, width, height] = nodeSize(node); - node.width = width; - node.height = height; - innerRect - .attr("x", x) - .attr("y", y) - .attr("width", width) - .attr("height", height); - innerText - .attr("x", x+20) - .attr("y", y-20) - .attr("width", width) - .attr("height", height); - } - - links.forEach(([line, text], i) => { - const link = node.links[i]; - const getCoord = (child) => { - if (!child || child.id === node.id) { - return [0, 0]; - } - const [x, y] = getCoord(child.parent); - return [x+child.x, y+child.y]; - }; - const source = nodeById[link.link.source]; - const target = nodeById[link.link.target]; - const [x1, y1] = getCoord(source); - const [x2, y2] = getCoord(target); - - const rect1 = [x1-source.width/2, y1-source.height/2, x1+source.width/2, y1+source.height/2]; - const rect2 = [x2-target.width/2, y2-target.height/2, x2+target.width/2, y2+target.height/2]; - const [px1, py1] = pointOnRect(x2, y2, ...rect1); - const [px2, py2] = pointOnRect(x1, y1, ...rect2); - - line - .attr("x1", px1) - .attr("y1", py1) - .attr("x2", px2) - .attr("y2", py2); - text - .attr("x", (px1 + px2) / 2) - .attr("y", (py1 + py2) / 2); - }); - }; - - const collisionForce = rectCollide() - .size((d) => { - const [,, w, h] = nodeSize(d); - return [w, h]; - }); - const simulation = d3.forceSimulation() - .force("link", d3.forceLink().id((d) => d.id).strength(0.001)) - .force("charge", d3.forceManyBody().strength(-30)) - .force("collision", collisionForce) - .force("center", d3.forceCenter(0, 0)) - .alphaMin(0.1); - - simulation.nodes(node.children).on("tick", tick); - simulation.force("link").links(node.links); - simulation.alpha(1).restart(); - - gravityCallbacks.push(() => { - if (gravityEnabled) { - for (const n of node.children) { - node.fx = null; - node.fy = null; - simulation.alpha(1).restart(); - } - } else { - for (const n of node.children) { - node.fx = node.x; - node.fy = node.y; - } - } - }); - - const drag = () => { - return d3.drag() - .on("start", (d) => { - if (!d3.event.active) simulation.alpha(1).restart(); - d.fx = d.x; - d.fy = d.y; - }) - .on("drag", (d) => { - d.fx = d3.event.x; - d.fy = d3.event.y; - }) - .on("end", (d) => { - if (!d3.event.active) simulation.alphaTarget(0); - if (gravityEnabled) { - d.fx = null; - d.fy = null; - } - }); - } - - innerNodes.call(drag()); - }; - - const rootElem = svg.append("g"); - const contentElem = rootElem - .append("g") - .attr("transform", "translate(" + svgWidth/2 + "," + svgHeight/2 + ")"); - drawNode(root, contentElem); - - const zoom = d3.zoom() - .scaleExtent([0.1, 10]) - .on("zoom", () => rootElem.attr("transform", d3.event.transform)); - svg.call(zoom); - - const resize = () => { - const width = container.clientWidth; - const height = container.clientHeight; - svg - .attr("width", width) - .attr("height", height); - } - window.addEventListener("resize", resize); -} \ No newline at end of file diff --git a/tools/visualizer/js/profiler.js b/tools/visualizer/js/profiler.js deleted file mode 100644 index 4c6b7379..00000000 --- 
a/tools/visualizer/js/profiler.js +++ /dev/null @@ -1,118 +0,0 @@ -class TimeSeries { - constructor() { - this.data = {}; - this.total = 0; - } - - add(bucket, value) { - if (!(bucket in this.data)) this.data[bucket] = 0; - this.data[bucket] += value; - this.total += value; - } - - asList() { - const res = Object.entries(this.data); - return res - .sort(([a,], [b,]) => a - b) - .map(([t, v]) => [+t, +v]); - } - - static merge(a, b) { - const result = new TimeSeries(); - for (const [bucket, value] of Object.entries(a.data)) { - result.data[bucket] = value; - } - for (const [bucket, value] of Object.entries(b.data)) { - if (!(bucket in result.data)) result.data[bucket] = 0; - result.data[bucket] += value; - } - result.total = a.total + b.total; - return result; - } -} - -class ChannelMetric { - constructor() { - this.data = {}; - } - - add(from, to, bucket, value) { - const key = ChannelMetric.coordPairKey(from, to); - if (!(key in this.data)) { - this.data[key] = { - from, - to, - series: new TimeSeries() - }; - } - this.data[key].series.add(bucket, value); - } - - groupByBlockId() { - const byBlockId = {}; - for (const {from, to, series} of Object.values(this.data)) { - const key = ChannelMetric.blockPairKey(from.block_id, to.block_id); - if (!(key in byBlockId)) byBlockId[key] = {from, to, series: new TimeSeries()}; - byBlockId[key].series = TimeSeries.merge(byBlockId[key].series, series); - } - return byBlockId; - } - - static blockPairKey(from, to) { - return `${from}:${to}`; - } - - static coordPairKey(from, to) { - return `${this.coordKey(from)}|${this.coordKey(to)}`; - } - - static coordKey(coord) { - return `${coord.host_id}:${coord.block_id}:${coord.replica_id}`; - } -} - -class IterationBoundaries { - constructor() { - this.data = {}; - } - - add(block_id, time) { - if (!(block_id in this.data)) this.data[block_id] = []; - this.data[block_id].push(time); - } -} - -class Profiler { - constructor(data) { - this.channel_metrics = { - items_in: new ChannelMetric(), - items_out: new ChannelMetric(), - net_messages_in: new ChannelMetric(), - net_messages_out: new ChannelMetric(), - net_bytes_in: new ChannelMetric(), - net_bytes_out: new ChannelMetric(), - }; - this.iteration_boundaries = new IterationBoundaries(); - for (const profiler of data) { - for (const {metrics, start_ms} of profiler.buckets) { - for (const {from, to, value} of metrics.items_in) { - this.channel_metrics.items_in.add(from, to, start_ms, value); - } - for (const {from, to, value} of metrics.items_out) { - this.channel_metrics.items_out.add(from, to, start_ms, value); - } - for (const {from, to, value} of metrics.net_messages_in) { - this.channel_metrics.net_messages_in.add(from, to, start_ms, value[0]); - this.channel_metrics.net_bytes_in.add(from, to, start_ms, value[1]); - } - for (const {from, to, value} of metrics.net_messages_out) { - this.channel_metrics.net_messages_out.add(from, to, start_ms, value[0]); - this.channel_metrics.net_bytes_out.add(from, to, start_ms, value[1]); - } - for (const [block_id, time] of metrics.iteration_boundaries) { - this.iteration_boundaries.add(block_id, time); - } - } - } - } -} \ No newline at end of file diff --git a/tools/visualizer/js/script.js b/tools/visualizer/js/script.js deleted file mode 100644 index fba2900a..00000000 --- a/tools/visualizer/js/script.js +++ /dev/null @@ -1,67 +0,0 @@ -const inputSelector = document.getElementById("file-picker"); -const jobGraphRadio = document.getElementById("job-graph"); -const executionGraphRadio = 
document.getElementById("execution-graph"); -const gravitySelect = document.getElementById("gravity"); -const redrawButton = document.getElementById("redraw"); - -window.graphMode = "job"; -window.profiler = null; - -const drawGraph = (structures, profiler) => { - if (window.graphMode === "job") { - drawJobGraph(structures, profiler); - } else { - drawExecutionGraph(structures, profiler); - } -} - -inputSelector.addEventListener("change", (event) => { - const files = event.target.files; - if (files.length !== 1) { - return; - } - const file = files[0]; - const reader = new FileReader(); - reader.addEventListener("load", (event) => { - const content = event.target.result; - let json; - try { - json = JSON.parse(content); - } catch (e) { - alert("Malformed JSON data: " + e); - } - window.structures = json["structures"]; - window.profilers = json["profilers"]; - if (!window.structures || !window.profilers) { - alert("Invalid JSON data: structures or profilers missing"); - return; - } - window.profiler = new Profiler(window.profilers); - drawGraph(window.structures, window.profiler); - }); - reader.readAsText(file); -}); - -jobGraphRadio.addEventListener("change", () => { - window.graphMode = "job"; - if (window.profiler) { - drawGraph(window.structures, window.profiler); - } -}); - -executionGraphRadio.addEventListener("change", () => { - window.graphMode = "execution"; - if (window.profiler) { - drawGraph(window.structures, window.profiler); - } -}); - -gravitySelect.addEventListener("change", () => { - changeGravity(!gravityEnabled); -}); - -redrawButton.addEventListener("click", () => { - if (window.profiler) { - drawGraph(window.structures, window.profiler); - } -}); \ No newline at end of file
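A final note on the model classes in `js/profiler.js` above: `TimeSeries` is self-contained, so its bucketing behaviour is easy to pin down with a toy run (values invented):

```js
// TimeSeries accumulates per-bucket values and keeps a running total alongside.
const s = new TimeSeries();
s.add(0, 10);   // bucket starting at t = 0 ms
s.add(0, 5);    // same bucket: summed
s.add(100, 7);
console.log(s.asList()); // [[0, 15], [100, 7]], sorted by bucket start
console.log(s.total);    // 22

// merge() sums two series bucket-by-bucket and adds the totals.
console.log(TimeSeries.merge(s, s).total); // 44
```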