From 40d51982c283689d36545cb29e0f80e1c2d9b63c Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Sat, 1 Jun 2024 08:00:17 +0000 Subject: [PATCH] Split the convoluted "flush_buffers" into three scheduled tasks flush_buffers: syncs oldest cache entry (period: 1s) clean_cache: evicts old cache entries (period: 60s) scavenge_dead_tasks: (period: 60s). This PR should also fix the deadlock #2041. --- server/fishtest/rundb.py | 95 +++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 458712391..8e7fd3843 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -84,6 +84,8 @@ def schedule_tasks(self): if self.scheduler is None: self.scheduler = Scheduler(jitter=0.05) self.scheduler.add_task(1.0, self.flush_buffers) + self.scheduler.add_task(60.0, self.clean_cache) + self.scheduler.add_task(60.0, self.scavenge_dead_tasks) self.scheduler.add_task(180.0, self.validate_random_run) def validate_random_run(self): @@ -468,70 +470,55 @@ def flush_all(self): print("done", flush=True) # For documentation of the cache format see "cache_schema" in schemas.py. + def flush_buffers(self): - try: - self.run_cache_lock.acquire() - now = time.time() - old = now + 1 - oldest_entry = None - # We make this a list to be able to change run_cache during iteration - for r_id, cache_entry in list(self.run_cache.items()): + oldest_entry = None + old = float("inf") + with self.run_cache_lock: + for cache_entry in self.run_cache.values(): run = cache_entry["run"] - if not cache_entry["is_changed"]: - if not run["finished"] and ( - "last_scavenge_time" not in cache_entry - or cache_entry["last_scavenge_time"] < now - 60 - ): - cache_entry["last_scavenge_time"] = now - if self.scavenge(run): - with self.run_cache_write_lock: - self.runs.replace_one({"_id": run["_id"]}, run) - # Presently run["finished"] implies run["cores"]==0 but - # this was not always true in the past. - if (run["cores"] <= 0 or run["finished"]) and cache_entry[ - "last_access_time" - ] < now - 300: - del self.run_cache[r_id] - elif cache_entry["last_sync_time"] < old: + if cache_entry["is_changed"] and cache_entry["last_sync_time"] < old: old = cache_entry["last_sync_time"] oldest_entry = cache_entry if oldest_entry is not None: oldest_run = oldest_entry["run"] - self.scavenge(oldest_run) - oldest_entry["last_scavenge_time"] = now oldest_entry["is_changed"] = False oldest_entry["last_sync_time"] = time.time() with self.run_cache_write_lock: self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run) - except Exception as e: - print(f"Flush exception: {str(e)}", flush=True) - finally: - self.run_cache_lock.release() - - def scavenge(self, run): - if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300): - return False - dead_task = False - old = datetime.now(timezone.utc) - timedelta(minutes=6) - task_id = -1 - for task in run["tasks"]: - task_id += 1 - if task["active"] and task["last_updated"] < old: - self.set_inactive_task(task_id, run) - dead_task = True - print( - "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format( - run["_id"], task_id, worker_name(task["worker_info"]) - ), - flush=True, - ) - self.handle_crash_or_time(run, task_id) - self.actiondb.dead_task( - username=task["worker_info"]["username"], - run=run, - task_id=task_id, - ) - return dead_task + + def clean_cache(self): + now = time.time() + with self.run_cache_lock: + # We make this a list to be able to change run_cache during iteration + for r_id, cache_entry in list(self.run_cache.items()): + run = cache_entry["run"] + # Presently run["finished"] implies run["cores"]==0 but + # this was not always true in the past. + if ( + not cache_entry["is_changed"] + and (run["cores"] <= 0 or run["finished"]) + and cache_entry["last_access_time"] < now - 300 + ): + del self.run_cache[r_id] + + def scavenge_dead_tasks(self): + now = time.time() + dead_tasks = [] + with self.run_cache_lock: + for cache_entry in self.run_cache.values(): + run = cache_entry["run"] + if not cache_entry["is_changed"] and not run["finished"]: + for task_id, task in enumerate(run["tasks"]): + if ( + task["active"] + and task["last_updated"].timestamp() < now - 360 + ): + cache_entry["is_changed"] = True + dead_tasks.append((task_id, run)) + # We release the lock to avoid deadlock + for task_id, run in dead_tasks: + self.set_inactive_task(task_id, run) def get_unfinished_runs_id(self): with self.run_cache_write_lock: