Commit
Presumably a fix for a deadlock (see official-stockfish#2041).
We use a separate lock for updating aggregates. To this end we
extend self.active_run_lock() with an optional argument "name",
so that different locks can be associated with the same run_id.
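
As a rough sketch of the mechanism described above (simplified, and not the actual rundb.py code), the registry of active locks can be keyed on the pair (id, name), so that requesting the "aggregates" lock of a run returns a different lock object than requesting its default "run" lock:

import threading
import time

registry_lock = threading.Lock()
active_locks = {}  # (id, name) -> {"time": ..., "lock": ...}

def active_run_lock(id, name="run"):
    # One lock per (id, name) pair, so the same run id can have
    # independent "run" and "aggregates" locks.
    assert name in {"run", "aggregates"}
    key = (str(id), name)
    with registry_lock:
        entry = active_locks.setdefault(
            key, {"time": time.time(), "lock": threading.Lock()}
        )
        # Refresh the timestamp so a periodic purge keeps recently used locks.
        entry["time"] = time.time()
        return entry["lock"]

# The same run id yields distinct locks for distinct names:
assert active_run_lock(42) is active_run_lock(42)
assert active_run_lock(42) is not active_run_lock(42, name="aggregates")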
vdbergh committed May 29, 2024
1 parent dac6960 commit 425d898
Showing 1 changed file with 58 additions and 50 deletions.
server/fishtest/rundb.py (108 changes: 58 additions & 50 deletions)
@@ -76,14 +76,14 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
 
     def set_inactive_run(self, run):
         run_id = run["_id"]
-        with self.active_run_lock(str(run_id)):
+        with self.active_run_lock(run_id, name="aggregates"):
             run["workers"] = run["cores"] = 0
             for task in run["tasks"]:
                 task["active"] = False
 
     def set_inactive_task(self, task_id, run):
         run_id = run["_id"]
-        with self.active_run_lock(str(run_id)):
+        with self.active_run_lock(run_id, name="aggregates"):
             task = run["tasks"][task_id]
             if task["active"]:
                 run["workers"] -= 1
@@ -98,7 +98,8 @@ def update_workers_cores(self):
             workers = cores = 0
             run_id = r["_id"]
             run = self.get_run(run_id)
-            with self.active_run_lock(str(run_id)):
+            changed = False
+            with self.active_run_lock(run_id, name="aggregates"):
                 for task in run["tasks"]:
                     if task["active"]:
                         workers += 1
@@ -110,7 +111,9 @@
                         flush=True,
                     )
                     run["workers"], run["cores"] = workers, cores
-                    self.buffer(run, False)
+                    changed = True
+            if changed:
+                self.buffer(run, False)
 
     def new_run(
         self,
@@ -1015,50 +1018,51 @@ def priority(run): # lower is better
 
         # Now we create a new task for this run.
         run_id = run["_id"]
-        with self.active_run_lock(str(run_id)):
-            # It may happen that the run we have selected is now finished.
-            # Since this is very rare we just return instead of cluttering the
-            # code with remedial actions.
-            if run["finished"]:
-                info = (
-                    f"Request_task: alas the run {run_id} corresponding to the "
-                    "assigned task has meanwhile finished. Please try again..."
-                )
-                print(info, flush=True)
-                return {"task_waiting": False, "info": info}
+        with self.active_run_lock(run_id):
+            with self.active_run_lock(run_id, name="aggregates"):
+                # It may happen that the run we have selected is now finished.
+                # Since this is very rare we just return instead of cluttering the
+                # code with remedial actions.
+                if run["finished"]:
+                    info = (
+                        f"Request_task: alas the run {run_id} corresponding to the "
+                        "assigned task has meanwhile finished. Please try again..."
+                    )
+                    print(info, flush=True)
+                    return {"task_waiting": False, "info": info}
 
-            opening_offset = 0
-            for task in run["tasks"]:
-                opening_offset += task["num_games"]
+                opening_offset = 0
+                for task in run["tasks"]:
+                    opening_offset += task["num_games"]
 
-            if "sprt" in run["args"]:
-                sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
-                remaining = sprt_batch_size_games * math.ceil(
-                    remaining / sprt_batch_size_games
-                )
+                if "sprt" in run["args"]:
+                    sprt_batch_size_games = 2 * run["args"]["sprt"]["batch_size"]
+                    remaining = sprt_batch_size_games * math.ceil(
+                        remaining / sprt_batch_size_games
+                    )
 
-            task_size = min(self.worker_cap(run, worker_info), remaining)
-            task = {
-                "num_games": task_size,
-                "active": True,
-                "worker_info": worker_info,
-                "last_updated": datetime.now(timezone.utc),
-                "start": opening_offset,
-                "stats": {
-                    "wins": 0,
-                    "losses": 0,
-                    "draws": 0,
-                    "crashes": 0,
-                    "time_losses": 0,
-                    "pentanomial": 5 * [0],
-                },
-            }
-            run["tasks"].append(task)
+                task_size = min(self.worker_cap(run, worker_info), remaining)
+                task = {
+                    "num_games": task_size,
+                    "active": True,
+                    "worker_info": worker_info,
+                    "last_updated": datetime.now(timezone.utc),
+                    "start": opening_offset,
+                    "stats": {
+                        "wins": 0,
+                        "losses": 0,
+                        "draws": 0,
+                        "crashes": 0,
+                        "time_losses": 0,
+                        "pentanomial": 5 * [0],
+                    },
+                }
+                run["tasks"].append(task)
 
-            task_id = len(run["tasks"]) - 1
+                task_id = len(run["tasks"]) - 1
 
-            run["workers"] += 1
-            run["cores"] += task["worker_info"]["concurrency"]
+                run["workers"] += 1
+                run["cores"] += task["worker_info"]["concurrency"]
 
             self.buffer(run, False)
 
@@ -1081,7 +1085,10 @@ def priority(run): # lower is better
     active_runs = {}
     purge_count = 0
 
-    def active_run_lock(self, id):
+    def active_run_lock(self, id, name="run"):
+        valid_names = {"run", "aggregates"}
+        assert name in valid_names
+        id = str(id)
         with self.run_lock:
             self.purge_count = self.purge_count + 1
             if self.purge_count > 100000:
@@ -1090,12 +1097,13 @@ def active_run_lock(self, id):
                     (k, v) for k, v in self.active_runs.items() if v["time"] >= old
                 )
                 self.purge_count = 0
-            if id in self.active_runs:
-                active_lock = self.active_runs[id]["lock"]
-                self.active_runs[id]["time"] = time.time()
+            key = (id, name)
+            if key in self.active_runs:
+                active_lock = self.active_runs[key]["lock"]
+                self.active_runs[key]["time"] = time.time()
             else:
-                active_lock = threading.RLock()
-                self.active_runs[id] = {"time": time.time(), "lock": active_lock}
+                active_lock = threading.Lock()
+                self.active_runs[key] = {"time": time.time(), "lock": active_lock}
             return active_lock
 
     def finished_run_message(self, run):
@@ -1168,7 +1176,7 @@ def handle_crash_or_time(self, run, task_id):
         )
 
     def update_task(self, worker_info, run_id, task_id, stats, spsa):
-        lock = self.active_run_lock(str(run_id))
+        lock = self.active_run_lock(run_id)
         with lock:
            return self.sync_update_task(worker_info, run_id, task_id, stats, spsa)
 
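A side note on the update_workers_cores hunk above: the aggregates appear to be mutated while the narrow "aggregates" lock is held, while the write-back via self.buffer() runs only after that lock is released, which is presumably what breaks the deadlock mentioned in the commit title. A minimal sketch of that pattern, with hypothetical names standing in for the fishtest internals:

import threading

# Stand-in for active_run_lock(run_id, name="aggregates").
aggregates_lock = threading.Lock()

def refresh_aggregates(run, workers, cores, flush):
    changed = False
    with aggregates_lock:
        # Mutate the run document only while holding the narrow lock.
        if (run["workers"], run["cores"]) != (workers, cores):
            run["workers"], run["cores"] = workers, cores
            changed = True
    # flush() may itself take other locks, so it is called only after
    # the aggregates lock has been released.
    if changed:
        flush(run)

run = {"workers": 0, "cores": 0}
refresh_aggregates(run, workers=2, cores=16, flush=lambda r: print("flushed", r))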
