Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for container collection docker tasks #66

Merged
merged 10 commits into from
Aug 30, 2024
15 changes: 8 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ prefect = "^2.8.2"
boto3 = "^1.24.59"
numpy = "^1.24.2"
pandas = "^1.5.3"
docker = "^6.0.1"
docker = "^7.1.0"
deepdiff = "^5.8.1"
tabulate = "^0.9.0"

Expand Down
58 changes: 45 additions & 13 deletions src/container_collection/docker/check_docker_job.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,55 @@
from typing import Union

import docker
import prefect
from prefect.server.schemas.states import Failed, State
from docker import APIClient
from prefect.context import TaskRunContext
from prefect.states import Failed, State

RETRIES_EXCEEDED_EXIT_CODE = 80


def check_docker_job(container_id: str, max_retries: int) -> Union[int, State]:
task_run = prefect.context.get_run_context().task_run # type: ignore

if task_run.run_count > max_retries:
"""Exit code used when task run retries exceed the maximum retries."""


def check_docker_job(
api_client: APIClient, container_id: str, max_retries: int
) -> Union[int, State]:
"""
Check for exit code of a Docker container.

If this task is running within a Prefect flow, it will use the task run
context to get the current run count. While the run count is below the
maximum number of retries, the task will continue to attempt to get the exit
code, and can be called with a retry delay to periodically check the status
of jobs.

If this task is not running within a Prefect flow, the ``max_retries``
parameter is ignored. Jobs that are still running will raise an exception.

Parameters
----------
api_client
Docker API client.
container_id
ID of container to check.
max_retries
Maximum number of retries.

Returns
-------
:
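Exit code if the job is complete. Within a Prefect task run, returns
``RETRIES_EXCEEDED_EXIT_CODE`` once the run count exceeds ``max_retries``
and a failed state while the job is still running; outside a task run, a
job that is still running raises an exception.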
"""

context = TaskRunContext.get()

if context is not None and context.task_run.run_count > max_retries:
return RETRIES_EXCEEDED_EXIT_CODE

client = docker.APIClient(base_url="unix://var/run/docker.sock")
status = client.containers(all=True, filters={"id": container_id})[0]["State"]
status = api_client.containers(all=True, filters={"id": container_id})[0]["State"]

if status == "running":
# For jobs that are running, throw the appropriate exception.
if context is not None and status == "running":
return Failed()
if status == "running":
raise RuntimeError("Job is in RUNNING state and does not have exit code.")

exitcode = client.wait(container_id, timeout=1)["StatusCode"]
exitcode = api_client.wait(container_id, timeout=1)["StatusCode"]
return exitcode
20 changes: 15 additions & 5 deletions src/container_collection/docker/clean_docker_job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import docker
from docker import APIClient


def clean_docker_job(container_id: str) -> None:
client = docker.APIClient(base_url="unix://var/run/docker.sock")
status = client.containers(all=True, filters={"id": container_id})[0]["State"]
def clean_docker_job(api_client: APIClient, container_id: str) -> None:
"""
Clean up container if it is not currently running.

Parameters
----------
api_client
Docker API client.
container_id
ID of container to remove.
"""

status = api_client.containers(all=True, filters={"id": container_id})[0]["State"]

if status != "running":
client.remove_container(container=container_id)
api_client.remove_container(container=container_id)
24 changes: 19 additions & 5 deletions src/container_collection/docker/create_docker_volume.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import docker
from docker import APIClient


def create_docker_volume(path: str) -> docker.models.volumes.Volume:
client = docker.DockerClient(base_url="unix://var/run/docker.sock")
volume = client.volumes.create(
def create_docker_volume(api_client: APIClient, path: str) -> dict:
"""
Create a docker volume bound to the specified local path.

Parameters
----------
api_client
Docker API client.
path
Local path for volume.

Returns
-------
:
Created volume reference object.
"""

return api_client.create_volume(
driver="local", driver_opts={"type": "none", "device": path, "o": "bind"}
)
return volume
25 changes: 21 additions & 4 deletions src/container_collection/docker/get_docker_logs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import docker
from docker import APIClient


def get_docker_logs(container_id: str, log_filter: str) -> str:
client = docker.APIClient(base_url="unix://var/run/docker.sock")
logs = client.logs(container=container_id).decode("utf-8")
def get_docker_logs(api_client: APIClient, container_id: str, log_filter: str) -> str:
"""
Get logs for Docker job.

Parameters
----------
api_client
Docker API client.
container_id
Docker container ID.
log_filter
Filter for log events (use "-" for exclusion).

Returns
-------
:
All filtered log events.
"""

logs = api_client.logs(container=container_id).decode("utf-8")

log_items = logs.split("\n")
if "-" in log_filter:
Expand Down
47 changes: 39 additions & 8 deletions src/container_collection/docker/make_docker_job.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
def make_docker_job(name: str, image: str, index: int) -> dict:
return {
from typing import Optional


def make_docker_job(name: str, image: str, environment: Optional[list[str]] = None) -> dict:
"""
Create docker job definition.

Environment variables are passed as strings using the following structure:

.. code-block:: python

[
"envName1=envValue1",
"envName2=envValue2",
...
]

Parameters
----------
name
Job definition name.
image
Docker image.
environment
List of environment variables as strings.

Returns
-------
:
Job definition.
"""

job_definition = {
"image": image,
"name": f"{name}_{index}",
"environment": [
"SIMULATION_TYPE=LOCAL",
f"FILE_SET_NAME={name}",
f"JOB_ARRAY_INDEX={index}",
],
"name": name,
"volumes": ["/mnt"],
}

if environment is not None:
job_definition["environment"] = environment

return job_definition
17 changes: 14 additions & 3 deletions src/container_collection/docker/remove_docker_volume.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import docker
from docker import APIClient


def remove_docker_volume(volume: docker.models.volumes.Volume) -> None:
volume.remove()
def remove_docker_volume(api_client: APIClient, volume_name: str) -> None:
"""
Remove docker volume.

Parameters
----------
api_client
Docker API client.
volume_name
Name of the docker volume.
"""

api_client.remove_volume(volume_name)
28 changes: 24 additions & 4 deletions src/container_collection/docker/run_docker_command.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
from typing import Optional

import docker
from docker import DockerClient


def run_docker_command(
client: DockerClient,
image: str,
command: list[str],
volume: Optional[docker.models.volumes.Volume] = None,
volume_name: Optional[str] = None,
environment: Optional[list] = None,
detach: bool = False,
) -> None:
"""
Run container from image with given command.

Parameters
----------
client
Docker client.
image
Docker image.
command
Command list passed to container.
volume_name
Name of the docker volume.
environment
List of environment variables as strings.
detach
True to start container and immediately return the Container object,
False otherwise.
"""

environment = [] if environment is None else environment
volumes = {} if volume is None else {volume.name: {"bind": "/mnt", "mode": "rw"}}
volumes = {} if volume_name is None else {volume_name: {"bind": "/mnt", "mode": "rw"}}

client = docker.DockerClient(base_url="unix://var/run/docker.sock")
client.containers.run(
image,
command,
Expand Down
29 changes: 23 additions & 6 deletions src/container_collection/docker/submit_docker_job.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
import docker
from docker import APIClient


def submit_docker_job(job_definition: dict, volume: docker.models.volumes.Volume) -> str:
client = docker.APIClient(base_url="unix://var/run/docker.sock")
host_config = client.create_host_config(binds={volume.name: {"bind": "/mnt", "mode": "rw"}})
def submit_docker_job(api_client: APIClient, job_definition: dict, volume_name: str) -> str:
"""
Submit Docker job.

container = client.create_container(**job_definition, host_config=host_config)
Parameters
----------
api_client
Docker API client.
job_definition
Docker job definition used to create job container.
volume_name
Name of the docker volume.

Returns
-------
:
Container ID.
"""

host_config = api_client.create_host_config(binds={volume_name: {"bind": "/mnt", "mode": "rw"}})

container = api_client.create_container(**job_definition, host_config=host_config)
container_id = container.get("Id")

client.start(container=container_id)
api_client.start(container=container_id)

return container_id
18 changes: 14 additions & 4 deletions src/container_collection/docker/terminate_docker_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import docker
from docker import APIClient


def terminate_docker_job(container_id: str) -> None:
client = docker.APIClient(base_url="unix://var/run/docker.sock")
client.stop(container=container_id, timeout=1)
def terminate_docker_job(api_client: APIClient, container_id: str) -> None:
"""
Terminate specific docker container.

Parameters
----------
api_client
Docker API client.
container_id
ID of container to terminate.
"""

api_client.stop(container=container_id, timeout=1)
Loading