From e0a46081049fd9d8135533083d629386f6b6aa29 Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Fri, 20 Oct 2023 15:35:22 +0200 Subject: [PATCH] Economy added to scheduling simulation --- scripts/scheduling_simulation.py | 197 +++++++++++++++++++++++-------- 1 file changed, 149 insertions(+), 48 deletions(-) diff --git a/scripts/scheduling_simulation.py b/scripts/scheduling_simulation.py index d54a51b..6df1d38 100644 --- a/scripts/scheduling_simulation.py +++ b/scripts/scheduling_simulation.py @@ -23,10 +23,16 @@ MAX_EPOCHS = int(os.environ.get('MAX_EPOCHS', '0')) NUM_REPS = int(os.environ.get('NUM_REPS', '1')) SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'random') +MIX_ALL_REPLICAS = 'MIX_ALL_REPLICAS' in os.environ MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.1')) MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '10')) NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '1000')) QUALIFIED_WORKER_THRESHOLD = float(os.environ.get('QUALIFIED_WORKER_THRESHOLD', '0.25')) +SQD_SUPPLY = int(os.environ.get('SQD_SUPPLY', '1_337_000_000')) +WORKER_STAKE = int(os.environ.get('WORKER_STAKE', '100_000')) +EPOCH_LEN_HOURS = int(os.environ.get('EPOCH_LEN_HOURS', '3')) +RESPONSE_SIZE_AVG = float(os.environ.get('RESPONSE_SIZE_AVG', '5.0')) +RESPONSE_SIZE_SD = float(os.environ.get('RESPONSE_SIZE_SD', '1.0')) OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', './out')) assert 0 <= MIXED_UNITS_RATIO <= 1, 'MIXED_UNITS_RATIO should be in range [0,1]' @@ -69,12 +75,15 @@ class Worker: def __init__(self, epoch_joined=0): self.id = random_id() self.epoch_joined = epoch_joined + self.last_epoch = epoch_joined self.epoch_retired: 'Optional[int]' = None self.assigned_units: 'Set[Unit]' = set() self.downloaded_units: 'Set[Unit]' = set() + self.request_count_history: [int] = [0] + self.response_size_history: [float] = [0.0] + self.total_rewards = 0 self.total_downloaded_data = 0 self.initial_sync_data = 0 - self.num_requests = 0 self.jailed = False def __eq__(self, other): @@ -87,6 +96,10 @@ def __lt__(self, other): return NotImplemented return self.assigned_data < other.assigned_data + @property + def num_requests(self) -> int: + return sum(self.request_count_history) + @property def stored_data(self) -> int: return len(self.downloaded_units) * UNIT_SIZE_MB @@ -131,7 +144,9 @@ def download_assigned(self) -> int: return download_size def make_request(self): - self.num_requests += 1 + response_size = max(min(numpy.random.normal(loc=RESPONSE_SIZE_AVG, scale=RESPONSE_SIZE_SD, size=1)[0], 10), 0) + self.request_count_history[-1] += 1 + self.response_size_history[-1] += response_size def jail(self): self.jailed = True @@ -140,11 +155,56 @@ def jail(self): def release(self): self.jailed = False - def retire(self, epoch): + def retire(self): assert self.epoch_retired is None - self.epoch_retired = epoch + self.epoch_retired = self.last_epoch self.purge_assignment() + def next_epoch(self): + assert self.epoch_retired is None + self.last_epoch += 1 + self.request_count_history.append(0) + self.response_size_history.append(0.0) + + def get_reward(self, reward: int): + assert self.epoch_retired is None + self.total_rewards += reward + + @property + def epochs_active(self) -> int: + assert self.epoch_retired is None or self.epoch_retired == self.last_epoch + return self.last_epoch - self.epoch_joined + 1 + + @property + def avg_download_gb(self) -> float: + """ Average data downloaded per epoch, excluding initial sync, in GB """ + return 
(self.total_downloaded_data - self.initial_sync_data) / self.epochs_active / 1024 + + @property + def avg_requests(self) -> float: + """ Average requests served per epoch, in thousands """ + return self.num_requests / self.epochs_active / 1000 + + @property + def avg_reward(self) -> float: + return self.total_rewards / self.epochs_active + + @property + def stake(self) -> int: + return WORKER_STAKE + + @property + def liveness(self) -> float: + return 1.0 + + @property + def tenure(self) -> float: + if self.epochs_active >= 10: + return 1.0 + if self.epochs_active >= 2: + return 0.5 + self.epochs_active // 2 * 0.1 + return 0.5 + class Unit: def __init__(self): @@ -165,9 +225,15 @@ def missing_replicas(self) -> int: def remove_random_replica(self): assert len(self.assigned_to) > 0 - _, worker = self.assigned_to.popitem() + worker_id = random.choice(list(self.assigned_to.keys())) + worker = self.assigned_to.pop(worker_id) worker.assigned_units.remove(self) + def remove_all_replicas(self): + while len(self.assigned_to) > 0: + _, worker = self.assigned_to.popitem() + worker.assigned_units.remove(self) + def query(self): assert len(self.assigned_to) > 0 random.choice(list(self.assigned_to.values())).make_request() @@ -178,34 +244,16 @@ def __init__(self): super().__init__("Not enough workers to assign unit") -@dataclass -class QualifiedWorker: - worker: 'Worker' - num_epochs: int - - @property - def id(self) -> 'Id': - return self.worker.id - - @property - def avg_download_gb(self) -> int: - """ Average data downloaded per epoch, excluding initial sync, in GB """ - return (self.worker.total_downloaded_data - self.worker.initial_sync_data) // self.num_epochs // 1024 - - @property - def avg_requests(self) -> int: - """ Average requests served per epoch, in thousands """ - return self.worker.num_requests // self.num_epochs // 1000 - - class History: def __init__(self): self.download_avg: [float] = [] # Average data downloaded per worker per epoch in GB self.download_cv: [float] = [] # Coefficient of variation of downloaded data self.requests_avg: [float] = [] # Average number of requests served per worker per epoch in thousands self.requests_cv: [float] = [] # Coefficient of variation of number of served requests + self.rewards_avg: [float] = [] # Average number worker award per epoch + self.rewards_cv: [float] = [] # Coefficient of variation of worker rewards - def update(self, workers: '[QualifiedWorker]'): + def update(self, workers: '[Worker]'): downloads = [w.avg_download_gb for w in workers] self.download_avg.append(numpy.mean(downloads)) self.download_cv.append(numpy.std(downloads) / self.download_avg[-1]) @@ -214,6 +262,10 @@ def update(self, workers: '[QualifiedWorker]'): self.requests_avg.append(numpy.mean(requests)) self.requests_cv.append(numpy.std(requests) / self.requests_avg[-1]) + rewards = [w.avg_reward for w in workers] + self.rewards_avg.append(numpy.mean(rewards)) + self.rewards_cv.append(numpy.std(rewards) / self.rewards_avg[-1]) + class Scheduler(ABC): def __init__(self): @@ -237,6 +289,42 @@ def total_downloaded_data(self) -> int: def initial_sync_data(self) -> int: return sum(w.initial_sync_data for w in (self.workers + self.retired_workers)) + def base_apr(self) -> float: + target_capacity = len(self.units) * UNIT_SIZE_MB * REPLICATION_FACTOR + actual_capacity = int(len(self.workers) * WORKER_STORAGE_MB * 0.9) + u_rate = (target_capacity - actual_capacity) / target_capacity + if u_rate >= 1.0: + return 0.70 + if u_rate >= 0: + return 0.25 + int(u_rate * 10) * 0.05 + if 
u_rate >= -3: + return 0.20 + int(u_rate) * 0.05 + return 0.05 + + def discount_factor(self) -> float: + stake_ratio = len(self.workers) * WORKER_STAKE / SQD_SUPPLY + if stake_ratio >= 1.0: + return 0.1 + if stake_ratio >= 0.25: + return 1.0 - ((stake_ratio - 0.25) / 0.25 * 0.3) + return 1.0 + + def give_worker_rewards(self): + r_apr = self.base_apr() * self.discount_factor() + max_epoch_reward = r_apr * sum(w.stake for w in self.workers) * EPOCH_LEN_HOURS / 365 / 24 + + total_requests = sum(w.request_count_history[-1] for w in self.workers) + total_egress = sum(w.response_size_history[-1] for w in self.workers) + total_staked = sum(w.stake for w in self.workers) + + for worker in self.workers: + t_scanned = worker.request_count_history[-1] / total_requests + t_egress = worker.response_size_history[-1] / total_egress + traffic = numpy.sqrt(t_scanned * t_egress) + stake_share = worker.stake / total_staked + reward = int(max_epoch_reward * worker.liveness * worker.tenure * min((traffic / stake_share)**0.1, 1)) + worker.get_reward(reward) + @abstractmethod def assign_units(self, initial=False, mid_epoch=False): raise NotImplementedError @@ -254,7 +342,7 @@ def jail_random_worker(self): def retire_random_worker(self): worker = self.workers.pop(random.randint(0, len(self.workers)-1)) - worker.retire(self.epoch) + worker.retire() self.retired_workers_data += worker.stored_data self.retired_workers.append(worker) @@ -289,26 +377,19 @@ def run_epoch(self): for _ in range(NEW_UNITS_PER_EPOCH): self.units.append(Unit()) - # update history records - workers = self.get_qualified_workers() - self.history.update(workers) + self.give_worker_rewards() + self.history.update(self.workers + self.retired_workers) self.epoch += 1 - - def get_qualified_workers(self, threshold=0.0) -> '[QualifiedWorker]': - def epochs_active(worker: 'Worker') -> int: - last_epoch = worker.epoch_retired if worker.epoch_retired is not None else self.epoch - return last_epoch - worker.epoch_joined + 1 - min_epochs = int(self.epoch * threshold) - - return [ - QualifiedWorker(worker=worker, num_epochs=num_epochs) - for worker in (self.workers + self.retired_workers) - if (num_epochs := epochs_active(worker)) > min_epochs - ] + for worker in self.workers: + worker.next_epoch() def summary(self) -> 'Summary': - qualified_workers = self.get_qualified_workers(threshold=QUALIFIED_WORKER_THRESHOLD) + min_epochs = int(self.epoch * QUALIFIED_WORKER_THRESHOLD) + qualified_workers = [ + worker for worker in (self.workers + self.retired_workers) + if worker.epochs_active > min_epochs + ] return Summary( last_epoch=self.epoch, downloaded_data_gb=(self.total_downloaded_data - self.initial_sync_data) // 1024, @@ -317,7 +398,8 @@ def summary(self) -> 'Summary': new_chunks_data_gb=self.epoch * NEW_UNITS_PER_EPOCH * UNIT_SIZE_MB * REPLICATION_FACTOR // 1024, avg_worker_download={w.id: w.avg_download_gb for w in qualified_workers}, avg_worker_requests={w.id: w.avg_requests for w in qualified_workers}, - history=self.history + avg_worker_rewards={w.id: w.avg_reward for w in qualified_workers}, + history=self.history, ) def run_simulation(self): @@ -360,7 +442,10 @@ def assign_units(self, initial=False, mid_epoch=False): unit_weights = numpy.linspace(1.0, MIXING_RECENT_UNIT_WEIGHT, num=len(assigned_units)) probabilities = unit_weights / sum(unit_weights) for unit in numpy.random.choice(assigned_units, size=num_mixed_units, replace=False, p=probabilities): - unit.remove_random_replica() + if MIX_ALL_REPLICAS: + unit.remove_all_replicas() + else: + 
unit.remove_random_replica() # Workers shuffled and ordered by number of assigned units random.shuffle(self.workers) @@ -393,8 +478,9 @@ class Summary: jailed_workers_data_gb: int # Total data that was held by all jailed workers at the time they were jailed retired_workers_data_gb: int # Total data that was held by all retired workers at the time they were retired new_chunks_data_gb: int # Total size of new chunks which appeared throughout the simulation - avg_worker_download: dict['Id', int] # Average size of data downloaded per epoch for each worker - avg_worker_requests: dict['Id', int] # Average number of received requests per epoch for each worker + avg_worker_download: dict['Id', float] # Average size of data downloaded per epoch for each worker + avg_worker_requests: dict['Id', float] # Average number of received requests per epoch for each worker + avg_worker_rewards: dict['Id', float] # Average SQD received per epoch for each worker history: 'History' @property @@ -414,12 +500,15 @@ def to_string(self) -> str: f"Retired workers data: {self.retired_workers_data_gb} GB.\n" f"New chunks data: {self.new_chunks_data_gb} GB.\n" f"Unnecessary reshuffled {self.reshuffled_per_epoch} GB/epoch.\n" + f"Download variance coefficient: {self.history.download_cv[-1]}.\n" + f"Requests variance coefficient: {self.history.requests_cv[-1]}.\n" + f"Rewards variance coefficient: {self.history.rewards_cv[-1]}.\n" ) def save_plot(self, plot_path: Path): print(f"Saving plot to {plot_path}") - fig, ax = plt.subplots(3, 2, figsize=(8, 12)) + fig, ax = plt.subplots(3, 3, figsize=(12, 12)) seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0, 0]) ax[0, 0].set_xlabel("average data downloaded per epoch [GB]") @@ -429,6 +518,10 @@ def save_plot(self, plot_path: Path): ax[0, 1].set_xlabel("average requests served per epoch [thousands]") ax[0, 1].set_ylabel("# workers") + seaborn.histplot(data=self.avg_worker_rewards, kde=True, ax=ax[0, 2]) + ax[0, 2].set_xlabel("average reward per epoch [SQD]") + ax[0, 2].set_ylabel("# workers") + seaborn.lineplot(data=self.history.download_avg, ax=ax[1, 0]) ax[1, 0].set_xlabel("epoch") ax[1, 0].set_ylabel("avg data downloaded p/worker p/epoch [GB]") @@ -437,6 +530,10 @@ def save_plot(self, plot_path: Path): ax[1, 1].set_xlabel("epoch") ax[1, 1].set_ylabel("avg requests served p/worker p/epoch [thousands]") + seaborn.lineplot(data=self.history.rewards_avg, ax=ax[1, 2]) + ax[1, 2].set_xlabel("epoch") + ax[1, 2].set_ylabel("avg worker reward p/epoch [SQD]") + seaborn.lineplot(data=self.history.download_cv, ax=ax[2, 0]) ax[2, 0].set_xlabel("epoch") ax[2, 0].set_ylabel("variance coefficient of data downloaded") @@ -445,6 +542,10 @@ def save_plot(self, plot_path: Path): ax[2, 1].set_xlabel("epoch") ax[2, 1].set_ylabel("variance coefficient of requests served") + seaborn.lineplot(data=self.history.rewards_cv, ax=ax[2, 2]) + ax[2, 2].set_xlabel("epoch") + ax[2, 2].set_ylabel("variance coefficient of worker rewards") + plt.tight_layout() plt.savefig(plot_path, format="png") plt.close()
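
A minimal standalone sketch of the per-epoch reward pipeline introduced in give_worker_rewards() above. It reuses the patch defaults (SQD_SUPPLY, WORKER_STAKE, EPOCH_LEN_HOURS); the utilization rate, worker count and per-worker traffic figures below are made-up example inputs, and base_apr takes u_rate directly instead of deriving it from unit and worker storage capacity as the scheduler does:

    # Illustrative sketch of the reward formula from give_worker_rewards()
    # (standalone; the example worker numbers below are hypothetical).
    import numpy

    SQD_SUPPLY = 1_337_000_000
    WORKER_STAKE = 100_000
    EPOCH_LEN_HOURS = 3

    def base_apr(u_rate: float) -> float:
        # APR rises with the storage utilization rate (shortfall of actual vs. target capacity).
        if u_rate >= 1.0:
            return 0.70
        if u_rate >= 0:
            return 0.25 + int(u_rate * 10) * 0.05
        if u_rate >= -3:
            return 0.20 + int(u_rate) * 0.05
        return 0.05

    def discount_factor(num_workers: int) -> float:
        # Rewards are discounted once a large share of the total supply is staked.
        stake_ratio = num_workers * WORKER_STAKE / SQD_SUPPLY
        if stake_ratio >= 1.0:
            return 0.1
        if stake_ratio >= 0.25:
            return 1.0 - ((stake_ratio - 0.25) / 0.25 * 0.3)
        return 1.0

    def epoch_reward(requests, egress, stake, total_requests, total_egress,
                     total_staked, max_epoch_reward, liveness=1.0, tenure=1.0) -> int:
        t_scanned = requests / total_requests
        t_egress = egress / total_egress
        traffic = numpy.sqrt(t_scanned * t_egress)   # geometric mean of the two traffic shares
        stake_share = stake / total_staked
        return int(max_epoch_reward * liveness * tenure * min((traffic / stake_share) ** 0.1, 1))

    # Example: 500 workers, utilization rate 0.2, one worker with average stake
    # serving 2% of all requests and 1% of all egress in the epoch.
    num_workers = 500
    r_apr = base_apr(u_rate=0.2) * discount_factor(num_workers)
    total_staked = num_workers * WORKER_STAKE
    max_epoch_reward = r_apr * total_staked * EPOCH_LEN_HOURS / 365 / 24
    print(epoch_reward(requests=2_000, egress=100.0, stake=WORKER_STAKE,
                       total_requests=100_000, total_egress=10_000.0,
                       total_staked=total_staked, max_epoch_reward=max_epoch_reward))

With these inputs base_apr(0.2) is 0.35, the discount factor stays at 1.0 (stake ratio well below 0.25), and the traffic-to-stake ratio exceeds 1 after the **0.1 exponent, so the worker receives the full liveness- and tenure-scaled share of max_epoch_reward for the epoch.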