diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index b5dbcccb1..4a0ea3158 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -769,27 +769,44 @@ def priority(run): # lower is better self.task_runs.sort(key=priority) - # We go through the list of unfinished runs to see if the worker + # get the list of active tasks + + active_tasks = [ + task for run in self.task_runs for task in run["tasks"] if task["active"] + ] + + # We go through the list of active tasks to see if a worker with the same + # name is already connected. + + my_name = "-".join(worker_name(worker_info).split("-")[0:3]) + now = datetime.utcnow() + for task in active_tasks: + task_name = "-".join(worker_name(task["worker_info"]).split("-")[0:3]) + if my_name == task_name: + last_update = (now - task["last_updated"]).seconds + # 120 = period of heartbeat in worker. + if last_update <= 120: + error = 'Request_task: There is already a worker running with name "{}" which sent an update {} seconds ago'.format( + my_name, + last_update, + ) + return {"task_waiting": False, "error": error} + + # We go through the list of active tasks to see if the worker # has reached the number of allowed connections from the same ip # address. connections = 0 connections_limit = self.userdb.get_machine_limit(worker_info["username"]) - for run in self.task_runs: - for task in run["tasks"]: - if ( - task["active"] - and task["worker_info"]["remote_addr"] == worker_info["remote_addr"] - ): - connections += 1 - if connections >= connections_limit: - error = ( - "Request_task: Machine limit reached for user {}".format( - worker_info["username"] - ) - ) - print(error, flush=True) - return {"task_waiting": False, "error": error} + for task in active_tasks: + if task["worker_info"]["remote_addr"] == worker_info["remote_addr"]: + connections += 1 + if connections >= connections_limit: + error = "Request_task: Machine limit reached for user {}".format( + worker_info["username"] + ) + print(error, flush=True) + return {"task_waiting": False, "error": error} # Now go through the sorted list of unfinished runs. # We will add a task to the first run that is suitable. diff --git a/server/tests/test_api.py b/server/tests/test_api.py index e9d0891ef..3ac2e6775 100644 --- a/server/tests/test_api.py +++ b/server/tests/test_api.py @@ -521,17 +521,6 @@ def test_auto_purge_runs(self): task1 = self.rundb.get_run(run_id)["tasks"][0] task_size1 = task1["num_games"] - # Request task 2 of 2 - request = self.correct_password_request() - response = ApiView(request).request_task() - self.assertEqual(response["run"]["_id"], str(run["_id"])) - self.assertEqual(response["task_id"], 1) - task2 = self.rundb.get_run(run_id)["tasks"][1] - task_size2 = task2["num_games"] - task_start2 = task2["start"] - - self.assertEqual(task_start2, task_size1) - # Finish task 1 of 2 n_wins = task_size1 // 5 n_losses = task_size1 // 5 @@ -556,6 +545,17 @@ def test_auto_purge_runs(self): run = self.rundb.get_run(run_id) self.assertFalse(run["finished"]) + # Request task 2 of 2 + request = self.correct_password_request() + response = ApiView(request).request_task() + self.assertEqual(response["run"]["_id"], str(run["_id"])) + self.assertEqual(response["task_id"], 1) + task2 = self.rundb.get_run(run_id)["tasks"][1] + task_size2 = task2["num_games"] + task_start2 = task2["start"] + + self.assertEqual(task_start2, task_size1) + # Finish task 2 of 2 n_wins = task_size2 // 5 n_losses = task_size2 // 5