main.py
import argparse
import os
import shutil
import subprocess
from abc import abstractmethod
from pathlib import Path

import pathos
from joblib import Parallel, delayed

from cadetrdm import ProjectRepo, Options
from cadetrdm.wrapper import tracks_results


class ParallelizationBase:
    def __init__(self, n_cores=1):
        self.n_cores = n_cores

    @abstractmethod
    def run(self, func, args_list):
        """Run a function over a list of input args and return the output.

        Parameters
        ----------
        func : callable
            The function wrapping the shell command.
        args_list : list | iterable
            List of args to the func.

        Returns
        -------
        List of function returns
        """
        return

class SequentialBackend(ParallelizationBase):
    def run(self, func, args_list):
        results = []
        for args in args_list:
            results.append(func(*args))
        return results

class JoblibBackend(ParallelizationBase):
    def run(self, func, args_list):
        # Return the results so this backend matches the interface of the others.
        return Parallel(n_jobs=self.n_cores)(delayed(func)(*args) for args in args_list)

class PathosBackend(ParallelizationBase):
    def run(self, func, args_list):
        with pathos.pools.ProcessPool(ncpus=self.n_cores) as pool:
            # *zip(*args_list) because for multiple args, pathos expects
            # (func, args1_list, args2_list).
            results = pool.map(func, *zip(*args_list))
        return results
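
# A quick illustration of the *zip(*...) transposition used above
# (values are made up for the example):
#
#     args_list = [(1, "a"), (2, "b")]
#     list(zip(*args_list))  # -> [(1, 2), ("a", "b")]
#     # pool.map(func, (1, 2), ("a", "b")) then calls func(1, "a"), func(2, "b")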


def run_func_over_args_list(func, args_list, backend=None, n_cores=1):
    """Run a function over a list of input args and return the output.

    Parameters
    ----------
    func : callable
        The function wrapping the shell command.
    args_list : list | iterable
        List of tuples of args to the func.
    backend : ParallelizationBase, optional
        Backend for parallelization. Defaults to SequentialBackend for
        n_cores=1, else PathosBackend.
    n_cores : int, optional
        Number of cores to use for parallelization.

    Returns
    -------
    List of function returns
    """
    # Wrap bare arguments in 1-tuples so func(*args) works uniformly.
    if type(args_list[0]) not in (list, tuple):
        args_list = [(x,) for x in args_list]
    # Choose a default backend only when none was provided explicitly.
    if backend is None:
        if n_cores == 1:
            backend = SequentialBackend()
        else:
            backend = PathosBackend(n_cores=n_cores)
    return backend.run(func, args_list)
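
# A minimal usage sketch (the `add` helper is hypothetical, not part of this
# module): run a two-argument function over a list of argument tuples,
# sequentially by default, or on several cores via the pathos backend.
#
#     def add(a, b):
#         return a + b
#
#     run_func_over_args_list(add, [(1, 2), (3, 4)])             # -> [3, 7]
#     run_func_over_args_list(add, [(1, 2), (3, 4)], n_cores=2)  # -> [3, 7]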


def run_command(command):
    """Run a shell command and return its completed process.

    Parameters
    ----------
    command : str
        The shell command to run.

    Returns
    -------
    subprocess.CompletedProcess
        Result of the command; a CalledProcessError is raised on failure
        because check=True.
    """
    print(command)
    return subprocess.run(command, shell=True, check=True)


def remove_non_jupytext_files(file_list):
    """Filter out all Python files that do not have a 'jupytext:' header section.

    Parameters
    ----------
    file_list : list
        Paths of .py files to check.

    Returns
    -------
    list
        Paths of files that contain a 'jupytext:' header.
    """
    filtered_list = []
    for file in file_list:
        with open(file, "r") as handle:
            lines = handle.readlines()
        if any("jupytext:" in line for line in lines):
            filtered_list.append(file)
    return filtered_list
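
# For reference, a jupytext-managed .py file starts with a YAML header along
# these lines (abridged; exact fields depend on jupytext version and format),
# which is what the filter above detects:
#
#     # ---
#     # jupytext:
#     #   text_representation:
#     #     extension: .py
#     #     format_name: percent
#     # ---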


def create_output(root_path: Path, output_path: Path, n_cores=1):
    """Create solution files.

    Parameters
    ----------
    root_path : Path
        Root directory wherein all .py files are located.
    output_path : Path
        Root directory where all compiled .ipynb files should be placed.
    n_cores : int, optional
        Number of CPU cores to use for parallelization.

    Returns
    -------
    None
    """
    # Start from a clean copy of the source tree.
    if os.path.exists(output_path):
        shutil.rmtree(output_path)
    shutil.copytree(root_path, output_path)

    # Find all Python files recursively.
    python_files = list(output_path.glob("**/*.py"))

    # Filter out checkpoints.
    python_files = [
        file for file in python_files if ".ipynb_checkpoints" not in file.as_posix()
    ]

    # Filter out Python files that are not marked for jupytext conversion.
    python_files = remove_non_jupytext_files(python_files)

    # Derive the target .ipynb path for each source file.
    ipynb_files = [file.with_suffix(".ipynb") for file in python_files]

    # Run jupytext for each file pair.
    run_func_over_args_list(
        func=convert_python_to_ipynb,
        args_list=[
            (python_path.as_posix(), ipynb_path.as_posix())
            for python_path, ipynb_path in zip(python_files, ipynb_files)
        ],
        n_cores=n_cores,
    )


def convert_python_to_ipynb(python_file_path, ipynb_file_path):
    """Run jupytext with the --output ipynb flag on python_file_path.

    Skips README files. The generated notebooks are executed via --execute.

    Parameters
    ----------
    python_file_path : str
        Path to the jupytext-formatted .py file, as posix.
    ipynb_file_path : str
        Path to the output .ipynb file, as posix.

    Returns
    -------
    subprocess.CompletedProcess or None
        None for skipped README files.
    """
    if "README" in python_file_path:
        return
    results = run_command(
        f'jupytext --output "{ipynb_file_path}" --execute "{python_file_path}"'
    )
    return results
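
# The assembled shell command looks like this (paths are illustrative):
#
#     jupytext --output "output/src/example.ipynb" --execute "output/src/example.py"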


@tracks_results
def main(repo: ProjectRepo, options, **kwargs):
    """Run post-commit tasks based on command-line arguments.

    Returns
    -------
    None
    """
    parser = argparse.ArgumentParser(description="Perform post-commit tasks.")
    parser.add_argument("--n_cores", type=int, help="Number of cores to use.")
    args = parser.parse_args()

    # This isn't great, but for now (and with argparse) the best I could
    # think of: explicitly passed kwargs override the CLI arguments.
    for kwarg_key, kwarg_value in kwargs.items():
        if kwarg_value is None:
            continue
        setattr(args, kwarg_key, kwarg_value)

    if args.n_cores is None:
        args.n_cores = 1

    create_output(
        root_path=repo.path / options.source_directory,
        output_path=repo.output_path / options.source_directory,
        n_cores=args.n_cores,
    )


if __name__ == "__main__":
    options = Options()
    options.commit_message = "Trying out new things"
    options.debug = True
    options.push = False
    options.source_directory = "src"
    main(options)
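
# Typical invocation (a sketch; this assumes cadetrdm's @tracks_results
# wrapper supplies the `repo` argument, which is why main is called above
# with only the Options instance):
#
#     python main.py --n_cores 4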