Skip to content

Commit

Permalink
Economy added to scheduling simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Oct 23, 2023
1 parent 8f3231c commit e0a4608
Showing 1 changed file with 149 additions and 48 deletions.
197 changes: 149 additions & 48 deletions scripts/scheduling_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
# Simulation parameters; each one can be overridden through an environment variable of the same name.
MAX_EPOCHS = int(os.environ.get('MAX_EPOCHS', '0'))
NUM_REPS = int(os.environ.get('NUM_REPS', '1'))
SCHEDULER_TYPE = os.environ.get('SCHEDULER_TYPE', 'random')
# Enabled by the mere presence of the variable, whatever its value (even "0" or an empty string).
# When enabled, mixing a unit removes all of its replicas instead of a single random one.
MIX_ALL_REPLICAS = 'MIX_ALL_REPLICAS' in os.environ
MIXED_UNITS_RATIO = float(os.environ.get('MIXED_UNITS_RATIO', '0.1'))
# Upper end of the linearly spaced weights (starting at 1.0) used when picking units to mix.
MIXING_RECENT_UNIT_WEIGHT = float(os.environ.get('MIXING_RECENT_UNIT_WEIGHT', '10'))
NUM_SQUIDS_PER_EPOCH = int(os.environ.get('NUM_SQUIDS_PER_EPOCH', '1000'))
# Fraction of the elapsed epochs a worker must exceed in activity to be included in the summary.
QUALIFIED_WORKER_THRESHOLD = float(os.environ.get('QUALIFIED_WORKER_THRESHOLD', '0.25'))
# Denominator of the stake ratio that drives the reward discount factor.
SQD_SUPPLY = int(os.environ.get('SQD_SUPPLY', '1_337_000_000'))
# Stake reported by every worker.
WORKER_STAKE = int(os.environ.get('WORKER_STAKE', '100_000'))
# Epoch length in hours; scales the yearly reward rate down to a single epoch.
EPOCH_LEN_HOURS = int(os.environ.get('EPOCH_LEN_HOURS', '3'))
# Mean and standard deviation of the normal distribution response sizes are drawn from.
# NOTE(review): the unit of the response size is not stated in the code - confirm.
RESPONSE_SIZE_AVG = float(os.environ.get('RESPONSE_SIZE_AVG', '5.0'))
RESPONSE_SIZE_SD = float(os.environ.get('RESPONSE_SIZE_SD', '1.0'))
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', './out'))

# Fail fast at import time on a ratio outside [0, 1].
assert 0 <= MIXED_UNITS_RATIO <= 1, 'MIXED_UNITS_RATIO should be in range [0,1]'
Expand Down Expand Up @@ -69,12 +75,15 @@ class Worker:
def __init__(self, epoch_joined=0):
    """Create a worker that joins the simulation at ``epoch_joined``.

    :param epoch_joined: Epoch in which the worker becomes active; it is
        also the initial value of ``last_epoch``.
    """
    self.id = random_id()
    self.epoch_joined = epoch_joined
    self.last_epoch = epoch_joined
    self.epoch_retired: 'Optional[int]' = None
    self.assigned_units: 'Set[Unit]' = set()
    self.downloaded_units: 'Set[Unit]' = set()
    # One entry per active epoch; the last entry belongs to the epoch in progress.
    self.request_count_history: 'list[int]' = [0]
    self.response_size_history: 'list[float]' = [0.0]
    self.total_rewards = 0
    self.total_downloaded_data = 0
    self.initial_sync_data = 0
    # `num_requests` is deliberately not initialised here: it is a read-only
    # property derived from `request_count_history`, and assigning to it
    # would raise AttributeError.
    self.jailed = False

def __eq__(self, other):
Expand All @@ -87,6 +96,10 @@ def __lt__(self, other):
return NotImplemented
return self.assigned_data < other.assigned_data

@property
def num_requests(self) -> int:
    """ Total number of requests served across all recorded epochs """
    total = 0
    for epoch_count in self.request_count_history:
        total += epoch_count
    return total

@property
def stored_data(self) -> int:
    """ Size of everything the worker has downloaded: unit count times UNIT_SIZE_MB """
    unit_count = len(self.downloaded_units)
    return unit_count * UNIT_SIZE_MB
Expand Down Expand Up @@ -131,7 +144,9 @@ def download_assigned(self) -> int:
return download_size

