From b091b4ef8a0bbde86db4bf2e251cffa11a34f04e Mon Sep 17 00:00:00 2001 From: Michel Van den Bergh Date: Thu, 24 Aug 2023 12:37:30 +0000 Subject: [PATCH] Try to avoid workers with the same name. During request_task, we check if there are active tasks for workers with the same name, which have recently been updated (i.e. they are not dead). If so then we return an error. Should fix https://github.com/official-stockfish/fishtest/issues/1360#issuecomment-1689436054 . --- server/fishtest/rundb.py | 49 +++++++++++++++++++++++++++------------- server/tests/test_api.py | 22 +++++++++--------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index b5dbcccb1..4a0ea3158 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -769,27 +769,44 @@ def priority(run): # lower is better self.task_runs.sort(key=priority) - # We go through the list of unfinished runs to see if the worker + # get the list of active tasks + + active_tasks = [ + task for run in self.task_runs for task in run["tasks"] if task["active"] + ] + + # We go through the list of active tasks to see if a worker with the same + # name is already connected. + + my_name = "-".join(worker_name(worker_info).split("-")[0:3]) + now = datetime.utcnow() + for task in active_tasks: + task_name = "-".join(worker_name(task["worker_info"]).split("-")[0:3]) + if my_name == task_name: + last_update = (now - task["last_updated"]).seconds + # 120 = period of heartbeat in worker. + if last_update <= 120: + error = 'Request_task: There is already a worker running with name "{}" which sent an update {} seconds ago'.format( + my_name, + last_update, + ) + return {"task_waiting": False, "error": error} + + # We go through the list of active tasks to see if the worker # has reached the number of allowed connections from the same ip # address. connections = 0 connections_limit = self.userdb.get_machine_limit(worker_info["username"]) - for run in self.task_runs: - for task in run["tasks"]: - if ( - task["active"] - and task["worker_info"]["remote_addr"] == worker_info["remote_addr"] - ): - connections += 1 - if connections >= connections_limit: - error = ( - "Request_task: Machine limit reached for user {}".format( - worker_info["username"] - ) - ) - print(error, flush=True) - return {"task_waiting": False, "error": error} + for task in active_tasks: + if task["worker_info"]["remote_addr"] == worker_info["remote_addr"]: + connections += 1 + if connections >= connections_limit: + error = "Request_task: Machine limit reached for user {}".format( + worker_info["username"] + ) + print(error, flush=True) + return {"task_waiting": False, "error": error} # Now go through the sorted list of unfinished runs. # We will add a task to the first run that is suitable. diff --git a/server/tests/test_api.py b/server/tests/test_api.py index e9d0891ef..3ac2e6775 100644 --- a/server/tests/test_api.py +++ b/server/tests/test_api.py @@ -521,17 +521,6 @@ def test_auto_purge_runs(self): task1 = self.rundb.get_run(run_id)["tasks"][0] task_size1 = task1["num_games"] - # Request task 2 of 2 - request = self.correct_password_request() - response = ApiView(request).request_task() - self.assertEqual(response["run"]["_id"], str(run["_id"])) - self.assertEqual(response["task_id"], 1) - task2 = self.rundb.get_run(run_id)["tasks"][1] - task_size2 = task2["num_games"] - task_start2 = task2["start"] - - self.assertEqual(task_start2, task_size1) - # Finish task 1 of 2 n_wins = task_size1 // 5 n_losses = task_size1 // 5 @@ -556,6 +545,17 @@ def test_auto_purge_runs(self): run = self.rundb.get_run(run_id) self.assertFalse(run["finished"]) + # Request task 2 of 2 + request = self.correct_password_request() + response = ApiView(request).request_task() + self.assertEqual(response["run"]["_id"], str(run["_id"])) + self.assertEqual(response["task_id"], 1) + task2 = self.rundb.get_run(run_id)["tasks"][1] + task_size2 = task2["num_games"] + task_start2 = task2["start"] + + self.assertEqual(task_start2, task_size1) + # Finish task 2 of 2 n_wins = task_size2 // 5 n_losses = task_size2 // 5