Save a run with finished tasks with higher priority.
Sometimes the worker list on the main page shows a worker
seemingly working on two different tasks, which is impossible.
This illusion occurs when one of the tasks is new and the other
is a task the same worker has already finished but whose state
has not yet been written to disk, so from the point of view of
the db it is still unfinished.

This PR mitigates the issue by buffering a run containing finished
tasks with priority 1 (note: a run with new tasks is buffered with
priority 2, and all other runs with priority 0).
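
To illustrate how such a priority hint can be used, here is a minimal,
self-contained sketch. It is not the fishtest implementation: the
RunCacheSketch class, the flush_some method, and the db_save callback
are hypothetical. The idea it shows is simply that, when the cache is
flushed, runs buffered with a higher priority are written to disk
before lower-priority ones, so a run holding a freshly finished task
reaches the db quickly.

import time


class RunCacheSketch:
    """Toy run cache: buffer() queues a run for saving to the db.

    Priority guideline (mirroring this commit):
      priority=2 : run with a new task
      priority=1 : run with a finished task
      priority=0 : everything else
    """

    def __init__(self, db_save):
        self.db_save = db_save  # callable that persists a run dict
        self.cache = {}  # run_id -> (priority, queued_at, run)

    def buffer(self, run, flush, priority=0):
        if flush:
            # Write through immediately, bypassing the cache.
            self.db_save(run)
            self.cache.pop(run["_id"], None)
            return
        old_priority = self.cache.get(run["_id"], (0,))[0]
        # Never lower a priority that was already recorded for this run.
        self.cache[run["_id"]] = (max(priority, old_priority), time.time(), run)

    def flush_some(self, n=1):
        # Save the n most urgent runs: highest priority first, oldest first.
        pending = sorted(self.cache.values(), key=lambda e: (-e[0], e[1]))
        for _, _, run in pending[:n]:
            self.db_save(run)
            self.cache.pop(run["_id"], None)


if __name__ == "__main__":
    saved = []
    cache = RunCacheSketch(db_save=lambda run: saved.append(run["_id"]))
    cache.buffer({"_id": "run_a"}, False, priority=0)  # ordinary update
    cache.buffer({"_id": "run_b"}, False, priority=1)  # finished task
    cache.flush_some(n=1)
    print(saved)  # ['run_b'] -- the run with the finished task is saved first

In the real rundb, buffer() also takes flush and create arguments (see
the docstring added below); the sketch only shows the ordering effect
of the priority argument.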

Another reason for this PR is that we want to reduce the number
of direct buffer calls: several buffer calls are moved deeper into
the call chain, into set_inactive_task and set_bad_task.
vdbergh authored and ppigazzini committed Jan 18, 2025
1 parent 0087ae4 commit e810977
Showing 2 changed files with 9 additions and 4 deletions.
1 change: 0 additions & 1 deletion server/fishtest/api.py
@@ -297,7 +297,6 @@ def stop_run(self):
             self.request.rundb.stop_run(self.run_id())
         else:
             self.request.rundb.set_inactive_task(self.task_id(), run)
-            self.request.rundb.buffer(run, True)
 
         self.handle_error(error, exception=HTTPUnauthorized)
         return self.add_time({})
12 changes: 9 additions & 3 deletions server/fishtest/rundb.py
@@ -325,6 +325,7 @@ def set_inactive_task(self, task_id, run):
                     del self.connections_counter[remote_addr]
                 except Exception as e:
                     print(f"Error while deleting connection: {str(e)}", flush=True)
+        self.buffer(run, False, priority=1)
 
     def set_bad_task(self, task_id, run, residual=None, residual_color=None):
         zero_stats = {
@@ -365,6 +366,7 @@ def set_bad_task(self, task_id, run, residual=None, residual_color=None):
             # to zero.
             task["bad"] = True
             task["stats"] = copy.deepcopy(zero_stats)
+        self.buffer(run, False, priority=1)
 
     # Do not run two copies of this function in parallel!
     def update_aggregated_data(self):
@@ -738,6 +740,12 @@ def get_run(self, r_id):
         return self.runs.find_one({"_id": r_id_obj})
 
     def buffer(self, run, flush, priority=0, create=False):
+        """
+        Guidelines
+        ==========
+        priority=1 : finished task
+        priority=2 : new task
+        """
         if not self.is_primary_instance():
             print(
                 "Warning: attempt to use the run_cache on the",
@@ -859,7 +867,6 @@ def scavenge_dead_tasks(self):
                 task_id=task_id,
             )
             self.set_inactive_task(task_id, run)
-            self.buffer(run, False)
 
     def get_unfinished_runs_id(self):
         unfinished_runs = self.runs.find(
@@ -1279,7 +1286,7 @@ def priority(run): # lower is better
 
         self.insert_in_wtt_map(run, task_id)
 
-        self.buffer(run, False, priority=1)
+        self.buffer(run, False, priority=2)
 
         # Cache some data. Currently we record the id's
         # the worker has seen, as well as the last id that was seen.
@@ -1532,7 +1539,6 @@ def failed_task(self, run_id, task_id, message="Unknown reason"):
         # Mark the task as inactive.
         self.set_inactive_task(task_id, run)
         self.handle_crash_or_time(run, task_id)
-        self.buffer(run, False)
         print(
             "Failed_task: failure for: https://tests.stockfishchess.org/tests/view/{}, "
             "task_id: {}, worker: {}, reason: '{}'".format(
