Downloads and requests history, changed default params
Wiezzel committed Oct 17, 2023
1 parent cc2ce4d commit 8f3231c
Showing 1 changed file with 99 additions and 43 deletions.
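Note for readers: every default changed in this commit is read from an environment variable at the top of the script, so the previous behaviour stays reachable without touching the code. Below is a minimal, hypothetical launcher sketch; the interpreter name, working directory, and the choice of subprocess are assumptions, while the variable names and the old default values come straight from the diff.

# Hypothetical launcher: run scheduling_simulation.py with some pre-commit defaults
# restored via environment variables. Adjust the interpreter and path to your setup.
import os
import subprocess

env = dict(
    os.environ,
    SCHEDULER_TYPE='xor',        # old default, changed to 'random' in this commit
    NUM_UNITS='5000',            # old default, changed to '12000' in this commit
    WORKER_STORAGE_MB='500000',  # old default, changed to '1000000' in this commit
)
subprocess.run(['python', 'scripts/scheduling_simulation.py'], env=env, check=True)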
142 changes: 99 additions & 43 deletions scripts/scheduling_simulation.py
@@ -2,6 +2,7 @@
import heapq
import itertools
import matplotlib.pyplot as plt
+import numpy
import os
import random
import seaborn
@@ -10,27 +11,27 @@
from datetime import datetime
from pathlib import Path
from typing import Set, Optional, Iterator
-import numpy

NUM_WORKERS = int(os.environ.get('NUM_WORKERS', '100'))
-WORKERS_JAILED_PER_EPOCH = int(os.environ.get('WORKERS_JAILED_PER_EPOCH', '5'))
-WORKER_CHURN_PER_EPOCH = int(os.environ.get('WORKER_CHURN_PER_EPOCH', '2'))
-NUM_UNITS = int(os.environ.get('NUM_UNITS', '5000'))
-NEW_UNITS_PER_EPOCH = int(os.environ.get('NEW_UNITS_PER_EPOCH', '10'))
+WORKERS_JAILED_PER_EPOCH = int(os.environ.get('WORKERS_JAILED_PER_EPOCH', '0'))
+WORKER_CHURN_PER_EPOCH = int(os.environ.get('WORKER_CHURN_PER_EPOCH', '0'))
+NUM_UNITS = int(os.environ.get('NUM_UNITS', '12000'))
+NEW_UNITS_PER_EPOCH = int(os.environ.get('NEW_UNITS_PER_EPOCH', '0'))
UNIT_SIZE_MB = int(os.environ.get('UNIT_SIZE_MB', '2500'))
-WORKER_STORAGE_MB = int(os.environ.get('WORKER_STORAGE_MB', '500000'))
+WORKER_STORAGE_MB = int(os.environ.get('WORKER_STORAGE_MB', '1000000'))
REPLICATION_FACTOR = int(os.environ.get('REPLICATION_FACTOR', '3'))
MAX_EPOCHS = int(os.environ.get('MAX_EPOCHS', '0'))
NUM_REPS = int(os.environ.get('NUM_REPS', '1'))
-SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'xor')
-MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.05'))
-MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '1'))
-NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '10000'))
+SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'random')
+MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.1'))
+MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '10'))
+NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '1000'))
QUALIFIED_WORKER_THRESHOLD = float(os.environ.get('QUALIFIED_WORKER_THRESHOLD', '0.25'))
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', './out'))

assert 0 <= MIXED_UNITS_RATIO <= 1, 'MIXED_UNITS_RATIO should be in range [0,1]'
assert MIXING_RECENT_UNIT_WEIGHT >= 1, 'MIXING_RECENT_UNIT_WEIGHT should be >= 1'
assert NUM_WORKERS * WORKER_STORAGE_MB > NUM_UNITS * REPLICATION_FACTOR * UNIT_SIZE_MB, 'Not enough worker storage'


def params_summary() -> str:
@@ -177,6 +178,43 @@ def __init__(self):
super().__init__("Not enough workers to assign unit")


+@dataclass
+class QualifiedWorker:
+    worker: 'Worker'
+    num_epochs: int
+
+    @property
+    def id(self) -> 'Id':
+        return self.worker.id
+
+    @property
+    def avg_download_gb(self) -> int:
+        """ Average data downloaded per epoch, excluding initial sync, in GB """
+        return (self.worker.total_downloaded_data - self.worker.initial_sync_data) // self.num_epochs // 1024
+
+    @property
+    def avg_requests(self) -> int:
+        """ Average requests served per epoch, in thousands """
+        return self.worker.num_requests // self.num_epochs // 1000
+
+
+class History:
+    def __init__(self):
+        self.download_avg: [float] = [] # Average data downloaded per worker per epoch in GB
+        self.download_cv: [float] = [] # Coefficient of variation of downloaded data
+        self.requests_avg: [float] = [] # Average number of requests served per worker per epoch in thousands
+        self.requests_cv: [float] = [] # Coefficient of variation of number of served requests
+
+    def update(self, workers: '[QualifiedWorker]'):
+        downloads = [w.avg_download_gb for w in workers]
+        self.download_avg.append(numpy.mean(downloads))
+        self.download_cv.append(numpy.std(downloads) / self.download_avg[-1])
+
+        requests = [w.avg_requests for w in workers]
+        self.requests_avg.append(numpy.mean(requests))
+        self.requests_cv.append(numpy.std(requests) / self.requests_avg[-1])


class Scheduler(ABC):
def __init__(self):
self.workers: '[Worker]' = [Worker() for _ in range(NUM_WORKERS)]
@@ -185,6 +223,7 @@ def __init__(self):
self.epoch = 0
self.jailed_workers_data = 0
self.retired_workers_data = 0
+        self.history = History()

# Perform initial assignment
self.assign_units(initial=True)
@@ -250,35 +289,35 @@ def run_epoch(self):
for _ in range(NEW_UNITS_PER_EPOCH):
self.units.append(Unit())

+        # update history records
+        workers = self.get_qualified_workers()
+        self.history.update(workers)

self.epoch += 1

-    def summary(self) -> 'Summary':
+    def get_qualified_workers(self, threshold=0.0) -> '[QualifiedWorker]':
def epochs_active(worker: 'Worker') -> int:
last_epoch = worker.epoch_retired if worker.epoch_retired is not None else self.epoch
return last_epoch - worker.epoch_joined + 1
-        min_epochs = int(self.epoch * QUALIFIED_WORKER_THRESHOLD)
-        qualified_workers = [
-            (worker, num_epochs)
+        min_epochs = int(self.epoch * threshold)

+        return [
+            QualifiedWorker(worker=worker, num_epochs=num_epochs)
for worker in (self.workers + self.retired_workers)
if (num_epochs := epochs_active(worker)) > min_epochs
]

-        avg_worker_download = {
-            worker.id: (worker.total_downloaded_data - worker.initial_sync_data) // num_epochs // 1024
-            for worker, num_epochs in qualified_workers
-        }
-        avg_worker_requests = {
-            worker.id: worker.num_requests // num_epochs // 1000
-            for worker, num_epochs in qualified_workers
-        }
+    def summary(self) -> 'Summary':
+        qualified_workers = self.get_qualified_workers(threshold=QUALIFIED_WORKER_THRESHOLD)
return Summary(
last_epoch=self.epoch,
downloaded_data_gb=(self.total_downloaded_data - self.initial_sync_data) // 1024,
jailed_workers_data_gb=self.jailed_workers_data // 1024,
retired_workers_data_gb=self.retired_workers_data // 1024,
new_chunks_data_gb=self.epoch * NEW_UNITS_PER_EPOCH * UNIT_SIZE_MB * REPLICATION_FACTOR // 1024,
-            avg_worker_download=avg_worker_download,
-            avg_worker_requests=avg_worker_requests,
+            avg_worker_download={w.id: w.avg_download_gb for w in qualified_workers},
+            avg_worker_requests={w.id: w.avg_requests for w in qualified_workers},
+            history=self.history
)

def run_simulation(self):
@@ -356,6 +395,7 @@ class Summary:
new_chunks_data_gb: int # Total size of new chunks which appeared throughout the simulation
avg_worker_download: dict['Id', int] # Average size of data downloaded per epoch for each worker
avg_worker_requests: dict['Id', int] # Average number of received requests per epoch for each worker
+    history: 'History'

@property
def downloaded_per_epoch(self) -> int:
@@ -379,15 +419,31 @@ def to_string(self) -> str:
def save_plot(self, plot_path: Path):
print(f"Saving plot to {plot_path}")

-        fig, ax = plt.subplots(1, 2, figsize=(8, 4))
+        fig, ax = plt.subplots(3, 2, figsize=(8, 12))

+        seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0, 0])
+        ax[0, 0].set_xlabel("average data downloaded per epoch [GB]")
+        ax[0, 0].set_ylabel("# workers")

+        seaborn.histplot(data=self.avg_worker_requests, kde=True, ax=ax[0, 1])
+        ax[0, 1].set_xlabel("average requests served per epoch [thousands]")
+        ax[0, 1].set_ylabel("# workers")

-        seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0])
-        ax[0].set_xlabel("average data downloaded per epoch [GB]")
-        ax[0].set_ylabel("# workers")
+        seaborn.lineplot(data=self.history.download_avg, ax=ax[1, 0])
+        ax[1, 0].set_xlabel("epoch")
+        ax[1, 0].set_ylabel("avg data downloaded p/worker p/epoch [GB]")

-        seaborn.histplot(data=self.avg_worker_requests, kde=True, ax=ax[1])
-        ax[1].set_xlabel("average requests served per epoch [thousands]")
-        ax[1].set_ylabel("# workers")
+        seaborn.lineplot(data=self.history.requests_avg, ax=ax[1, 1])
+        ax[1, 1].set_xlabel("epoch")
+        ax[1, 1].set_ylabel("avg requests served p/worker p/epoch [thousands]")

+        seaborn.lineplot(data=self.history.download_cv, ax=ax[2, 0])
+        ax[2, 0].set_xlabel("epoch")
+        ax[2, 0].set_ylabel("variance coefficient of data downloaded")

+        seaborn.lineplot(data=self.history.requests_cv, ax=ax[2, 1])
+        ax[2, 1].set_xlabel("epoch")
+        ax[2, 1].set_ylabel("variance coefficient of requests served")

plt.tight_layout()
plt.savefig(plot_path, format="png")
@@ -401,25 +457,25 @@ def run_simulation() -> 'Iterator[Summary]':
yield scheduler.run_simulation()


-def aggregate_summaries(summaries: ['Summary']) -> 'Summary':
-    total_epochs = sum(s.last_epoch for s in summaries)
-    return Summary(
-        last_epoch=total_epochs // len(summaries),
-        downloaded_data_gb=sum(s.downloaded_data_gb * s.last_epoch for s in summaries) // total_epochs,
-        jailed_workers_data_gb=sum(s.jailed_workers_data_gb * s.last_epoch for s in summaries) // total_epochs,
-        retired_workers_data_gb=sum(s.retired_workers_data_gb * s.last_epoch for s in summaries) // total_epochs,
-        new_chunks_data_gb=sum(s.new_chunks_data_gb * s.last_epoch for s in summaries) // total_epochs,
-        avg_worker_download={w: d for s in summaries for w, d in s.avg_worker_download.items()},
-        avg_worker_requests={w: d for s in summaries for w, d in s.avg_worker_requests.items()},
-    )
+# def aggregate_summaries(summaries: ['Summary']) -> 'Summary':
+#     total_epochs = sum(s.last_epoch for s in summaries)
+#     return Summary(
+#         last_epoch=total_epochs // len(summaries),
+#         downloaded_data_gb=sum(s.downloaded_data_gb * s.last_epoch for s in summaries) // total_epochs,
+#         jailed_workers_data_gb=sum(s.jailed_workers_data_gb * s.last_epoch for s in summaries) // total_epochs,
+#         retired_workers_data_gb=sum(s.retired_workers_data_gb * s.last_epoch for s in summaries) // total_epochs,
+#         new_chunks_data_gb=sum(s.new_chunks_data_gb * s.last_epoch for s in summaries) // total_epochs,
+#         avg_worker_download={w: d for s in summaries for w, d in s.avg_worker_download.items()},
+#         avg_worker_requests={w: d for s in summaries for w, d in s.avg_worker_requests.items()},
+#     )


def main():
print("Starting simulation.\n")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
print(params_summary())

-    summary = aggregate_summaries(list(run_simulation()))
+    summary = next(run_simulation())

print(f"\nSimulation ended. Average results from {NUM_REPS} rounds:\n{summary.to_string()}")


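A quick note on the new cv series this commit plots: History.update records the coefficient of variation (standard deviation divided by mean) of per-worker downloads and requests, so a value of 0 means a perfectly even spread across workers. A standalone sketch of the same computation, using made-up sample numbers purely for illustration:

import numpy

# Coefficient of variation, as in History.update: std / mean.
downloads = [40, 50, 60]  # hypothetical per-worker average downloads, GB per epoch
cv = numpy.std(downloads) / numpy.mean(downloads)
print(round(float(cv), 3))  # ~0.163 -> load is spread fairly evenly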