diff --git a/scripts/scheduling_simulation.py b/scripts/scheduling_simulation.py
index c270154..d54a51b 100644
--- a/scripts/scheduling_simulation.py
+++ b/scripts/scheduling_simulation.py
@@ -2,6 +2,7 @@
 import heapq
 import itertools
 import matplotlib.pyplot as plt
+import numpy
 import os
 import random
 import seaborn
@@ -10,27 +11,27 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Set, Optional, Iterator
-import numpy
 
 NUM_WORKERS = int(os.environ.get('NUM_WORKERS', '100'))
-WORKERS_JAILED_PER_EPOCH = int(os.environ.get('WORKERS_JAILED_PER_EPOCH', '5'))
-WORKER_CHURN_PER_EPOCH = int(os.environ.get('WORKER_CHURN_PER_EPOCH', '2'))
-NUM_UNITS = int(os.environ.get('NUM_UNITS', '5000'))
-NEW_UNITS_PER_EPOCH = int(os.environ.get('NEW_UNITS_PER_EPOCH', '10'))
+WORKERS_JAILED_PER_EPOCH = int(os.environ.get('WORKERS_JAILED_PER_EPOCH', '0'))
+WORKER_CHURN_PER_EPOCH = int(os.environ.get('WORKER_CHURN_PER_EPOCH', '0'))
+NUM_UNITS = int(os.environ.get('NUM_UNITS', '12000'))
+NEW_UNITS_PER_EPOCH = int(os.environ.get('NEW_UNITS_PER_EPOCH', '0'))
 UNIT_SIZE_MB = int(os.environ.get('UNIT_SIZE_MB', '2500'))
-WORKER_STORAGE_MB = int(os.environ.get('WORKER_STORAGE_MB', '500000'))
+WORKER_STORAGE_MB = int(os.environ.get('WORKER_STORAGE_MB', '1000000'))
 REPLICATION_FACTOR = int(os.environ.get('REPLICATION_FACTOR', '3'))
 MAX_EPOCHS = int(os.environ.get('MAX_EPOCHS', '0'))
 NUM_REPS = int(os.environ.get('NUM_REPS', '1'))
-SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'xor')
-MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.05'))
-MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '1'))
-NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '10000'))
+SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'random')
+MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.1'))
+MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '10'))
+NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '1000'))
 QUALIFIED_WORKER_THRESHOLD = float(os.environ.get('QUALIFIED_WORKER_THRESHOLD', '0.25'))
 OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', './out'))
 
 assert 0 <= MIXED_UNITS_RATIO <= 1, 'MIXED_UNITS_RATIO should be in range [0,1]'
 assert MIXING_RECENT_UNIT_WEIGHT >= 1, 'MIXING_RECENT_UNIT_WEIGHT should be >= 1'
+assert NUM_WORKERS * WORKER_STORAGE_MB > NUM_UNITS * REPLICATION_FACTOR * UNIT_SIZE_MB, 'Not enough worker storage'
 
 
 def params_summary() -> str:
@@ -177,6 +178,66 @@ def __init__(self):
         super().__init__("Not enough workers to assign unit")
 
 
+@dataclass
+class QualifiedWorker:
+    """ A worker together with the number of epochs it has been active """
+    worker: 'Worker'
+    num_epochs: int
+
+    @property
+    def id(self) -> 'Id':
+        return self.worker.id
+
+    @property
+    def avg_download_gb(self) -> int:
+        """ Average data downloaded per epoch, excluding initial sync, in GB """
+        return (self.worker.total_downloaded_data - self.worker.initial_sync_data) // self.num_epochs // 1024
+
+    @property
+    def avg_requests(self) -> int:
+        """ Average requests served per epoch, in thousands """
+        return self.worker.num_requests // self.num_epochs // 1000
+
+
+class History:
+    """ Per-epoch series of worker load statistics; `update` appends one point to each series """
+
+    def __init__(self):
+        self.download_avg: list[float] = []  # Average data downloaded per worker per epoch in GB
+        self.download_cv: list[float] = []  # Coefficient of variation of downloaded data
+        self.requests_avg: list[float] = []  # Average number of requests served per worker per epoch in thousands
+        self.requests_cv: list[float] = []  # Coefficient of variation of number of served requests
+
+    @classmethod
+    def average(cls, histories: '[History]') -> 'History':
+        """ Element-wise average of several histories, truncated to the shortest one """
+        result = cls()
+        for name in vars(result):
+            series = [getattr(h, name) for h in histories]
+            setattr(result, name, [float(numpy.mean(values)) for values in zip(*series)])
+        return result
+
+    @staticmethod
+    def _mean_and_cv(values: '[int]') -> 'tuple[float, float]':
+        """ Mean and coefficient of variation (std / mean) of `values`; both 0 for empty input """
+        if not values:
+            return 0.0, 0.0
+        mean = float(numpy.mean(values))
+        # A zero mean (e.g. nothing downloaded yet) would turn std / mean into 0 / 0 = NaN
+        cv = float(numpy.std(values)) / mean if mean else 0.0
+        return mean, cv
+
+    def update(self, workers: '[QualifiedWorker]'):
+        """ Append statistics of the given workers as the next point of every series """
+        download_avg, download_cv = self._mean_and_cv([w.avg_download_gb for w in workers])
+        self.download_avg.append(download_avg)
+        self.download_cv.append(download_cv)
+
+        requests_avg, requests_cv = self._mean_and_cv([w.avg_requests for w in workers])
+        self.requests_avg.append(requests_avg)
+        self.requests_cv.append(requests_cv)
+
+
 class Scheduler(ABC):
     def __init__(self):
         self.workers: '[Worker]' = [Worker() for _ in range(NUM_WORKERS)]