def make_request(self):
    """Record one served request in the counters of the epoch in progress.

    The response size is drawn from a normal distribution parameterised by
    RESPONSE_SIZE_AVG / RESPONSE_SIZE_SD and clamped to the range [0, 10].
    """
    # Only the per-epoch histories are updated. `num_requests` is a read-only
    # property computed from `request_count_history`, so incrementing it
    # directly would raise AttributeError.
    sample = numpy.random.normal(loc=RESPONSE_SIZE_AVG, scale=RESPONSE_SIZE_SD, size=1)[0]
    response_size = max(min(sample, 10), 0)
    self.request_count_history[-1] += 1
    self.response_size_history[-1] += response_size

def jail(self):
self.jailed = True
Expand All @@ -140,11 +155,56 @@ def jail(self):
def release(self):
    """ Clear the worker's jailed flag """
    self.jailed = False

def retire(self, epoch=None):
    """Mark the worker as retired and drop its unit assignment.

    The retirement epoch is always the worker's own ``last_epoch``, which
    keeps the invariant checked by ``epochs_active``
    (``epoch_retired == last_epoch``).

    :param epoch: Accepted only so callers that still pass an epoch keep
        working; the value is ignored.
    """
    assert self.epoch_retired is None
    self.epoch_retired = self.last_epoch
    self.purge_assignment()

def next_epoch(self):
    """ Move an active worker to the next epoch and open fresh per-epoch counters """
    assert self.epoch_retired is None
    self.last_epoch = self.last_epoch + 1
    for history, zero in ((self.request_count_history, 0), (self.response_size_history, 0.0)):
        history.append(zero)

def get_reward(self, reward: int):
    """ Add ``reward`` to the running total of a worker that has not retired """
    assert self.epoch_retired is None
    self.total_rewards = self.total_rewards + reward

@property
def epochs_active(self) -> int:
    """ Number of epochs between joining and the last epoch seen, both inclusive """
    retired_at = self.epoch_retired
    assert retired_at is None or retired_at == self.last_epoch
    span = self.last_epoch - self.epoch_joined
    return span + 1

@property
def avg_download_gb(self) -> float:
    """ Mean data downloaded per active epoch in GB, not counting the initial sync """
    after_initial_sync = self.total_downloaded_data - self.initial_sync_data
    per_epoch = after_initial_sync / self.epochs_active
    return per_epoch / 1024

@property
def avg_requests(self) -> float:
    """ Mean number of requests served per active epoch, expressed in thousands """
    served = self.num_requests
    per_epoch = served / self.epochs_active
    return per_epoch / 1000

@property
def avg_reward(self) -> float:
    """ Mean reward received per active epoch """
    earned = self.total_rewards
    return earned / self.epochs_active

@property
def stake(self) -> int:
    """ Amount staked by the worker; every worker reports the same WORKER_STAKE """
    return WORKER_STAKE

@property
def liveness(self) -> float:
    """ Liveness multiplier applied to the worker's reward; currently always 1.0 """
    return 1.0

@property
def tenure(self) -> float:
    """ Tenure multiplier for rewards: 0.5 for a new worker, +0.1 per two active epochs, 1.0 from ten epochs on """
    active = self.epochs_active
    if active < 2:
        return 0.5
    if active < 10:
        return 0.5 + active // 2 * 0.1
    return 1.0


class Unit:
def __init__(self):
Expand All @@ -165,9 +225,15 @@ def missing_replicas(self) -> int:

def remove_random_replica(self):
    """Unassign this unit from exactly one worker, chosen uniformly at random.

    Both sides of the relation are updated: the worker is dropped from
    ``assigned_to`` and the unit is dropped from that worker's
    ``assigned_units``.
    """
    assert len(self.assigned_to) > 0
    # A single random pick only. An additional popitem() here would remove a
    # second replica without cleaning up that worker's `assigned_units`.
    worker_id = random.choice(list(self.assigned_to.keys()))
    worker = self.assigned_to.pop(worker_id)
    worker.assigned_units.remove(self)

def remove_all_replicas(self):
    """ Unassign this unit from every worker that currently holds it """
    while self.assigned_to:
        holder = self.assigned_to.popitem()[1]
        holder.assigned_units.remove(self)

def query(self):
assert len(self.assigned_to) > 0
random.choice(list(self.assigned_to.values())).make_request()
Expand All @@ -178,34 +244,16 @@ def __init__(self):
super().__init__("Not enough workers to assign unit")


