-
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathdocker_extract.py
executable file
·106 lines (87 loc) · 3.47 KB
/
docker_extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-only
import argparse
import os
import sys
from pathlib import Path
from typing import Iterator, Union, cast
import cwl_utils.parser_v1_0 as cwl
from cwl_utils.image_puller import (
DockerImagePuller,
ImagePuller,
SingularityImagePuller,
)
# The kinds of CWL process objects this script inspects for Docker requirements.
ProcessType = Union[cwl.Workflow, cwl.CommandLineTool, cwl.ExpressionTool]
# Directory prefix (ending in "/") of the top-level workflow file.  It is set
# by main() and prepended to string-valued step ``run`` references by
# get_process_from_step() before they are loaded.
WorkflowDir = ""
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        A namespace with ``dir`` (directory in which images are saved),
        ``input`` (the CWL workflow to inspect) and ``singularity``
        (``True`` when ``-s``/``--singularity`` was given).
    """
    cli = argparse.ArgumentParser(
        description="Tool to save docker images from a cwl workflow."
    )
    # Positional arguments are matched in declaration order: dir, then input.
    cli.add_argument("dir", help="Directory in which to save images")
    cli.add_argument("input", help="Input CWL workflow")
    cli.add_argument(
        "-s",
        "--singularity",
        action="store_true",
        help="Use singularity to pull the image",
    )
    parsed = cli.parse_args()
    return parsed
def main(args: argparse.Namespace) -> None:
    """Save every Docker image referenced by the workflow in ``args.input``.

    The output directory ``args.dir`` is created if needed, the workflow is
    loaded, and each DockerRequirement found by ``traverse`` is saved with
    either Singularity (``args.singularity`` set) or Docker.  Requirements
    without a ``dockerPull`` value are reported on stdout and skipped.
    """
    global WorkflowDir

    os.makedirs(args.dir, exist_ok=True)
    document = cwl.load_document(args.input)
    # Remember where the top-level workflow lives; get_process_from_step()
    # prefixes step "run" references with this value.
    WorkflowDir = f"{Path(args.input).parent}/"

    for req in traverse(document):
        if req.dockerPull:
            puller_class = (
                SingularityImagePuller if args.singularity else DockerImagePuller
            )
            puller: ImagePuller = puller_class(req.dockerPull, args.dir)
            puller.save_docker_image()
        else:
            print(f"Unable to save image from {req} due to lack of 'dockerPull'.")
def extract_docker_requirements(
    process: ProcessType,
) -> Iterator[cwl.DockerRequirement]:
    """Yield the docker reqs of *process*, normalizing the image reference.

    Each requirement produced by ``extract_docker_reqs`` is yielded.  When its
    ``dockerPull`` is a non-empty string without a tag or digest, ``:latest``
    is appended in place on the requirement object.

    The tag is looked for only in the last ``/``-separated component, so a
    registry port (``localhost:5000/image``) is not mistaken for a tag.  An
    empty ``dockerPull`` is left untouched so that callers can still detect
    it as missing.
    """
    for req in extract_docker_reqs(process):
        image = req.dockerPull
        if isinstance(image, str) and image:
            # Only the final path component can carry ":tag" / "@algo:digest".
            last_component = image.rsplit("/", 1)[-1]
            if ":" not in last_component:
                req.dockerPull = image + ":latest"
        yield req
def extract_docker_reqs(process: ProcessType) -> Iterator[cwl.DockerRequirement]:
    """For the given process, extract the DockerRequirement(s).

    Entries of ``process.requirements`` are yielded when they are
    ``cwl.DockerRequirement`` instances.  Entries of ``process.hints`` are
    yielded as-is when they are already ``cwl.DockerRequirement`` instances;
    mapping-style hints whose ``"class"`` is ``"DockerRequirement"`` are first
    converted with ``cwl.load_field``.  Hints that are neither (no ``"class"``
    key, or not subscriptable) are skipped instead of raising.
    """
    for req in process.requirements or []:
        if isinstance(req, cwl.DockerRequirement):
            yield req

    for hint in process.hints or []:
        if isinstance(hint, cwl.DockerRequirement):
            # NOTE(review): presumably some parser versions already return
            # typed hint objects; subscripting those would raise TypeError.
            yield hint
            continue
        try:
            hint_class = hint["class"]
        except (KeyError, TypeError):
            # Not a mapping with a "class" entry, so it cannot be a docker req.
            continue
        if hint_class == "DockerRequirement":
            yield cwl.load_field(
                hint,
                cwl.DockerRequirementLoader,
                Path.cwd().as_uri(),
                process.loadingOptions,
            )
def traverse(process: ProcessType) -> Iterator[cwl.DockerRequirement]:
    """Yield the docker reqs of *process* and, for a workflow, of its steps.

    The requirements declared on the process itself come first; when the
    process is a ``cwl.Workflow`` they are followed by everything that
    ``traverse_workflow`` finds in its steps.
    """
    for own_req in extract_docker_requirements(process):
        yield own_req
    if not isinstance(process, cwl.Workflow):
        return
    for step_req in traverse_workflow(process):
        yield step_req
def get_process_from_step(step: cwl.WorkflowStep) -> ProcessType:
    """Return the process for this step, loading it if necessary.

    When ``step.run`` is not a string it is returned as the process.  When it
    is a string it is treated as a reference and loaded with
    ``cwl.load_document``.  Relative references are prefixed with the
    module-level ``WorkflowDir``; absolute paths and URIs (anything
    containing ``://``) are loaded unchanged, since prefixing them would
    produce an invalid location.
    """
    run = step.run
    if not isinstance(run, str):
        return cast(ProcessType, run)
    if "://" not in run and not os.path.isabs(run):
        # NOTE(review): WorkflowDir is the top-level workflow's directory, so
        # relative references inside a sub-workflow stored elsewhere are still
        # resolved against the top-level directory -- confirm this is intended.
        run = WorkflowDir + run
    return cast(ProcessType, cwl.load_document(run))
def traverse_workflow(workflow: cwl.Workflow) -> Iterator[cwl.DockerRequirement]:
    """Iterate over the steps of this workflow, yielding the docker reqs.

    For every step, the requirements declared on the step itself are yielded
    first, followed by those found by ``traverse`` in the process the step
    runs.  Step-level requirements go through ``extract_docker_requirements``
    so their ``dockerPull`` gets the same ``:latest`` normalization as
    process-level requirements.
    """
    for step in workflow.steps:
        yield from extract_docker_requirements(step)
        yield from traverse(get_process_from_step(step))
# Script entry point: parse the command line, save the images, exit with 0.
if __name__ == "__main__":
    cli_args = parse_args()
    main(cli_args)
    sys.exit(0)