From 0a6963de7994731cb5278c16bc1cab9993931f09 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 10:52:56 -0400 Subject: [PATCH 01/10] Bump docker (6.0.1 -> 7.1.0) --- poetry.lock | 15 ++++++++------- pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index 164b5af..1de0061 100644 --- a/poetry.lock +++ b/poetry.lock @@ -781,24 +781,25 @@ wmi = ["wmi (>=1.5.1)"] [[package]] name = "docker" -version = "6.0.1" +version = "7.1.0" description = "A Python library for the Docker Engine API." optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "docker-6.0.1-py3-none-any.whl", hash = "sha256:dbcb3bd2fa80dca0788ed908218bf43972772009b881ed1e20dfc29a65e49782"}, - {file = "docker-6.0.1.tar.gz", hash = "sha256:896c4282e5c7af5c45e8b683b0b0c33932974fe6e50fc6906a0a83616ab3da97"}, + {file = "docker-7.1.0-py3-none-any.whl", hash = "sha256:c96b93b7f0a746f9e77d325bcfb87422a3d8bd4f03136ae8a85b37f1898d5fc0"}, + {file = "docker-7.1.0.tar.gz", hash = "sha256:ad8c70e6e3f8926cb8a92619b832b4ea5299e2831c14284663184e200546fa6c"}, ] [package.dependencies] -packaging = ">=14.0" pywin32 = {version = ">=304", markers = "sys_platform == \"win32\""} requests = ">=2.26.0" urllib3 = ">=1.26.0" -websocket-client = ">=0.32.0" [package.extras] +dev = ["coverage (==7.2.7)", "pytest (==7.4.2)", "pytest-cov (==4.1.0)", "pytest-timeout (==2.1.0)", "ruff (==0.1.8)"] +docs = ["myst-parser (==0.18.0)", "sphinx (==5.1.1)"] ssh = ["paramiko (>=2.4.3)"] +websockets = ["websocket-client (>=1.3.0)"] [[package]] name = "docutils" @@ -3712,4 +3713,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "80b06c5cc42cde0fb898aec773b0b2988451dd60b87040d7bdad23268ead2700" +content-hash = "5bcfb9776521a6dc50f5df1d94cc106cf5bdb900da3e99efdfcf9a0578ad8c75" diff --git a/pyproject.toml b/pyproject.toml index 29d6195..7d77698 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ prefect = "^2.8.2" boto3 = "^1.24.59" numpy = "^1.24.2" pandas = "^1.5.3" -docker = "^6.0.1" +docker = "^7.1.0" deepdiff = "^5.8.1" tabulate = "^0.9.0" From 5a0810ba409ca3db76d5662e550f43908c5788c3 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:32:56 -0400 Subject: [PATCH 02/10] Add docstrings and unit tests for create docker volume task --- .../docker/create_docker_volume.py | 24 +++++++++--- .../docker/test_create_docker_volume.py | 38 +++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) create mode 100644 tests/container_collection/docker/test_create_docker_volume.py diff --git a/src/container_collection/docker/create_docker_volume.py b/src/container_collection/docker/create_docker_volume.py index 16b2b46..b3c1efe 100644 --- a/src/container_collection/docker/create_docker_volume.py +++ b/src/container_collection/docker/create_docker_volume.py @@ -1,9 +1,23 @@ -import docker +from docker import APIClient -def create_docker_volume(path: str) -> docker.models.volumes.Volume: - client = docker.DockerClient(base_url="unix://var/run/docker.sock") - volume = client.volumes.create( +def create_docker_volume(api_client: APIClient, path: str) -> dict: + """ + Create a docker volume that copies content to specified path. + + Parameters + ---------- + api_client + Docker API client. + path + Local path for volume. + + Returns + ------- + : + Created volume reference object. + """ + + return api_client.create_volume( driver="local", driver_opts={"type": "none", "device": path, "o": "bind"} ) - return volume diff --git a/tests/container_collection/docker/test_create_docker_volume.py b/tests/container_collection/docker/test_create_docker_volume.py new file mode 100644 index 0000000..bffe697 --- /dev/null +++ b/tests/container_collection/docker/test_create_docker_volume.py @@ -0,0 +1,38 @@ +import datetime +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.create_docker_volume import create_docker_volume + + +def mock_create_volume(**kwargs): + name = kwargs.get("name", secrets.token_hex(32)) + return { + "CreatedAt": datetime.datetime.now().astimezone().replace(microsecond=0).isoformat(), + "Driver": kwargs.get("driver", "local"), + "Labels": kwargs.get("labels", {"com.docker.volume.anonymous": ""}), + "Mountpoint": f"/var/lib/docker/volumes/{name}/_data", + "Name": name, + "Options": kwargs.get("driver_opts", None), + "Scope": "local", + } + + +class TestCreateDockerVolume(unittest.TestCase): + def test_create_docker_volume(self): + client = mock.MagicMock(spec=APIClient) + client.create_volume.side_effect = mock_create_volume + path = "/docker/volume/path" + + volume = create_docker_volume(client, path) + + self.assertEqual(path, volume["Options"]["device"]) + self.assertEqual("none", volume["Options"]["type"]) + self.assertEqual("bind", volume["Options"]["o"]) + + +if __name__ == "__main__": + unittest.main() From 70836411cb9a0120f1a7631746aee776036e54e4 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:33:28 -0400 Subject: [PATCH 03/10] Add docstrings and unit tests for terminate docker job task --- .../docker/terminate_docker_job.py | 18 +++++++++++++---- .../docker/test_terminate_docker_job.py | 20 +++++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) create mode 100644 tests/container_collection/docker/test_terminate_docker_job.py diff --git a/src/container_collection/docker/terminate_docker_job.py b/src/container_collection/docker/terminate_docker_job.py index 8f94c5a..d73f417 100644 --- a/src/container_collection/docker/terminate_docker_job.py +++ b/src/container_collection/docker/terminate_docker_job.py @@ -1,6 +1,16 @@ -import docker +from docker import APIClient -def terminate_docker_job(container_id: str) -> None: - client = docker.APIClient(base_url="unix://var/run/docker.sock") - client.stop(container=container_id, timeout=1) +def terminate_docker_job(api_client: APIClient, container_id: str) -> None: + """ + Terminate specific docker container. + + Parameters + ---------- + api_client + Docker API client. + container_id + ID of container to terminate. + """ + + api_client.stop(container=container_id, timeout=1) diff --git a/tests/container_collection/docker/test_terminate_docker_job.py b/tests/container_collection/docker/test_terminate_docker_job.py new file mode 100644 index 0000000..d6e3c6f --- /dev/null +++ b/tests/container_collection/docker/test_terminate_docker_job.py @@ -0,0 +1,20 @@ +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.terminate_docker_job import terminate_docker_job + + +class TestTerminateDockerJob(unittest.TestCase): + def test_terminate_docker_job(self): + client = mock.MagicMock(spec=APIClient) + container_id = "container-id" + + terminate_docker_job(client, container_id) + + client.stop.assert_called_with(container=container_id, timeout=1) + + +if __name__ == "__main__": + unittest.main() From bf168ab7c2078b0bf613d21c0959164f549d8b57 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:56:40 -0400 Subject: [PATCH 04/10] Add docstrings and unit tests for make docker job task --- .../docker/make_docker_job.py | 47 +++++++++++++++---- .../docker/test_make_docker_job.py | 42 +++++++++++++++++ 2 files changed, 81 insertions(+), 8 deletions(-) create mode 100644 tests/container_collection/docker/test_make_docker_job.py diff --git a/src/container_collection/docker/make_docker_job.py b/src/container_collection/docker/make_docker_job.py index 0ce3761..c2103e0 100644 --- a/src/container_collection/docker/make_docker_job.py +++ b/src/container_collection/docker/make_docker_job.py @@ -1,11 +1,42 @@ -def make_docker_job(name: str, image: str, index: int) -> dict: - return { +from typing import Optional + + +def make_docker_job(name: str, image: str, environment: Optional[list[str]] = None) -> dict: + """ + Create docker job definition. + + Environment variables are passed as strings using the following structure: + + .. code-block:: python + + [ + "envName1=envValue1", + "envName2=envValue2", + ... + ] + + Parameters + ---------- + name + Job definition name. + image + Docker image. + environment + List of environment variables as strings. + + Returns + ------- + : + Job definition. + """ + + job_definition = { "image": image, - "name": f"{name}_{index}", - "environment": [ - "SIMULATION_TYPE=LOCAL", - f"FILE_SET_NAME={name}", - f"JOB_ARRAY_INDEX={index}", - ], + "name": name, "volumes": ["/mnt"], } + + if environment is not None: + job_definition["environment"] = environment + + return job_definition diff --git a/tests/container_collection/docker/test_make_docker_job.py b/tests/container_collection/docker/test_make_docker_job.py new file mode 100644 index 0000000..5129b51 --- /dev/null +++ b/tests/container_collection/docker/test_make_docker_job.py @@ -0,0 +1,42 @@ +import unittest + +from container_collection.docker.make_docker_job import make_docker_job + + +class TestMakeDockerJob(unittest.TestCase): + def test_make_docker_job_no_environment(self): + name = "job_name" + image = "image_name" + + expected_job = { + "image": image, + "name": name, + "volumes": ["/mnt"], + } + + job = make_docker_job(name, image) + + self.assertDictEqual(expected_job, job) + + def test_make_docker_job_with_environment(self): + name = "job_name" + image = "image_name" + environment = [ + "ENVIRONMENT_VARIABLE_A=X", + "ENVIRONMENT_VARIABLE_B=Y", + ] + + expected_job = { + "image": image, + "name": name, + "volumes": ["/mnt"], + "environment": environment, + } + + job = make_docker_job(name, image, environment=environment) + + self.assertDictEqual(expected_job, job) + + +if __name__ == "__main__": + unittest.main() From 5e6724107547f89be37299965a5eb5884734b588 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 12:10:15 -0400 Subject: [PATCH 05/10] Add docstrings and unit tests for submit docker job task --- .../docker/submit_docker_job.py | 29 ++++++++-- .../docker/test_submit_docker_job.py | 55 +++++++++++++++++++ 2 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 tests/container_collection/docker/test_submit_docker_job.py diff --git a/src/container_collection/docker/submit_docker_job.py b/src/container_collection/docker/submit_docker_job.py index 242992b..0ff061d 100644 --- a/src/container_collection/docker/submit_docker_job.py +++ b/src/container_collection/docker/submit_docker_job.py @@ -1,13 +1,30 @@ -import docker +from docker import APIClient -def submit_docker_job(job_definition: dict, volume: docker.models.volumes.Volume) -> str: - client = docker.APIClient(base_url="unix://var/run/docker.sock") - host_config = client.create_host_config(binds={volume.name: {"bind": "/mnt", "mode": "rw"}}) +def submit_docker_job(api_client: APIClient, job_definition: dict, volume_name: str) -> str: + """ + Submit Docker job. - container = client.create_container(**job_definition, host_config=host_config) + Parameters + ---------- + api_client + Docker API client. + job_definition + Docker job definition used to create job container. + volume_name + Name of created volume. + + Returns + ------- + : + Container ID. + """ + + host_config = api_client.create_host_config(binds={volume_name: {"bind": "/mnt", "mode": "rw"}}) + + container = api_client.create_container(**job_definition, host_config=host_config) container_id = container.get("Id") - client.start(container=container_id) + api_client.start(container=container_id) return container_id diff --git a/tests/container_collection/docker/test_submit_docker_job.py b/tests/container_collection/docker/test_submit_docker_job.py new file mode 100644 index 0000000..f9ae5b3 --- /dev/null +++ b/tests/container_collection/docker/test_submit_docker_job.py @@ -0,0 +1,55 @@ +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.submit_docker_job import submit_docker_job + + +def mock_create_host_config(**kwargs): + host_config = {"NetworkMode": "default"} + + if "binds" in kwargs: + binds = [] + for key, values in kwargs["binds"].items(): + combined = f"{key}:{values['bind']}:{values['mode']}" + if "propagation" in values: + combined = f"{combined},{values['propagation']}" + binds.append(combined) + host_config["Binds"] = binds + + return host_config + + +class TestSubmitDockerJob(unittest.TestCase): + def test_submit_docker_job(self): + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.create_host_config.side_effect = mock_create_host_config + client.create_container.return_value = {"Id": container_id, "Warnings": []} + + name = "job-definition-name" + image = "jobimage:latest" + volume_name = secrets.token_hex(32) + + host_config = {"NetworkMode": "default", "Binds": [f"{volume_name}:/mnt:rw"]} + + job_definition = { + "image": image, + "name": name, + "volumes": ["/mnt"], + "environment": [ + "ENVIRONMENT_VARIABLE_A=X", + "ENVIRONMENT_VARIABLE_B=Y", + ], + } + + submit_docker_job(client, job_definition, volume_name) + + client.create_container.assert_called_with(**job_definition, host_config=host_config) + client.start.assert_called_with(container=container_id) + + +if __name__ == "__main__": + unittest.main() From 534f95d0d73ef03e4a6cf48234ede3e89265d0d6 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 12:28:11 -0400 Subject: [PATCH 06/10] Add docstrings and unit tests for remove docker volume task --- .../docker/remove_docker_volume.py | 17 ++++++++++++--- .../docker/submit_docker_job.py | 2 +- .../docker/test_remove_docker_volume.py | 21 +++++++++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 tests/container_collection/docker/test_remove_docker_volume.py diff --git a/src/container_collection/docker/remove_docker_volume.py b/src/container_collection/docker/remove_docker_volume.py index 72cdab4..9afc177 100644 --- a/src/container_collection/docker/remove_docker_volume.py +++ b/src/container_collection/docker/remove_docker_volume.py @@ -1,5 +1,16 @@ -import docker +from docker import APIClient -def remove_docker_volume(volume: docker.models.volumes.Volume) -> None: - volume.remove() +def remove_docker_volume(api_client: APIClient, volume_name: str) -> None: + """ + Remove docker volume. + + Parameters + ---------- + api_client + Docker API client. + volume_name + Name of the docker volume. + """ + + api_client.remove_volume(volume_name) diff --git a/src/container_collection/docker/submit_docker_job.py b/src/container_collection/docker/submit_docker_job.py index 0ff061d..9c328ac 100644 --- a/src/container_collection/docker/submit_docker_job.py +++ b/src/container_collection/docker/submit_docker_job.py @@ -12,7 +12,7 @@ def submit_docker_job(api_client: APIClient, job_definition: dict, volume_name: job_definition Docker job definition used to create job container. volume_name - Name of created volume. + Name of the docker volume. Returns ------- diff --git a/tests/container_collection/docker/test_remove_docker_volume.py b/tests/container_collection/docker/test_remove_docker_volume.py new file mode 100644 index 0000000..001b31b --- /dev/null +++ b/tests/container_collection/docker/test_remove_docker_volume.py @@ -0,0 +1,21 @@ +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.remove_docker_volume import remove_docker_volume + + +class TestRemoveDockerVolume(unittest.TestCase): + def test_remove_docker_volume(self): + client = mock.MagicMock(spec=APIClient) + volume_name = secrets.token_hex(32) + + remove_docker_volume(client, volume_name) + + client.remove_volume.assert_called_with(volume_name) + + +if __name__ == "__main__": + unittest.main() From 7ca140847dbe8abe7aeaf5ce0682ec747d89e431 Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 12:38:16 -0400 Subject: [PATCH 07/10] Add docstrings and unit tests for get docker logs task --- .../docker/get_docker_logs.py | 25 +++++++-- .../docker/test_get_docker_logs.py | 53 +++++++++++++++++++ 2 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 tests/container_collection/docker/test_get_docker_logs.py diff --git a/src/container_collection/docker/get_docker_logs.py b/src/container_collection/docker/get_docker_logs.py index c3f3686..5a8db79 100644 --- a/src/container_collection/docker/get_docker_logs.py +++ b/src/container_collection/docker/get_docker_logs.py @@ -1,9 +1,26 @@ -import docker +from docker import APIClient -def get_docker_logs(container_id: str, log_filter: str) -> str: - client = docker.APIClient(base_url="unix://var/run/docker.sock") - logs = client.logs(container=container_id).decode("utf-8") +def get_docker_logs(api_client: APIClient, container_id: str, log_filter: str) -> str: + """ + Get logs for Docker job. + + Parameters + ---------- + api_client + Docker API client. + container_id + Docker container ID. + log_filter + Filter for log events (use "-" for exclusion). + + Returns + ------- + : + All filtered log events. + """ + + logs = api_client.logs(container=container_id).decode("utf-8") log_items = logs.split("\n") if "-" in log_filter: diff --git a/tests/container_collection/docker/test_get_docker_logs.py b/tests/container_collection/docker/test_get_docker_logs.py new file mode 100644 index 0000000..3d7d56c --- /dev/null +++ b/tests/container_collection/docker/test_get_docker_logs.py @@ -0,0 +1,53 @@ +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.get_docker_logs import get_docker_logs + + +class TestGetDockerLogs(unittest.TestCase): + def test_get_docker_logs_no_filter(self): + log_filter = "" + messages = [f"Event {i}" for i in range(10)] + expected_logs = "\n".join(messages) + + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.logs.return_value = "\n".join(messages).encode("utf-8") + + logs = get_docker_logs(client, container_id, log_filter) + self.assertEqual(expected_logs, logs) + + def test_get_docker_logs_with_include_filter(self): + log_filter = "INCLUDE" + include_messages = [f"Event INCLUDE {i}" for i in range(10)] + exclude_messages = [f"Event EXCLUDE {i}" for i in range(10)] + messages = [val for pair in zip(include_messages, exclude_messages) for val in pair] + expected_logs = "\n".join(include_messages) + + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.logs.return_value = "\n".join(messages).encode("utf-8") + + logs = get_docker_logs(client, container_id, log_filter) + self.assertEqual(expected_logs, logs) + + def test_get_docker_logs_with_exclude_filter(self): + log_filter = "-EXCLUDE" + include_messages = [f"Event INCLUDE {i}" for i in range(10)] + exclude_messages = [f"Event EXCLUDE {i}" for i in range(10)] + messages = [val for pair in zip(include_messages, exclude_messages) for val in pair] + expected_logs = "\n".join(include_messages) + + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.logs.return_value = "\n".join(messages).encode("utf-8") + + logs = get_docker_logs(client, container_id, log_filter) + self.assertEqual(expected_logs, logs) + + +if __name__ == "__main__": + unittest.main() From 9f76de28f2edab00f2ab1f8c3781ab11932034dd Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 12:59:16 -0400 Subject: [PATCH 08/10] Add docstrings and unit tests for clean docker job task --- .../docker/check_docker_job.py | 24 +-- .../docker/clean_docker_job.py | 20 ++- .../docker/test_check_docker_job.py | 146 ++++++++++++++++++ .../docker/test_clean_docker_job.py | 35 +++++ 4 files changed, 209 insertions(+), 16 deletions(-) create mode 100644 tests/container_collection/docker/test_check_docker_job.py create mode 100644 tests/container_collection/docker/test_clean_docker_job.py diff --git a/src/container_collection/docker/check_docker_job.py b/src/container_collection/docker/check_docker_job.py index 9bd91e9..0bae8f3 100644 --- a/src/container_collection/docker/check_docker_job.py +++ b/src/container_collection/docker/check_docker_job.py @@ -1,23 +1,25 @@ from typing import Union - -import docker -import prefect -from prefect.server.schemas.states import Failed, State +from docker import APIClient +from prefect.states import Failed, State +from prefect.context import TaskRunContext RETRIES_EXCEEDED_EXIT_CODE = 80 +"""Exit code used when task run retries exceed the maximum retries.""" -def check_docker_job(container_id: str, max_retries: int) -> Union[int, State]: - task_run = prefect.context.get_run_context().task_run # type: ignore +def check_docker_job(api_client: APIClient, container_id: str, max_retries: int) -> Union[int, State]: + context = TaskRunContext.get() - if task_run.run_count > max_retries: + if context is not None and context.task_run.run_count > max_retries: return RETRIES_EXCEEDED_EXIT_CODE - client = docker.APIClient(base_url="unix://var/run/docker.sock") - status = client.containers(all=True, filters={"id": container_id})[0]["State"] + status = api_client.containers(all=True, filters={"id": container_id})[0]["State"] - if status == "running": + # For jobs that are running, throw the appropriate exception. + if context is not None and status == "running": return Failed() + if status == "running": + raise RuntimeError("Job is in RUNNING state and does not have exit code.") - exitcode = client.wait(container_id, timeout=1)["StatusCode"] + exitcode = api_client.wait(container_id, timeout=1)["StatusCode"] return exitcode diff --git a/src/container_collection/docker/clean_docker_job.py b/src/container_collection/docker/clean_docker_job.py index d87ee76..f88b793 100644 --- a/src/container_collection/docker/clean_docker_job.py +++ b/src/container_collection/docker/clean_docker_job.py @@ -1,9 +1,19 @@ -import docker +from docker import APIClient -def clean_docker_job(container_id: str) -> None: - client = docker.APIClient(base_url="unix://var/run/docker.sock") - status = client.containers(all=True, filters={"id": container_id})[0]["State"] +def clean_docker_job(api_client: APIClient, container_id: str) -> None: + """ + Clean up container if it is not currently running. + + Parameters + ---------- + api_client + Docker API client. + container_id + ID of container to remove. + """ + + status = api_client.containers(all=True, filters={"id": container_id})[0]["State"] if status != "running": - client.remove_container(container=container_id) + api_client.remove_container(container=container_id) diff --git a/tests/container_collection/docker/test_check_docker_job.py b/tests/container_collection/docker/test_check_docker_job.py new file mode 100644 index 0000000..f352395 --- /dev/null +++ b/tests/container_collection/docker/test_check_docker_job.py @@ -0,0 +1,146 @@ +import os +import sys +import unittest +from typing import Optional +from unittest import mock + +import boto3 +from prefect import flow +from prefect.exceptions import FailedRun +from prefect.testing.utilities import prefect_test_harness + +from container_collection.batch import check_batch_job as check_batch_job_task +from container_collection.batch.check_batch_job import RETRIES_EXCEEDED_EXIT_CODE, check_batch_job + +SUCCEEDED_EXIT_CODE = 0 +FAILED_EXIT_CODE = 1 + + +def make_describe_jobs_response(status: Optional[str], exit_code: Optional[int]): + if status is None: + return {"jobs": []} + if status in ("SUCCEEDED", "FAILED"): + return {"jobs": [{"status": status, "attempts": [{"container": {"exitCode": exit_code}}]}]} + return {"jobs": [{"status": status}]} + + +def make_boto_mock(statuses: list[Optional[str]], exit_code: Optional[int] = None): + batch_mock = mock.MagicMock() + boto3_mock = mock.MagicMock(spec=boto3) + boto3_mock.client.return_value = batch_mock + batch_mock.describe_jobs.side_effect = [ + make_describe_jobs_response(status, exit_code) for status in statuses + ] + return boto3_mock + + +@flow +def run_task_under_flow(max_retries: int): + return check_batch_job_task("job-arn", max_retries) + + +@mock.patch.dict( + os.environ, + {"PREFECT_LOGGING_LEVEL": "CRITICAL"}, +) +class TestCheckBatchJob(unittest.TestCase): + @classmethod + def setUpClass(cls): + with prefect_test_harness(): + yield + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["RUNNING"]), + ) + def test_check_batch_job_as_method_running_throws_exception(self): + with self.assertRaises(RuntimeError): + check_batch_job("job-arn", 0) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock([None, "PENDING", "RUNNING"]), + ) + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], "sleep", lambda _: None + ) + def test_check_batch_job_as_method_running_with_waits_throws_exception(self): + with self.assertRaises(RuntimeError): + check_batch_job("job-arn", 0) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["SUCCEEDED"], SUCCEEDED_EXIT_CODE), + ) + def test_check_batch_job_as_method_succeeded(self): + exit_code = check_batch_job("job-arn", 0) + self.assertEqual(SUCCEEDED_EXIT_CODE, exit_code) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["FAILED"], FAILED_EXIT_CODE), + ) + def test_check_batch_job_as_method_failed(self): + exit_code = check_batch_job("job-arn", 0) + self.assertEqual(FAILED_EXIT_CODE, exit_code) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["RUNNING"]), + ) + def test_check_batch_job_as_task_running_below_max_retries_throws_failed_run(self): + max_retries = 1 + with self.assertRaises(FailedRun): + run_task_under_flow(max_retries) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock([None, "PENDING", "RUNNING"]), + ) + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], "sleep", lambda _: None + ) + def test_check_batch_job_as_task_running_below_max_retries_with_waits_throws_failed_run(self): + max_retries = 1 + with self.assertRaises(FailedRun): + run_task_under_flow(max_retries) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock([None]), + ) + def test_check_batch_job_as_task_max_retries_exceeded(self): + max_retries = 0 + exit_code = run_task_under_flow(max_retries) + self.assertEqual(RETRIES_EXCEEDED_EXIT_CODE, exit_code) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["SUCCEEDED"], SUCCEEDED_EXIT_CODE), + ) + def test_check_batch_job_as_task_succeeded(self): + max_retries = 1 + exit_code = run_task_under_flow(max_retries) + self.assertEqual(SUCCEEDED_EXIT_CODE, exit_code) + + @mock.patch.object( + sys.modules["container_collection.batch.check_batch_job"], + "boto3", + make_boto_mock(["FAILED"], FAILED_EXIT_CODE), + ) + def test_check_batch_job_as_task_failed(self): + max_retries = 1 + exit_code = run_task_under_flow(max_retries) + self.assertEqual(FAILED_EXIT_CODE, exit_code) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/container_collection/docker/test_clean_docker_job.py b/tests/container_collection/docker/test_clean_docker_job.py new file mode 100644 index 0000000..ee80651 --- /dev/null +++ b/tests/container_collection/docker/test_clean_docker_job.py @@ -0,0 +1,35 @@ +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.clean_docker_job import clean_docker_job + + +class TestCleanDockerJob(unittest.TestCase): + def test_clean_docker_job_state_running(self): + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.containers.return_value = [{"State": "running"}] + client.remove_container = mock.Mock() + + clean_docker_job(client, container_id) + + client.containers.assert_called_with(all=True, filters={"id": container_id}) + client.remove_container.assert_not_called() + + def test_clean_docker_job_state_exited(self): + container_id = secrets.token_hex(32) + client = mock.MagicMock(spec=APIClient) + client.containers.return_value = [{"State": "exited"}] + client.remove_container = mock.Mock() + + clean_docker_job(client, container_id) + + client.containers.assert_called_with(all=True, filters={"id": container_id}) + client.remove_container.assert_called_with(container=container_id) + + +if __name__ == "__main__": + unittest.main() From 115d67ca98e64d8cf8fcd5962878776e1f54fa8d Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 13:14:03 -0400 Subject: [PATCH 09/10] Add docstrings and unit tests for check docker job task --- .../docker/check_docker_job.py | 34 ++++- .../docker/test_check_docker_job.py | 142 ++++++------------ 2 files changed, 75 insertions(+), 101 deletions(-) diff --git a/src/container_collection/docker/check_docker_job.py b/src/container_collection/docker/check_docker_job.py index 0bae8f3..681b332 100644 --- a/src/container_collection/docker/check_docker_job.py +++ b/src/container_collection/docker/check_docker_job.py @@ -1,13 +1,43 @@ from typing import Union + from docker import APIClient -from prefect.states import Failed, State from prefect.context import TaskRunContext +from prefect.states import Failed, State RETRIES_EXCEEDED_EXIT_CODE = 80 """Exit code used when task run retries exceed the maximum retries.""" -def check_docker_job(api_client: APIClient, container_id: str, max_retries: int) -> Union[int, State]: +def check_docker_job( + api_client: APIClient, container_id: str, max_retries: int +) -> Union[int, State]: + """ + Check for exit code of a Docker container. + + If this task is running within a Prefect flow, it will use the task run + context to get the current run count. While the run count is below the + maximum number of retries, the task will continue to attempt to get the exit + code, and can be called with a retry delay to periodically check the status + of jobs. + + If this task is not running within a Prefect flow, the ``max_retries`` + parameters is ignored. Jobs that are still running will throw an exception. + + Parameters + ---------- + api_client + Docker API client. + container_id + ID of container to check. + max_retries + Maximum number of retries. + + Returns + ------- + : + Exit code if the job is complete, otherwise throws an exception. + """ + context = TaskRunContext.get() if context is not None and context.task_run.run_count > max_retries: diff --git a/tests/container_collection/docker/test_check_docker_job.py b/tests/container_collection/docker/test_check_docker_job.py index f352395..d1f1dad 100644 --- a/tests/container_collection/docker/test_check_docker_job.py +++ b/tests/container_collection/docker/test_check_docker_job.py @@ -1,145 +1,89 @@ import os -import sys import unittest from typing import Optional from unittest import mock -import boto3 +from docker import APIClient from prefect import flow from prefect.exceptions import FailedRun from prefect.testing.utilities import prefect_test_harness -from container_collection.batch import check_batch_job as check_batch_job_task -from container_collection.batch.check_batch_job import RETRIES_EXCEEDED_EXIT_CODE, check_batch_job +from container_collection.docker import check_docker_job as check_docker_job_task +from container_collection.docker.check_docker_job import ( + RETRIES_EXCEEDED_EXIT_CODE, + check_docker_job, +) SUCCEEDED_EXIT_CODE = 0 FAILED_EXIT_CODE = 1 -def make_describe_jobs_response(status: Optional[str], exit_code: Optional[int]): - if status is None: - return {"jobs": []} - if status in ("SUCCEEDED", "FAILED"): - return {"jobs": [{"status": status, "attempts": [{"container": {"exitCode": exit_code}}]}]} - return {"jobs": [{"status": status}]} - - -def make_boto_mock(statuses: list[Optional[str]], exit_code: Optional[int] = None): - batch_mock = mock.MagicMock() - boto3_mock = mock.MagicMock(spec=boto3) - boto3_mock.client.return_value = batch_mock - batch_mock.describe_jobs.side_effect = [ - make_describe_jobs_response(status, exit_code) for status in statuses - ] - return boto3_mock +def make_client_mock(status: str, exit_code: Optional[int] = None): + client = mock.MagicMock(spec=APIClient) + client.containers.return_value = [{"State": status}] + client.wait.return_value = {"StatusCode": exit_code} + return client @flow -def run_task_under_flow(max_retries: int): - return check_batch_job_task("job-arn", max_retries) +def run_task_under_flow(client: APIClient, max_retries: int): + return check_docker_job_task(client, "container-id", max_retries) @mock.patch.dict( os.environ, {"PREFECT_LOGGING_LEVEL": "CRITICAL"}, ) -class TestCheckBatchJob(unittest.TestCase): +class TestCheckDockerJob(unittest.TestCase): @classmethod def setUpClass(cls): with prefect_test_harness(): yield - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["RUNNING"]), - ) - def test_check_batch_job_as_method_running_throws_exception(self): - with self.assertRaises(RuntimeError): - check_batch_job("job-arn", 0) - - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock([None, "PENDING", "RUNNING"]), - ) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], "sleep", lambda _: None - ) - def test_check_batch_job_as_method_running_with_waits_throws_exception(self): + def test_check_docker_job_as_method_running_throws_exception(self): + client = make_client_mock("running") with self.assertRaises(RuntimeError): - check_batch_job("job-arn", 0) - - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["SUCCEEDED"], SUCCEEDED_EXIT_CODE), - ) - def test_check_batch_job_as_method_succeeded(self): - exit_code = check_batch_job("job-arn", 0) + check_docker_job(client, "container-id", 0) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) + + def test_check_docker_job_as_method_succeeded(self): + client = make_client_mock("exited", SUCCEEDED_EXIT_CODE) + exit_code = check_docker_job(client, "container-id", 0) self.assertEqual(SUCCEEDED_EXIT_CODE, exit_code) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["FAILED"], FAILED_EXIT_CODE), - ) - def test_check_batch_job_as_method_failed(self): - exit_code = check_batch_job("job-arn", 0) + def test_check_docker_job_as_method_failed(self): + client = make_client_mock("exited", FAILED_EXIT_CODE) + exit_code = check_docker_job(client, "container-id", 0) self.assertEqual(FAILED_EXIT_CODE, exit_code) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["RUNNING"]), - ) - def test_check_batch_job_as_task_running_below_max_retries_throws_failed_run(self): - max_retries = 1 - with self.assertRaises(FailedRun): - run_task_under_flow(max_retries) - - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock([None, "PENDING", "RUNNING"]), - ) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], "sleep", lambda _: None - ) - def test_check_batch_job_as_task_running_below_max_retries_with_waits_throws_failed_run(self): + def test_check_docker_job_as_task_running_below_max_retries_throws_failed_run(self): + client = make_client_mock("running") max_retries = 1 with self.assertRaises(FailedRun): - run_task_under_flow(max_retries) - - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock([None]), - ) - def test_check_batch_job_as_task_max_retries_exceeded(self): + run_task_under_flow(client, max_retries) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) + + def test_check_docker_job_as_task_max_retries_exceeded(self): + client = make_client_mock("") max_retries = 0 - exit_code = run_task_under_flow(max_retries) + exit_code = run_task_under_flow(client, max_retries) self.assertEqual(RETRIES_EXCEEDED_EXIT_CODE, exit_code) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["SUCCEEDED"], SUCCEEDED_EXIT_CODE), - ) - def test_check_batch_job_as_task_succeeded(self): + def test_check_docker_job_as_task_succeeded(self): + client = make_client_mock("exited", SUCCEEDED_EXIT_CODE) max_retries = 1 - exit_code = run_task_under_flow(max_retries) + exit_code = run_task_under_flow(client, max_retries) self.assertEqual(SUCCEEDED_EXIT_CODE, exit_code) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) - @mock.patch.object( - sys.modules["container_collection.batch.check_batch_job"], - "boto3", - make_boto_mock(["FAILED"], FAILED_EXIT_CODE), - ) - def test_check_batch_job_as_task_failed(self): + def test_check_docker_job_as_task_failed(self): + client = make_client_mock("exited", FAILED_EXIT_CODE) max_retries = 1 - exit_code = run_task_under_flow(max_retries) + exit_code = run_task_under_flow(client, max_retries) self.assertEqual(FAILED_EXIT_CODE, exit_code) + client.containers.assert_called_with(all=True, filters={"id": "container-id"}) if __name__ == "__main__": From 6cc1fb442facb8ee32c499133f0ff3c63bf709fa Mon Sep 17 00:00:00 2001 From: jessicasyu <15913767+jessicasyu@users.noreply.github.com> Date: Fri, 30 Aug 2024 13:23:29 -0400 Subject: [PATCH 10/10] Add docstrings and unit tests for run docker command task --- .../docker/run_docker_command.py | 28 +++++++++-- .../docker/test_run_docker_command.py | 48 +++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 tests/container_collection/docker/test_run_docker_command.py diff --git a/src/container_collection/docker/run_docker_command.py b/src/container_collection/docker/run_docker_command.py index 4af0d37..48efc95 100644 --- a/src/container_collection/docker/run_docker_command.py +++ b/src/container_collection/docker/run_docker_command.py @@ -1,19 +1,39 @@ from typing import Optional -import docker +from docker import DockerClient def run_docker_command( + client: DockerClient, image: str, command: list[str], - volume: Optional[docker.models.volumes.Volume] = None, + volume_name: Optional[str] = None, environment: Optional[list] = None, detach: bool = False, ) -> None: + """ + Run container from image with given command. + + Parameters + ---------- + api_client + Docker API client. + image + Docker image. + command + Command list passed to container. + volume_name + Name of the docker volume. + environment + List of environment variables as strings. + detach + True to start container and immediately return the Container object, + False otherwise. + """ + environment = [] if environment is None else environment - volumes = {} if volume is None else {volume.name: {"bind": "/mnt", "mode": "rw"}} + volumes = {} if volume_name is None else {volume_name: {"bind": "/mnt", "mode": "rw"}} - client = docker.DockerClient(base_url="unix://var/run/docker.sock") client.containers.run( image, command, diff --git a/tests/container_collection/docker/test_run_docker_command.py b/tests/container_collection/docker/test_run_docker_command.py new file mode 100644 index 0000000..dcc350a --- /dev/null +++ b/tests/container_collection/docker/test_run_docker_command.py @@ -0,0 +1,48 @@ +import secrets +import unittest +from unittest import mock + +from docker import APIClient + +from container_collection.docker.run_docker_command import run_docker_command + + +class TestRunDockerCommand(unittest.TestCase): + def test_run_docker_command_no_optional_parameters(self): + client = mock.MagicMock(spec=APIClient) + image = "jobimage:latest" + command = ["command", "string"] + + run_docker_command(client, image, command) + + client.containers.run.assert_called_with( + image, command, environment=[], volumes={}, auto_remove=True, detach=False + ) + + def test_run_docker_command_with_optional_parameters(self): + client = mock.MagicMock(spec=APIClient) + image = "jobimage:latest" + command = ["command", "string"] + environment = [ + "ENVIRONMENT_VARIABLE_A=X", + "ENVIRONMENT_VARIABLE_B=Y", + ] + volume_name = secrets.token_hex(32) + detach = True + + run_docker_command( + client, image, command, environment=environment, volume_name=volume_name, detach=detach + ) + + client.containers.run.assert_called_with( + image, + command, + environment=environment, + volumes={volume_name: {"bind": "/mnt", "mode": "rw"}}, + auto_remove=True, + detach=detach, + ) + + +if __name__ == "__main__": + unittest.main()