@dataclass
class QualifiedWorker:
    """ A worker paired with the number of epochs it has been active """
    # NOTE(review): Worker exposes avg_download_gb / avg_requests itself and
    # History.update is annotated for plain workers, so this wrapper looks
    # superseded - confirm before relying on it.
    worker: 'Worker'
    num_epochs: int

    @property
    def id(self) -> 'Id':
        """ Identifier of the wrapped worker """
        wrapped = self.worker
        return wrapped.id

    @property
    def avg_download_gb(self) -> int:
        """ Whole GB downloaded per epoch on average, with the initial sync left out """
        wrapped = self.worker
        after_initial_sync = wrapped.total_downloaded_data - wrapped.initial_sync_data
        per_epoch = after_initial_sync // self.num_epochs
        return per_epoch // 1024

    @property
    def avg_requests(self) -> int:
        """ Whole thousands of requests served per epoch on average """
        per_epoch = self.worker.num_requests // self.num_epochs
        return per_epoch // 1000


class History:
def __init__(self):
# Each list is a time series that grows by one value per call to update().
self.download_avg: [float] = [] # Average data downloaded per worker per epoch in GB
self.download_cv: [float] = [] # Coefficient of variation of downloaded data
self.requests_avg: [float] = [] # Average number of requests served per worker per epoch in thousands
self.requests_cv: [float] = [] # Coefficient of variation of number of served requests
self.rewards_avg: [float] = [] # Average worker reward per epoch
self.rewards_cv: [float] = [] # Coefficient of variation of worker rewards

def update(self, workers: '[QualifiedWorker]'):
def update(self, workers: '[Worker]'):
downloads = [w.avg_download_gb for w in workers]
self.download_avg.append(numpy.mean(downloads))
self.download_cv.append(numpy.std(downloads) / self.download_avg[-1])
Expand All @@ -214,6 +262,10 @@ def update(self, workers: '[QualifiedWorker]'):
self.requests_avg.append(numpy.mean(requests))
self.requests_cv.append(numpy.std(requests) / self.requests_avg[-1])

rewards = [w.avg_reward for w in workers]
self.rewards_avg.append(numpy.mean(rewards))
self.rewards_cv.append(numpy.std(rewards) / self.rewards_avg[-1])


class Scheduler(ABC):
def __init__(self):
Expand All @@ -237,6 +289,42 @@ def total_downloaded_data(self) -> int:
def initial_sync_data(self) -> int:
return sum(w.initial_sync_data for w in (self.workers + self.retired_workers))

def base_apr(self) -> float:
    """Return the base yearly reward rate for the current network utilization.

    The utilization rate is ``(target - actual) / target``, where target is
    the storage needed to hold every unit REPLICATION_FACTOR times and
    actual is 90% of the combined storage of the active workers.

    * rate >= 1 (no usable capacity at all): 0.70
    * 0 <= rate < 1 (under-provisioned): 0.25 plus 0.05 per full 10%
    * -3 <= rate < 0 (over-provisioned): 0.20 minus 0.05 per full 100%
    * rate < -3, or nothing to store: 0.05
    """
    target_capacity = len(self.units) * UNIT_SIZE_MB * REPLICATION_FACTOR
    if target_capacity == 0:
        # No data to store: the utilization rate is undefined (division by
        # zero). Treat the network as maximally over-provisioned.
        return 0.05
    actual_capacity = int(len(self.workers) * WORKER_STORAGE_MB * 0.9)
    u_rate = (target_capacity - actual_capacity) / target_capacity
    if u_rate >= 1.0:
        return 0.70
    if u_rate >= 0:
        return 0.25 + int(u_rate * 10) * 0.05
    if u_rate >= -3:
        # int() truncates toward zero, so rates in (-1, 0) stay at 0.20.
        return 0.20 + int(u_rate) * 0.05
    return 0.05

def discount_factor(self) -> float:
    """ Reward discount based on the share of SQD_SUPPLY staked by active workers: 1.0 below 25%, then linearly down to 0.1 at 100% """
    ratio = len(self.workers) * WORKER_STAKE / SQD_SUPPLY
    if ratio < 0.25:
        return 1.0
    if ratio < 1.0:
        return 1.0 - ((ratio - 0.25) / 0.25 * 0.3)
    return 0.1

