-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathexp_launcher.py
147 lines (105 loc) · 4.34 KB
/
exp_launcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from concurrent.futures import process
from omegaconf import OmegaConf
import importlib
import multiprocessing as mp
import argparse
import random
import time
import subprocess
import os
import csv
from tqdm import tqdm
from setproctitle import setproctitle
from coolname import generate_slug
def find_available_gpus(idle_threshold: int = 5):
    """Return the indices of GPUs that look idle according to ``nvidia-smi``.

    A GPU counts as idle when both its compute utilization
    (``utilization.gpu``) and its memory utilization (``utilization.memory``)
    are strictly below *idle_threshold* percent.

    Rows that do not parse as three integers are reported and skipped. This
    covers blank lines and metrics that nvidia-smi prints as ``[N/A]`` or
    ``[Not Supported]``. That GPU is then treated as unavailable instead of
    crashing the launcher with a ValueError.

    Args:
        idle_threshold: Utilization percentage; both metrics must be below it.

    Returns:
        List of GPU indices (ints), in nvidia-smi's output order.

    Raises:
        FileNotFoundError: if ``nvidia-smi`` is not on PATH.
        subprocess.CalledProcessError: if ``nvidia-smi`` exits non-zero.
    """
    raw = subprocess.check_output([
        "nvidia-smi",
        "--query-gpu=index,utilization.gpu,utilization.memory",
        "--format=csv,noheader,nounits",
    ]).decode()

    available = []
    for row in csv.reader(raw.splitlines()):
        try:
            # int() tolerates the spaces nvidia-smi puts after each comma.
            index, gpu_util, mem_util = (int(field) for field in row)
        except ValueError:
            # Wrong field count (e.g. blank line) or a non-numeric value like "[N/A]".
            print(f"Skipping unparseable nvidia-smi row: {row}")
            continue
        if gpu_util < idle_threshold and mem_util < idle_threshold:
            available.append(index)
    return available
def split(a, n):
    """Partition sequence *a* into *n* contiguous chunks of near-equal length.

    The first ``len(a) % n`` chunks receive one extra element. Chunks are
    slices of *a*, so their type matches *a*. Some chunks are empty when
    ``n > len(a)``; the result is empty when ``n < 0``.
    ``n == 0`` raises ZeroDivisionError.

    Adapted from
    https://stackoverflow.com/questions/2130016/splitting-a-list-into-n-parts-of-approximately-equal-length
    """
    base, extra = divmod(len(a), n)
    chunks = []
    start = 0
    for idx in range(n):
        # Chunks before index `extra` absorb the remainder, one element each.
        stop = start + base + (1 if idx < extra else 0)
        chunks.append(a[start:stop])
        start = stop
    return chunks
def get_class_from_str(import_name: str, class_name: str):
    """Resolve *class_name* to an object, or return None if it is not found.

    If *import_name* is truthy, that module is imported and *class_name* is
    read from it with ``getattr``. A missing module still raises ImportError.
    Otherwise *class_name* is looked up in this module's globals.
    """
    if not import_name:
        return globals().get(class_name, None)
    module = importlib.import_module(import_name)
    return getattr(module, class_name, None)
def worker_(env, import_name, method, conf_list):
    """Run experiment configs sequentially in the current process.

    Used as the ``multiprocessing.Process`` target by ``launch_experiments``.

    Args:
        env: Environment variables (str -> str) applied to ``os.environ``
            before the experiment callable is resolved.
        import_name: Module to import *method* from. A falsy value means look
            it up in this module's globals (see ``get_class_from_str``).
        method: Name of the callable invoked once per config as ``method(conf)``.
        conf_list: Configs to run one after another. Each must provide
            ``log_group``, which becomes the process title.

    Raises:
        AttributeError: if *method* cannot be resolved and *conf_list* is non-empty.
    """
    # Apply env (e.g. CUDA_VISIBLE_DEVICES) before importing the experiment
    # module, so code there that reads it during import/initialization sees
    # this worker's values.
    os.environ.update(env)
    run_fn = get_class_from_str(import_name, method)
    if run_fn is None and conf_list:
        # Fail with a clear message instead of "'NoneType' object is not callable".
        raise AttributeError(f"Cannot resolve method '{method}' (module: {import_name!r})")
    for conf in conf_list:
        name = conf.log_group
        # Name the OS process after the experiment group so it shows up in ps/top.
        setproctitle(name)
        tqdm.write("-- {} running --".format(name))
        tqdm.write(OmegaConf.to_yaml(conf))
        tqdm.write("-------------")
        run_fn(conf)
def _build_experiment_confs(launch):
    """Expand *launch* into one merged conf per (experiment set, task, seed).

    Each conf is ``OmegaConf.merge(task_conf, expset_conf, overrides)``. The
    overrides set a distinct ``seed`` (consecutive integers from one random
    base), ``log_group`` = ``"<expset_name> <run_uuid>"``, and
    ``log_eval_metric_save_name`` = ``<expset_name>``.
    """
    exp_conf_list = []
    # One random base per launch, then +1 per experiment, so every experiment gets a different seed.
    seed = random.SystemRandom().randint(0, int(1e8))
    run_uuid = generate_slug(2)
    for expset_name, expset_conf in launch.experiment_sets.items():
        for task_conf in launch.tasks:
            for _ in range(launch.launch.seed_per_exp):
                exp_conf_list.append(OmegaConf.merge(task_conf, expset_conf, {
                    "seed": seed,
                    "log_group": f"{expset_name} {run_uuid}",
                    "log_eval_metric_save_name": expset_name,
                }))
                seed += 1
    return exp_conf_list