@@ -185,6 +246,7 @@ def __init__(self):
         self.epoch = 0
         self.jailed_workers_data = 0
         self.retired_workers_data = 0
+        self.history = History()
 
         # Perform initial assignment
         self.assign_units(initial=True)
@@ -250,35 +312,36 @@ def run_epoch(self):
         for _ in range(NEW_UNITS_PER_EPOCH):
             self.units.append(Unit())
 
+        # update history records
+        workers = self.get_qualified_workers()
+        self.history.update(workers)
+
         self.epoch += 1
 
-    def summary(self) -> 'Summary':
+    def get_qualified_workers(self, threshold: float = 0.0) -> '[QualifiedWorker]':
+        """ Workers (active and retired) active for more than `threshold` of the elapsed epochs """
         def epochs_active(worker: 'Worker') -> int:
             last_epoch = worker.epoch_retired if worker.epoch_retired is not None else self.epoch
             return last_epoch - worker.epoch_joined + 1
-        min_epochs = int(self.epoch * QUALIFIED_WORKER_THRESHOLD)
-        qualified_workers = [
-            (worker, num_epochs)
+        min_epochs = int(self.epoch * threshold)
+
+        return [
+            QualifiedWorker(worker=worker, num_epochs=num_epochs)
             for worker in (self.workers + self.retired_workers)
             if (num_epochs := epochs_active(worker)) > min_epochs
         ]
 
-        avg_worker_download = {
-            worker.id: (worker.total_downloaded_data - worker.initial_sync_data) // num_epochs // 1024
-            for worker, num_epochs in qualified_workers
-        }
-        avg_worker_requests = {
-            worker.id: worker.num_requests // num_epochs // 1000
-            for worker, num_epochs in qualified_workers
-        }
+    def summary(self) -> 'Summary':
+        qualified_workers = self.get_qualified_workers(threshold=QUALIFIED_WORKER_THRESHOLD)
         return Summary(
             last_epoch=self.epoch,
             downloaded_data_gb=(self.total_downloaded_data - self.initial_sync_data) // 1024,
             jailed_workers_data_gb=self.jailed_workers_data // 1024,
             retired_workers_data_gb=self.retired_workers_data // 1024,
             new_chunks_data_gb=self.epoch * NEW_UNITS_PER_EPOCH * UNIT_SIZE_MB * REPLICATION_FACTOR // 1024,
-            avg_worker_download=avg_worker_download,
-            avg_worker_requests=avg_worker_requests,
+            avg_worker_download={w.id: w.avg_download_gb for w in qualified_workers},
+            avg_worker_requests={w.id: w.avg_requests for w in qualified_workers},
+            history=self.history,
         )
 
     def run_simulation(self):
@@ -356,6 +419,7 @@ class Summary:
     new_chunks_data_gb: int  # Total size of new chunks which appeared throughout the simulation
     avg_worker_download: dict['Id', int]  # Average size of data downloaded per epoch for each worker
     avg_worker_requests: dict['Id', int]  # Average number of received requests per epoch for each worker
+    history: 'History'  # Per-epoch load statistics recorded during the simulation
 
     @property
     def downloaded_per_epoch(self) -> int:
@@ -379,15 +443,31 @@ def to_string(self) -> str:
 
     def save_plot(self, plot_path: Path):
         print(f"Saving plot to {plot_path}")
-        fig, ax = plt.subplots(1, 2, figsize=(8, 4))
+        fig, ax = plt.subplots(3, 2, figsize=(8, 12))
+
+        seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0, 0])
+        ax[0, 0].set_xlabel("average data downloaded per epoch [GB]")
+        ax[0, 0].set_ylabel("# workers")
+
+        seaborn.histplot(data=self.avg_worker_requests, kde=True, ax=ax[0, 1])
+        ax[0, 1].set_xlabel("average requests served per epoch [thousands]")
+        ax[0, 1].set_ylabel("# workers")
 
-        seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0])
-        ax[0].set_xlabel("average data downloaded per epoch [GB]")
-        ax[0].set_ylabel("# workers")
+        seaborn.lineplot(data=self.history.download_avg, ax=ax[1, 0])
+        ax[1, 0].set_xlabel("epoch")
+        ax[1, 0].set_ylabel("avg data downloaded p/worker p/epoch [GB]")
 
-        seaborn.histplot(data=self.avg_worker_requests, kde=True, ax=ax[1])
-        ax[1].set_xlabel("average requests served per epoch [thousands]")
-        ax[1].set_ylabel("# workers")
+        seaborn.lineplot(data=self.history.requests_avg, ax=ax[1, 1])
+        ax[1, 1].set_xlabel("epoch")
+        ax[1, 1].set_ylabel("avg requests served p/worker p/epoch [thousands]")
+
+        seaborn.lineplot(data=self.history.download_cv, ax=ax[2, 0])
+        ax[2, 0].set_xlabel("epoch")
+        ax[2, 0].set_ylabel("coefficient of variation of data downloaded")
+
+        seaborn.lineplot(data=self.history.requests_cv, ax=ax[2, 1])
+        ax[2, 1].set_xlabel("epoch")
+        ax[2, 1].set_ylabel("coefficient of variation of requests served")
 
         plt.tight_layout()
         plt.savefig(plot_path, format="png")
@@ -411,6 +491,7 @@ def aggregate_summaries(summaries: ['Summary']) -> 'Summary':
         new_chunks_data_gb=sum(s.new_chunks_data_gb * s.last_epoch for s in summaries) // total_epochs,
         avg_worker_download={w: d for s in summaries for w, d in s.avg_worker_download.items()},
         avg_worker_requests={w: d for s in summaries for w, d in s.avg_worker_requests.items()},
+        history=History.average([s.history for s in summaries]),
     )
 
 