diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 838bf7fdb..a7e514c0c 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -117,6 +117,33 @@ def schedule_tasks(self): self.scheduler.create_task( 900.0, self.validate_data_structures, initial_delay=60.0 ) + self.scheduler.create_task(60.0, self.update_nps_gpm) + + def update_nps_gpm(self): + with self.unfinished_runs_lock: + unfinished_runs = [self.get_run(run_id) for run_id in self.unfinished_runs] + for run in unfinished_runs: + nps = 0.0 + games_per_minute = 0.0 + # excess of caution + with self.active_run_lock(str(run["_id"])): + tasks = copy.copy(run["tasks"]) + for task in tasks: + if task["active"]: + concurrency = task["worker_info"]["concurrency"] + nps += concurrency * float(task["worker_info"]["nps"]) + if task["worker_info"]["nps"] != 0: + games_per_minute += ( + (task["worker_info"]["nps"] / 691680.0) + * (60.0 / estimate_game_duration(run["args"]["tc"])) + * ( + int(task["worker_info"]["concurrency"]) + // run["args"].get("threads", 1) + ) + ) + run["nps"] = nps + run["games_per_minute"] = games_per_minute + self.buffer(run, False) def validate_data_structures(self): # The main purpose of task is to ensure that the schemas @@ -252,6 +279,8 @@ def set_inactive_run(self, run): self.set_inactive_task(task_id, run) self.unfinished_runs.discard(run_id) run["finished"] = True + run["nps"] = 0.0 + run["games_per_minute"] = 0.0 flags = compute_flags(run) run.update(flags) @@ -421,6 +450,7 @@ def update_aggregated_data(self): self.insert_in_wtt_map(run, task_id) self.update_itp() + self.update_nps_gpm() def new_run( self, @@ -527,6 +557,8 @@ def new_run( "cores": 0, "committed_games": 0, "total_games": 0, + "nps": 0.0, + "games_per_minute": 0.0, } # Administrative flags. @@ -819,7 +851,7 @@ def get_unfinished_runs_id(self): def get_unfinished_runs(self, username=None): # Note: the result can be only used once. - unfinished_runs = self.runs.find({"finished": False}) + unfinished_runs = self.runs.find({"finished": False}, {"_id": 1, "tasks": 0}) if username: unfinished_runs = ( r for r in unfinished_runs if r["args"].get("username") == username @@ -869,25 +901,14 @@ def aggregate_unfinished_runs(self, username=None): ) cores = 0 - nps = 0 - games_per_minute = 0.0 machines_count = 0 + nps = 0.0 + games_per_minute = 0.0 for run in runs["active"]: machines_count += run["workers"] cores += run["cores"] - for task_id, task in enumerate(run["tasks"]): - if task["active"]: - concurrency = int(task["worker_info"]["concurrency"]) - nps += concurrency * task["worker_info"]["nps"] - if task["worker_info"]["nps"] != 0: - games_per_minute += ( - (task["worker_info"]["nps"] / 691680) - * (60.0 / estimate_game_duration(run["args"]["tc"])) - * ( - int(task["worker_info"]["concurrency"]) - // run["args"].get("threads", 1) - ) - ) + nps += run["nps"] + games_per_minute += run["games_per_minute"] pending_hours = 0 for run in runs["pending"] + runs["active"]: diff --git a/server/fishtest/schemas.py b/server/fishtest/schemas.py index 45ed5db4c..9d05aefda 100644 --- a/server/fishtest/schemas.py +++ b/server/fishtest/schemas.py @@ -640,7 +640,7 @@ def flags_must_match(run): # about non-validation of runs created with the prior # schema. -RUN_VERSION = 4 +RUN_VERSION = 5 runs_schema = intersect( { @@ -663,6 +663,8 @@ def flags_must_match(run): "committed_games": uint, "total_games": uint, "results": results_schema, + "nps": ufloat, + "games_per_minute": ufloat, "args": intersect( { "base_tag": str, @@ -798,7 +800,13 @@ def flags_must_match(run): lax( ifthen( {"finished": True}, - {"workers": 0, "cores": 0, "tasks": [{"active": False}, ...]}, + { + "workers": 0, + "cores": 0, + "nps": 0.0, + "games_per_minute": 0.0, + "tasks": [{"active": False}, ...], + }, ) ), valid_aggregated_data,