def _worker_env(launch):
    """Return the optional ``launch.env`` mapping with keys and values as ``str``."""
    env_conf = launch.get("env")
    # A missing or explicitly null `env` means no extra variables.
    env = {} if env_conf is None else OmegaConf.to_container(env_conf)
    # os.environ only accepts strings.
    return {str(k): str(v) for k, v in env.items()}


def launch_experiments(launch):
    """Expand, optionally prepare, and run all experiments across device workers.

    Reads from *launch* (an OmegaConf config):
      - ``experiment_sets``: mapping name -> conf merged on top of each task.
      - ``tasks``: list of task confs.
      - ``launch.seed_per_exp``: seeds per (experiment set, task) pair.
      - ``launch.prepare_filename`` / ``launch.prepare_method`` (optional):
        callable run on every conf in this process before any worker starts.
      - ``devices`` (optional): device ids. Missing or null means
        ``find_available_gpus()`` is used.
      - ``launch.runs_per_device``: worker processes per device (>= 1).
      - ``env`` (optional): extra environment variables for every worker.
      - ``launch.filename`` / ``launch.method``: experiment entry point,
        called as ``method(conf)`` inside the workers.
      - ``launch.cold_start_seconds``: delay after starting each worker.

    The experiments are shuffled and split into contiguous chunks, one per
    worker process. Each worker gets ``CUDA_VISIBLE_DEVICES`` set to its
    device id. The call blocks until all workers exit, then prints a summary
    of any non-zero exit codes.

    Raises:
        AttributeError: if ``prepare_method`` is configured but cannot be resolved.
        ValueError: if ``launch.runs_per_device`` is less than 1.
    """
    # Set the start method globally (not via get_context): multiprocessing
    # forwards the parent's global start method to spawned children, so
    # experiment code inside workers also defaults to "spawn".
    # force=True makes a repeated call in the same process safe.
    mp.set_start_method("spawn", force=True)

    exp_conf_list = _build_experiment_confs(launch)
    # Run in random order to balance load across workers.
    random.shuffle(exp_conf_list)

    # Single-threaded preparation of the running environment, in this process.
    if "prepare_method" in launch.launch:
        prepare_fn = get_class_from_str(launch.launch.prepare_filename, launch.launch.prepare_method)
        if prepare_fn is None:
            raise AttributeError(
                f"Cannot resolve prepare_method '{launch.launch.prepare_method}' "
                f"(module: {launch.launch.prepare_filename!r})"
            )
        for conf in exp_conf_list:
            prepare_fn(conf)

    # Membership test instead of hasattr(): attribute access on DictConfig
    # does not reliably raise AttributeError for missing keys.
    all_devices = launch.devices if "devices" in launch else None
    if all_devices is None:
        all_devices = find_available_gpus()
    if not len(all_devices):
        print("No available devices !!!")
        return
    print(f"Available devices: {all_devices}")

    runs_per_device = launch.launch.runs_per_device
    if runs_per_device < 1:
        raise ValueError(f"launch.runs_per_device must be >= 1, got {runs_per_device}")
    num_processes = runs_per_device * len(all_devices)
    # Worker slot i runs on proc_device_id[i]; devices repeat round-robin.
    proc_device_id = list(all_devices) * runs_per_device
    proc_conf_list = split(exp_conf_list, num_processes)

    env = _worker_env(launch)

    processes = []
    for device_id, conf_list in zip(proc_device_id, proc_conf_list):
        if not conf_list:
            # More worker slots than experiments: don't spawn idle processes.
            continue
        proc_env = dict(env, CUDA_VISIBLE_DEVICES=str(device_id))
        processes.append(mp.Process(target=worker_, kwargs={
            "env": proc_env,
            "import_name": launch.launch.filename,
            "method": launch.launch.method,
            "conf_list": conf_list,
        }))

    for proc in processes:
        proc.start()
        # Cold start: stagger worker start-up to avoid contention.
        time.sleep(launch.launch.cold_start_seconds)

    for proc in processes:
        proc.join()

    failed = [proc.exitcode for proc in processes if proc.exitcode != 0]
    if failed:
        print(f"{len(failed)}/{len(processes)} worker process(es) failed, exit codes: {failed}")
def main():
    """Build the launch config from command-line overrides and run it.

    Arguments are parsed with ``OmegaConf.from_cli()``. If they contain
    ``include=<path>``, that config file is loaded and the CLI values are
    merged on top of it, so the CLI wins.
    """
    conf = OmegaConf.from_cli()
    # Membership test rather than hasattr(): on older OmegaConf versions,
    # attribute access on a missing key returns None instead of raising
    # AttributeError. That made hasattr() always true.
    if "include" in conf:
        conf = OmegaConf.merge(OmegaConf.load(conf.include), conf)
    launch_experiments(conf)
# Script entry point. Workers are started with the "spawn" method, which
# re-imports this file in each child under a different module name. This
# guard keeps them from re-running main().
if __name__ == "__main__":
    main()