-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_pyopatra_tests.py
130 lines (108 loc) · 3.86 KB
/
run_pyopatra_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import subprocess
from datetime import datetime
from multiprocessing.connection import Listener

import yaml
from fire import Fire

from config import server_config, base_job_config
from tapisclient import TapisClient
"""
Entry script to run HPC tests via CI/CD
"""
class TestCase:
    """One HPC test case: a directory containing a ``run.py`` and an optional config.

    Attributes:
        is_valid: False when the directory has no ``run.py``; in that case no
            other attribute is set and the instance should be skipped.
        name: basename of the test directory.
        config: job settings loaded from ``config.yml``/``config.yaml``, or a
            copy of ``DEFAULT_CONFIG`` when no config file is found.
        dirname: the test directory path as given.
        remote_dir: remote asset directory; None until ``get_remote_dir()``.
        zipfile: path of the archive, set by ``make_zip_file()``.
    """

    DEFAULT_CONFIG = {
        "nodeCount": 1,
        "processorsPerNode": 32,
        "maxRunTime": "00:30:00"
    }

    def __init__(self, dirname):
        """Inspect *dirname* and load its config.

        Args:
            dirname: path of the candidate test directory.
        """
        # There has to be a run.py file to count
        if not os.path.exists(os.path.join(dirname, "run.py")):
            self.is_valid = False
            return
        config = None
        for ending in (".yml", ".yaml"):
            path = os.path.join(dirname, "config" + ending)
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as fp:
                    # safe_load: a bare yaml.load() without Loader= is a
                    # TypeError on PyYAML >= 6 and can construct arbitrary objects.
                    config = yaml.safe_load(fp)
                break
        # normpath strips a trailing separator, which would otherwise give "".
        self.name = os.path.basename(os.path.normpath(dirname))
        if config is None:
            print(f"Warning: Missing config file for test '{self.name}'! Using defaults.")
            # Copy so per-instance changes can never leak into the class default.
            config = dict(self.DEFAULT_CONFIG)
        self.is_valid = True
        self.config = config
        self.dirname = dirname
        self.remote_dir = None

    def make_zip_file(self):
        """Zip the test assets into ``<dirname>/testcase.zip`` and return its path.

        Raises:
            subprocess.CalledProcessError: if the ``zip`` command fails.
        """
        path = os.path.join(self.dirname, "testcase.zip")
        if os.path.exists(path):
            os.remove(path)
        # Reproduce the shell glob `*` (all non-hidden entries) without a shell,
        # so odd characters in the directory name cannot break or inject the command.
        entries = sorted(e for e in os.listdir(self.dirname) if not e.startswith("."))
        subprocess.run(["zip", "-r", "testcase.zip", *entries],
                       cwd=self.dirname, check=True)
        self.zipfile = path
        return path

    def get_job_config(self, storage_system):
        """Build the job submission dict for this test.

        Starts from ``base_job_config``, overlays this test's ``config``, and
        sets ``name`` and the ``inputs.test_assets`` URL of the uploaded zip.

        Args:
            storage_system: storage system id used in the ``agave://`` URL.
        """
        # A shallow merge is enough: only top-level keys are (re)assigned below.
        config = {**base_job_config, **self.config}
        config["name"] = "pyopatra-test-" + self.name
        # get_remote_dir() rather than self.remote_dir, so the URL never contains "None".
        config["inputs"] = {"test_assets": f"agave://{storage_system}/{self.get_remote_dir()}/testcase.zip"}
        print(config)
        return config

    def get_remote_dir(self):
        """Return this test's remote asset directory, timestamping it on first call."""
        if self.remote_dir is None:
            self.remote_dir = f"tapis_job_assets/{self.name}-" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        return self.remote_dir
_TERMINAL_STATUSES = frozenset({"FINISHED", "FAILED", "STOPPED"})
_FAILED_STATUSES = frozenset({"FAILED", "STOPPED"})