def give_worker_rewards(self):
    """Hand out the rewards for the epoch in progress to every active worker.

    The reward cap for the epoch is the discounted base APR applied to the
    combined stake, scaled from a year down to EPOCH_LEN_HOURS. Each worker
    receives that cap multiplied by its liveness, its tenure and a traffic
    factor ``min((traffic / stake_share) ** 0.1, 1)``, where traffic is the
    geometric mean of the worker's share of requests and of response volume.

    An epoch without any requests, egress or stake yields zero shares
    instead of raising ZeroDivisionError.
    """
    workers = self.workers
    total_staked = sum(w.stake for w in workers)
    r_apr = self.base_apr() * self.discount_factor()
    # NOTE(review): the cap is computed from the combined stake of all
    # workers but applied to each worker individually - confirm this matches
    # the intended reward model.
    max_epoch_reward = r_apr * total_staked * EPOCH_LEN_HOURS / 365 / 24

    total_requests = sum(w.request_count_history[-1] for w in workers)
    total_egress = sum(w.response_size_history[-1] for w in workers)

    for worker in workers:
        # A share of an empty total is defined as 0 rather than dividing by zero.
        t_scanned = worker.request_count_history[-1] / total_requests if total_requests else 0.0
        t_egress = worker.response_size_history[-1] / total_egress if total_egress else 0.0
        traffic = numpy.sqrt(t_scanned * t_egress)
        stake_share = worker.stake / total_staked if total_staked else 0.0
        if stake_share > 0:
            traffic_factor = min((traffic / stake_share) ** 0.1, 1)
        else:
            # Limit of the capped ratio as the stake share goes to zero.
            traffic_factor = 1 if traffic > 0 else 0
        reward = int(max_epoch_reward * worker.liveness * worker.tenure * traffic_factor)
        worker.get_reward(reward)

@abstractmethod
def assign_units(self, initial=False, mid_epoch=False):
# Scheduling strategy hook: concrete schedulers must override this method.
# NOTE(review): the exact meaning of `initial` / `mid_epoch` is defined by callers and subclasses - confirm there.
raise NotImplementedError
Expand All @@ -254,7 +342,7 @@ def jail_random_worker(self):

def retire_random_worker(self):
    """Retire one active worker chosen uniformly at random.

    The worker is removed from ``workers``, retired, its stored data is
    added to ``retired_workers_data`` and it is appended to
    ``retired_workers``.
    """
    worker = self.workers.pop(random.randrange(len(self.workers)))
    # Retire exactly once: the worker records its own last epoch, and a
    # second call would trip its "not yet retired" assertion.
    worker.retire()
    self.retired_workers_data += worker.stored_data
    self.retired_workers.append(worker)

Expand Down Expand Up @@ -289,26 +377,19 @@ def run_epoch(self):
for _ in range(NEW_UNITS_PER_EPOCH):
self.units.append(Unit())

# update history records
workers = self.get_qualified_workers()
self.history.update(workers)
self.give_worker_rewards()
self.history.update(self.workers + self.retired_workers)

self.epoch += 1

def get_qualified_workers(self, threshold=0.0) -> '[QualifiedWorker]':
    """ Wrap every active or retired worker whose number of active epochs exceeds ``threshold`` times the current epoch """
    min_epochs = int(self.epoch * threshold)
    qualified = []
    for worker in self.workers + self.retired_workers:
        # Active workers are counted up to the current epoch, retired ones up to their retirement.
        last_epoch = self.epoch if worker.epoch_retired is None else worker.epoch_retired
        num_epochs = last_epoch - worker.epoch_joined + 1
        if num_epochs > min_epochs:
            qualified.append(QualifiedWorker(worker=worker, num_epochs=num_epochs))
    return qualified
for worker in self.workers:
worker.next_epoch()

def summary(self) -> 'Summary':
qualified_workers = self.get_qualified_workers(threshold=QUALIFIED_WORKER_THRESHOLD)
min_epochs = int(self.epoch * QUALIFIED_WORKER_THRESHOLD)
qualified_workers = [
worker for worker in (self.workers + self.retired_workers)
if worker.epochs_active > min_epochs
]
return Summary(
last_epoch=self.epoch,
downloaded_data_gb=(self.total_downloaded_data - self.initial_sync_data) // 1024,
Expand All @@ -317,7 +398,8 @@ def summary(self) -> 'Summary':
new_chunks_data_gb=self.epoch * NEW_UNITS_PER_EPOCH * UNIT_SIZE_MB * REPLICATION_FACTOR // 1024,
avg_worker_download={w.id: w.avg_download_gb for w in qualified_workers},
avg_worker_requests={w.id: w.avg_requests for w in qualified_workers},
history=self.history
avg_worker_rewards={w.id: w.avg_reward for w in qualified_workers},
history=self.history,
)

def run_simulation(self):
Expand Down Expand Up @@ -360,7 +442,10 @@ def assign_units(self, initial=False, mid_epoch=False):
unit_weights = numpy.linspace(1.0, MIXING_RECENT_UNIT_WEIGHT, num=len(assigned_units))
probabilities = unit_weights / sum(unit_weights)
for unit in numpy.random.choice(assigned_units, size=num_mixed_units, replace=False, p=probabilities):
unit.remove_random_replica()
if MIX_ALL_REPLICAS:
unit.remove_all_replicas()
else:
unit.remove_random_replica()

# Workers shuffled and ordered by number of assigned units
random.shuffle(self.workers)
Expand Down Expand Up @@ -393,8 +478,9 @@ class Summary:
jailed_workers_data_gb: int # Total data that was held by all jailed workers at the time they were jailed
retired_workers_data_gb: int # Total data that was held by all retired workers at the time they were retired
new_chunks_data_gb: int # Total size of new chunks which appeared throughout the simulation
avg_worker_download: dict['Id', int] # Average size of data downloaded per epoch for each worker
avg_worker_requests: dict['Id', int] # Average number of received requests per epoch for each worker
avg_worker_download: dict['Id', float] # Average size of data downloaded per epoch for each worker
avg_worker_requests: dict['Id', float] # Average number of received requests per epoch for each worker
avg_worker_rewards: dict['Id', float] # Average SQD received per epoch for each worker
history: 'History'

@property
Expand All @@ -414,12 +500,15 @@ def to_string(self) -> str:
f"Retired workers data: {self.retired_workers_data_gb} GB.\n"
f"New chunks data: {self.new_chunks_data_gb} GB.\n"
f"Unnecessary reshuffled {self.reshuffled_per_epoch} GB/epoch.\n"
f"Download variance coefficient: {self.history.download_cv[-1]}.\n"
f"Requests variance coefficient: {self.history.requests_cv[-1]}.\n"
f"Rewards variance coefficient: {self.history.rewards_cv[-1]}.\n"
)

def save_plot(self, plot_path: Path):
print(f"Saving plot to {plot_path}")

fig, ax = plt.subplots(3, 2, figsize=(8, 12))
fig, ax = plt.subplots(3, 3, figsize=(12, 12))

seaborn.histplot(data=self.avg_worker_download, kde=True, ax=ax[0, 0])
ax[0, 0].set_xlabel("average data downloaded per epoch [GB]")
Expand All @@ -429,6 +518,10 @@ def save_plot(self, plot_path: Path):
ax[0, 1].set_xlabel("average requests served per epoch [thousands]")
ax[0, 1].set_ylabel("# workers")

seaborn.histplot(data=self.avg_worker_rewards, kde=True, ax=ax[0, 2])
ax[0, 2].set_xlabel("average reward per epoch [SQD]")
ax[0, 2].set_ylabel("# workers")

seaborn.lineplot(data=self.history.download_avg, ax=ax[1, 0])
ax[1, 0].set_xlabel("epoch")
ax[1, 0].set_ylabel("avg data downloaded p/worker p/epoch [GB]")
Expand All @@ -437,6 +530,10 @@ def save_plot(self, plot_path: Path):
ax[1, 1].set_xlabel("epoch")
ax[1, 1].set_ylabel("avg requests served p/worker p/epoch [thousands]")

seaborn.lineplot(data=self.history.rewards_avg, ax=ax[1, 2])
ax[1, 2].set_xlabel("epoch")
ax[1, 2].set_ylabel("avg worker reward p/epoch [SQD]")

seaborn.lineplot(data=self.history.download_cv, ax=ax[2, 0])
ax[2, 0].set_xlabel("epoch")
ax[2, 0].set_ylabel("variance coefficient of data downloaded")
Expand All @@ -445,6 +542,10 @@ def save_plot(self, plot_path: Path):
ax[2, 1].set_xlabel("epoch")
ax[2, 1].set_ylabel("variance coefficient of requests served")

seaborn.lineplot(data=self.history.rewards_cv, ax=ax[2, 2])
ax[2, 2].set_xlabel("epoch")
ax[2, 2].set_ylabel("variance coefficient of worker rewards")

plt.tight_layout()
plt.savefig(plot_path, format="png")
plt.close()
Expand Down

0 comments on commit e0a4608

Please sign in to comment.