Skip to content

Commit

Permalink
Split the convoluted "flush_buffers" into three scheduled tasks
Browse files Browse the repository at this point in the history
flush_buffers: syncs oldest cache entry (period: 1s)
clean_cache: evicts old cache entries (period: 60s)
scavenge_dead_tasks: marks timed-out active tasks as inactive (period: 60s).

This PR should also fix the deadlock official-stockfish#2041.
  • Loading branch information
vdbergh authored and ppigazzini committed Jun 1, 2024
1 parent d90610c commit 02bfad2
Showing 1 changed file with 54 additions and 54 deletions.
108 changes: 54 additions & 54 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ def schedule_tasks(self):
if self.scheduler is None:
self.scheduler = Scheduler(jitter=0.05)
self.scheduler.add_task(1.0, self.flush_buffers)
self.scheduler.add_task(60.0, self.clean_cache)
self.scheduler.add_task(60.0, self.scavenge_dead_tasks)
self.scheduler.add_task(180.0, self.validate_random_run)

def validate_random_run(self):
Expand Down Expand Up @@ -477,70 +479,68 @@ def flush_all(self):
print("done", flush=True)

# For documentation of the cache format see "cache_schema" in schemas.py.

def flush_buffers(self):
try:
self.run_cache_lock.acquire()
now = time.time()
old = now + 1
oldest_entry = None
# We make this a list to be able to change run_cache during iteration
for r_id, cache_entry in list(self.run_cache.items()):
oldest_entry = None
old = float("inf")
with self.run_cache_lock:
for cache_entry in self.run_cache.values():
run = cache_entry["run"]
if not cache_entry["is_changed"]:
if not run["finished"] and (
"last_scavenge_time" not in cache_entry
or cache_entry["last_scavenge_time"] < now - 60
):
cache_entry["last_scavenge_time"] = now
if self.scavenge(run):
with self.run_cache_write_lock:
self.runs.replace_one({"_id": run["_id"]}, run)
# Presently run["finished"] implies run["cores"]==0 but
# this was not always true in the past.
if (run["cores"] <= 0 or run["finished"]) and cache_entry[
"last_access_time"
] < now - 300:
del self.run_cache[r_id]
elif cache_entry["last_sync_time"] < old:
if cache_entry["is_changed"] and cache_entry["last_sync_time"] < old:
old = cache_entry["last_sync_time"]
oldest_entry = cache_entry
if oldest_entry is not None:
oldest_run = oldest_entry["run"]
self.scavenge(oldest_run)
oldest_entry["last_scavenge_time"] = now
oldest_entry["is_changed"] = False
oldest_entry["last_sync_time"] = time.time()
with self.run_cache_write_lock:
self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run)
except Exception as e:
print(f"Flush exception: {str(e)}", flush=True)
finally:
self.run_cache_lock.release()

def scavenge(self, run):
    """Mark active tasks of *run* that stopped reporting as inactive.

    Returns True if at least one dead task was found, else False.
    """
    # Grace period: do nothing during the first 300 seconds after
    # `boot_time` (presumably set at server start — confirm), so
    # workers get a chance to reconnect after a restart.
    if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300):
        return False
    dead_task = False
    # A task is dead when it has not been updated for more than 6 minutes.
    old = datetime.now(timezone.utc) - timedelta(minutes=6)
    task_id = -1
    for task in run["tasks"]:
        task_id += 1
        if task["active"] and task["last_updated"] < old:
            self.set_inactive_task(task_id, run)
            dead_task = True
            print(
                "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
                    run["_id"], task_id, worker_name(task["worker_info"])
                ),
                flush=True,
            )
            # Record statistics and log the event for the worker's user.
            self.handle_crash_or_time(run, task_id)
            self.actiondb.dead_task(
                username=task["worker_info"]["username"],
                run=run,
                task_id=task_id,
            )
    return dead_task

def clean_cache(self):
    """Evict finished, idle runs from the run cache.

    Scheduled to run every 60s.  An entry is evicted when it has no
    unsaved changes, its run uses no cores any more (or is finished),
    and it has not been accessed for at least 300 seconds.
    """
    cutoff = time.time() - 300
    with self.run_cache_lock:
        # Collect ids first, then delete, so we never mutate the dict
        # while iterating over it.
        evictable = [
            r_id
            for r_id, cache_entry in self.run_cache.items()
            if not cache_entry["is_changed"]
            # Presently run["finished"] implies run["cores"]==0 but
            # this was not always true in the past.
            and (
                cache_entry["run"]["cores"] <= 0
                or cache_entry["run"]["finished"]
            )
            and cache_entry["last_access_time"] < cutoff
        ]
        for r_id in evictable:
            del self.run_cache[r_id]

def scavenge_dead_tasks(self):
    """Mark active tasks that stopped reporting as inactive.

    Scheduled to run every 60s.  A task is dead when it is still
    active but has not been updated for more than 360 seconds.  Dead
    tasks are collected under the cache lock — marking their cache
    entry dirty so flush_buffers will persist the run — and then
    processed after the lock has been released.
    """
    deadline = time.time() - 360
    dead_tasks = []
    with self.run_cache_lock:
        for cache_entry in self.run_cache.values():
            run = cache_entry["run"]
            if cache_entry["is_changed"] or run["finished"]:
                continue
            for task_id, task in enumerate(run["tasks"]):
                if (
                    task["active"]
                    and task["last_updated"].timestamp() < deadline
                ):
                    cache_entry["is_changed"] = True
                    dead_tasks.append((task_id, run))
    # Process outside the lock to avoid deadlock: these calls may
    # acquire other locks or touch the database.
    for task_id, run in dead_tasks:
        task = run["tasks"][task_id]
        print(
            "dead task: run: https://tests.stockfishchess.org/tests/view/{} task_id: {} worker: {}".format(
                run["_id"], task_id, worker_name(task["worker_info"])
            ),
            flush=True,
        )
        self.handle_crash_or_time(run, task_id)
        self.actiondb.dead_task(
            username=task["worker_info"]["username"],
            run=run,
            task_id=task_id,
        )
        self.set_inactive_task(task_id, run)

def get_unfinished_runs_id(self):
with self.run_cache_write_lock:
Expand Down

0 comments on commit 02bfad2

Please sign in to comment.