def _discover_tests(testsdir):
    """Return a valid TestCase for each immediate subdirectory of *testsdir*, in name order."""
    # os.scandir raises FileNotFoundError for a missing directory, instead of
    # the bare StopIteration that next(os.walk(...)) produced.
    with os.scandir(testsdir) as entries:
        subdirs = sorted(entry.path for entry in entries if entry.is_dir())
    candidates = (TestCase(path) for path in subdirs)
    return [testcase for testcase in candidates if testcase.is_valid]


def _upload_assets(client, tests):
    """Zip each test and upload the archive into that test's remote directory."""
    for test in tests:
        zip_path = test.make_zip_file()
        remote_dir = test.get_remote_dir()
        client.mkdir(remote_dir)
        client.upload(zip_path, remote_dir)
        # TODO - check for errors with the upload operations


def _submit_jobs(client, tests):
    """Submit one job per test; return {job id: response['result']}."""
    jobs = {}
    for test in tests:
        response = client.submit_job(test.get_job_config(client.storage_system))
        jobs[response['result']['id']] = response['result']
    return jobs


def _wait_for_jobs(listener, jobs):
    """Block until every job id in *jobs* reports a terminal status.

    Status messages arrive over connections accepted on *listener*. A closed
    connection is replaced by accepting a new one.

    Returns:
        (completed, failed): dicts mapping job id -> last terminal message;
        ``failed`` holds only FAILED/STOPPED jobs.
    """
    completed = {}
    failed = {}
    if not jobs:
        return completed, failed
    conn = listener.accept()
    try:
        while len(completed) < len(jobs):
            try:
                msg = conn.recv()
            except EOFError:
                # The sender hung up; wait for it to reconnect.
                conn.close()
                conn = listener.accept()
                continue
            print("Recieved status update: ", msg)
            # NOTE(review): assumes messages are dicts with 'status' and 'id' keys.
            status = msg.get('status')
            if status not in _TERMINAL_STATUSES:
                continue
            job_id = msg.get('id')
            if job_id not in jobs:
                # Don't let a stray job's status count toward our completion.
                print(f"Ignoring status for unknown job {job_id}.")
                continue
            completed[job_id] = msg
            if status in _FAILED_STATUSES:
                failed[job_id] = msg
            print(f"Job {job_id} completed.")
    finally:
        conn.close()
    print("All jobs complete.")
    return completed, failed


def main(testsdir,
         tapisconfig="tapisconfig.json",
         ):
    """Upload, submit and monitor every test case found under *testsdir*.

    Each immediate subdirectory of ``testsdir`` containing a ``run.py`` is one
    test case.

    Args:
        testsdir: directory whose subdirectories are the test cases.
        tapisconfig: config path passed to ``TapisClient``.

    Raises:
        FileNotFoundError: if ``testsdir`` does not exist.
        RuntimeError: if any job finishes with status FAILED or STOPPED.
    """
    tests = _discover_tests(testsdir)
    if not tests:
        # No job would ever report back, so monitoring would block forever.
        print("Found 0 tests. Nothing to run.")
        return
    print(f"Found {len(tests)} tests. Uploading assets. . .")
    t = TapisClient(tapisconfig)
    _upload_assets(t, tests)
    print("Uploaded test assets. Submitting jobs. . .")
    # Bind the status listener before submitting, so early status messages
    # are not refused. The with-block closes the listening socket on exit.
    with Listener(('localhost', server_config['message_port']),
                  authkey=b'speakfriendandenter') as listener:
        jobs = _submit_jobs(t, tests)
        print(f"Submitted {len(jobs)} jobs. Monitoring status. . .")
        _completed, failed = _wait_for_jobs(listener, jobs)
    if failed:
        raise RuntimeError("Jobs Failed!", failed)
if __name__ == "__main__":
    # Hand main() to python-fire, which builds the command-line interface from it.
    Fire(component=main, name="run-pyopatra-tests")