diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 3dfd2c794..478710d05 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -76,14 +76,14 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True): def set_inactive_run(self, run): run_id = run["_id"] - with self.active_run_lock(str(run_id)): + with self.active_run_lock(run_id, name="aggregates"): run["workers"] = run["cores"] = 0 for task in run["tasks"]: task["active"] = False def set_inactive_task(self, task_id, run): run_id = run["_id"] - with self.active_run_lock(str(run_id)): + with self.active_run_lock(run_id, name="aggregates"): task = run["tasks"][task_id] if task["active"]: run["workers"] -= 1 @@ -98,7 +98,8 @@ def update_workers_cores(self): workers = cores = 0 run_id = r["_id"] run = self.get_run(run_id) - with self.active_run_lock(str(run_id)): + changed = False + with self.active_run_lock(run_id, name="aggregates"): for task in run["tasks"]: if task["active"]: workers += 1 @@ -110,7 +111,9 @@ def update_workers_cores(self): flush=True, ) run["workers"], run["cores"] = workers, cores - self.buffer(run, False) + changed = True + if changed: + self.buffer(run, False) def new_run( self, @@ -1015,50 +1018,51 @@ def priority(run): # lower is better # Now we create a new task for this run. run_id = run["_id"] - with self.active_run_lock(str(run_id)): - # It may happen that the run we have selected is now finished. - # Since this is very rare we just return instead of cluttering the - # code with remedial actions. - if run["finished"]: - info = ( - f"Request_task: alas the run {run_id} corresponding to the " - "assigned task has meanwhile finished. Please try again..." - ) - print(info, flush=True) - return {"task_waiting": False, "info": info} + with self.active_run_lock(run_id): + with self.active_run_lock(run_id, name="aggregates"): + # It may happen that the run we have selected is now finished. 
+ # Since this is very rare we just return instead of cluttering the + # code with remedial actions. + if run["finished"]: + info = ( + f"Request_task: alas the run {run_id} corresponding to the " + "assigned task has meanwhile finished. Please try again..." + ) + print(info, flush=True) + return {"task_waiting": False, "info": info} - opening_offset = 0 - for task in run["tasks"]: - opening_offset += task["num_games"] + opening_offset = 0 + for task in run["tasks"]: + opening_offset += task["num_games"] - if "sprt" in run["args"]: - sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"] - remaining = sprt_batch_size_games * math.ceil( - remaining / sprt_batch_size_games - ) + if "sprt" in run["args"]: + sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"] + remaining = sprt_batch_size_games * math.ceil( + remaining / sprt_batch_size_games + ) - task_size = min(self.worker_cap(run, worker_info), remaining) - task = { - "num_games": task_size, - "active": True, - "worker_info": worker_info, - "last_updated": datetime.now(timezone.utc), - "start": opening_offset, - "stats": { - "wins": 0, - "losses": 0, - "draws": 0, - "crashes": 0, - "time_losses": 0, - "pentanomial": 5 * [0], - }, - } - run["tasks"].append(task) + task_size = min(self.worker_cap(run, worker_info), remaining) + task = { + "num_games": task_size, + "active": True, + "worker_info": worker_info, + "last_updated": datetime.now(timezone.utc), + "start": opening_offset, + "stats": { + "wins": 0, + "losses": 0, + "draws": 0, + "crashes": 0, + "time_losses": 0, + "pentanomial": 5 * [0], + }, + } + run["tasks"].append(task) - task_id = len(run["tasks"]) - 1 + task_id = len(run["tasks"]) - 1 - run["workers"] += 1 - run["cores"] += task["worker_info"]["concurrency"] + run["workers"] += 1 + run["cores"] += task["worker_info"]["concurrency"] self.buffer(run, False) @@ -1081,7 +1085,10 @@ def priority(run): # lower is better active_runs = {} purge_count = 0 - def active_run_lock(self, id): + def 
active_run_lock(self, id): + def active_run_lock(self, id, name="run"): + valid_names = {"run", "aggregates"} + assert name in valid_names + id = str(id) with self.run_lock: self.purge_count = self.purge_count + 1 if self.purge_count > 100000: @@ -1090,12 +1097,13 @@ def active_run_lock(self, id): (k, v) for k, v in self.active_runs.items() if v["time"] >= old ) self.purge_count = 0 - if id in self.active_runs: - active_lock = self.active_runs[id]["lock"] - self.active_runs[id]["time"] = time.time() + key = (id, name) + if key in self.active_runs: + active_lock = self.active_runs[key]["lock"] + self.active_runs[key]["time"] = time.time() else: - active_lock = threading.RLock() - self.active_runs[id] = {"time": time.time(), "lock": active_lock} + active_lock = threading.Lock() + self.active_runs[key] = {"time": time.time(), "lock": active_lock} return active_lock def finished_run_message(self, run): @@ -1168,7 +1176,7 @@ def handle_crash_or_time(self, run, task_id): ) def update_task(self, worker_info, run_id, task_id, stats, spsa): - lock = self.active_run_lock(str(run_id)) + lock = self.active_run_lock(run_id) with lock: return self.sync_update_task(worker_info, run_id, task_id, stats, spsa)