-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon_methods.config
483 lines (434 loc) · 20.2 KB
/
common_methods.config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
import java.nio.file.Path
import nextflow.config.ConfigBase
import nextflow.file.FileHelper
import nextflow.util.SysHelper
includeConfig "../schema/schema.config"
/**
* The methods namespace contains common functions for pipeline setup.
*/
methods {
/**
 * Detect and load base and node-specific resource allocations.
 *
 * The CPU count and memory are read from `params.max_cpus` / `params.max_memory` when those
 * keys are present, and from SysHelper otherwise. `config/base.config` is always included
 * first; a node-specific config is then selected from the detected resources and included.
 *
 * @throws IllegalStateException if system resources are not as expected or not sufficient.
 * @throws java.nio.file.NoSuchFileException if there is no config file for the partition type.
 * @throws NumberFormatException if invalid number provided for max_cpus.
 * @throws IllegalArgumentException if invalid memory provided for max_memory or if max_cpus is less than 1.
 */
set_resources_allocation = {
    def node_cpus = (params.containsKey('max_cpus')) ? params.max_cpus : SysHelper.getAvailCpus()
    try {
        node_cpus = node_cpus as Integer
    } catch(NumberFormatException number_format_exception) {
        System.out.println(" ### ERROR ### Invalid format for params.max_cpus: `${node_cpus}`, please provide an integer.")
        throw number_format_exception
    }

    if (node_cpus < 1) {
        throw new IllegalArgumentException("params.max_cpus = `${node_cpus}` cannot be less than 1.")
    }

    def node_memory = (params.containsKey('max_memory')) ? params.max_memory : SysHelper.getAvailMemory()
    try {
        node_memory = node_memory as nextflow.util.MemoryUnit
    } catch(IllegalArgumentException illegal_argument_exception) {
        System.out.println(" ### ERROR ### Invalid memory value specified for params.max_memory: `${node_memory}`, please provide a valid memory unit value.")
        throw illegal_argument_exception
    }
    def node_memory_GB = node_memory.toGiga()

    // Single source for the "unexpected resources" failure used by both CDS node checks
    def unexpected_resources = { ->
        new IllegalStateException(" ### ERROR ### System resources not as expected (cpus=${node_cpus} memory=${node_memory_GB}), unable to assign resources.")
    }

    // Load base.config by default for all pipelines
    includeConfig "${projectDir}/config/base.config"

    def config_to_include = ""
    if (params.containsKey('ucla_cds') && params.ucla_cds) {
        if (node_cpus == 64) {
            // Check memory for M64 node
            if (node_memory_GB >= 950 && node_memory_GB <= 1010) {
                config_to_include = "${projectDir}/config/M64.config"
            } else {
                throw unexpected_resources()
            }
        } else {
            // Check memory for F series node
            if (node_memory_GB >= (node_cpus * 2 * 0.9 - 1) && node_memory_GB <= (node_cpus * 2)) {
                config_to_include = "${projectDir}/config/F${node_cpus}.config"
            } else {
                throw unexpected_resources()
            }
        }
    } else {
        // NOTE(review): the threshold is 3 while the message says 4 GB; presumably this leaves
        // headroom for the whole-GB value returned by toGiga() - confirm.
        if (node_cpus < 2 || node_memory_GB < 3) {
            // IllegalStateException (rather than a raw Exception) matches the documented contract
            throw new IllegalStateException(" ### ERROR ### System resources not sufficient (cpus=${node_cpus} memory=${node_memory_GB}), minimum of 2 cpus and 4 GB memory required")
        } else if (node_cpus < 16 || node_memory_GB < 30) {
            config_to_include = "${projectDir}/config/F2.config"
        } else if (node_cpus < 32 || node_memory_GB < 60) {
            config_to_include = "${projectDir}/config/F16.config"
        } else if (node_cpus < 64 || node_memory_GB < 136) {
            config_to_include = "${projectDir}/config/F32.config"
        } else if (node_memory_GB >= 950) {
            config_to_include = "${projectDir}/config/M64.config"
        } else {
            config_to_include = "${projectDir}/config/F72.config"
        }
    }

    try {
        includeConfig config_to_include
    } catch(java.nio.file.NoSuchFileException file_not_found_exception) {
        System.out.println(" ### ERROR ### The partition-specific config file: ${config_to_include} does not exist. The pipeline does not support the requested partition type.")
        throw file_not_found_exception
    }
}
/**
 * Check upper and lower limits for a resource as defined in params
 * lower = params.min_<resource>, upper = params.max_<resource>
 *
 * @param obj the requested value for the resource
 * @param type one of 'memory', 'time' or 'cpus'
 * @return the value clamped to [min, max]. If the configured limits cannot be parsed or
 *   compared, a warning is printed and `obj` is returned unchanged. For any other `type`,
 *   `obj` is returned unchanged so that callers never receive null.
 */
check_limits = { Object obj, String type ->
    if (type == 'memory') {
        try {
            // Compare on the sign of compareTo; its contract does not guarantee exactly 1/-1
            if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) > 0)
                return params.max_memory as nextflow.util.MemoryUnit
            else if (obj.compareTo(params.min_memory as nextflow.util.MemoryUnit) < 0)
                return params.min_memory as nextflow.util.MemoryUnit
            else
                return obj
        } catch (all) {
            System.out.println(" ### WARNING ### Max memory '${params.max_memory}' or min memory '${params.min_memory}' is not valid! Using default value: $obj")
            return obj
        }
    } else if (type == 'time') {
        try {
            if (obj.compareTo(params.max_time as nextflow.util.Duration) > 0)
                return params.max_time as nextflow.util.Duration
            else if (obj.compareTo(params.min_time as nextflow.util.Duration) < 0)
                return params.min_time as nextflow.util.Duration
            else
                return obj
        } catch (all) {
            System.out.println(" ### WARNING ### Max time '${params.max_time}' or min time '${params.min_time}' is not valid! Using default value: $obj")
            return obj
        }
    } else if (type == 'cpus') {
        try {
            return Math.max( Math.min( obj as int, params.max_cpus as int ), params.min_cpus as int )
        } catch (all) {
            System.out.println(" ### WARNING ### Max cpus '${params.max_cpus}' or min cpus '${params.min_cpus}' is not valid! Using default value: $obj")
            return obj
        }
    }

    // No limits are defined for other resource types: pass the value through untouched
    return obj
}
/**
 * Build the set of `withName:` keys in the process namespace that should be updated.
 *
 * @param processes process names to select; when empty or null, every key in the process
 *   namespace starting with `withName:` is selected
 * @return the set of matching process namespace keys; requested names without a matching
 *   key are reported with a warning and left out
 */
generate_process_keys = { List processes ->
    def selected_keys = [] as Set

    if (!processes) {
        // No explicit selection: pick up every process-specific selector
        for (entry in process) {
            if (entry.key.startsWith('withName:')) {
                selected_keys.add(entry.key)
            }
        }
        return selected_keys
    }

    for (requested_name in processes) {
        String selector = "withName:${requested_name}"
        if (!process.containsKey(selector)) {
            System.out.println(" ### WARNING ### Process ${requested_name} not found in process namespace. Allocation update will be skipped for ${requested_name}.")
            continue
        }
        selected_keys.add(selector)
    }

    return selected_keys
}
/**
 * Scale the base allocation of one resource for one process.
 *
 * @param process_key key of the process in the process namespace
 * @param resource name of the resource to scale
 * @param multiplier factor applied to the current value; the product is passed through
 *   methods.check_limits before being stored
 */
update_process_allocation = { String process_key, String resource, Float multiplier ->
    def process_config = process[process_key]

    if (!process_config.containsKey(resource)) {
        System.out.println(" ### WARNING ### No base value defined for resource ${resource} for process ${process_key}. Update will be skipped.")
        return
    }

    def scaled_value = process_config[resource] * multiplier
    process_config[resource] = methods.check_limits(scaled_value, resource)
}
/**
 * Modify the allocation of a resource across processes.
 *
 * @param resource name of the resource to scale
 * @param multiplier factor applied to each selected process's current value
 * @param processes process names to update; an empty list selects the keys chosen by
 *   methods.generate_process_keys for an empty selection
 * @return the set of process keys that were handed to methods.update_process_allocation
 */
update_base_resource_allocation = { String resource, Float multiplier, List processes=[] ->
    def selected_keys = methods.generate_process_keys(processes)

    for (selected_key in selected_keys) {
        methods.update_process_allocation(selected_key, resource, multiplier)
    }

    return selected_keys
}
/**
 * Extract the reference genome version from UCLA CDS path
 *
 * The path is first resolved to its real path and then matched against
 * /hot/resource/reference-genome/<genome_version>/...
 *
 * @param genome_path path to a file under the reference genome directory
 * @return the <genome_version> component of the resolved path
 * @throws IllegalArgumentException if genome path does not exist or genome version is not in path
 *
 */
get_genome_version = { Object genome_path ->
    def genome_real_path
    try {
        // Resolve path to absolute path
        genome_real_path = new File(genome_path).toPath().toRealPath()
    } catch (Exception e) {
        // Keep the underlying failure as the cause so the reason is not lost
        throw new IllegalArgumentException(" ### ERROR ### Failed to resolve genome path: ${genome_path}", e)
    }

    def pattern = ~/^\/hot\/resource\/reference-genome\/(?<genomeversion>[A-Za-z0-9-]+)\/.+$/
    def matcher = genome_real_path =~ pattern

    // Check the match result directly instead of waiting for group() to throw
    if (!matcher.matches()) {
        throw new IllegalArgumentException(" ### ERROR ### Failed to extract genome version: ${genome_real_path}. Expected path to follow UCLA CDS reference format: /hot/resource/reference-genome/<genome_version>/...")
    }

    return matcher.group("genomeversion")
}
/**
 * Verify that every param needed for registered output is present and non-empty.
 *
 * When `params.save_intermediate_files` is set, it is switched off and a warning is printed.
 *
 * @throws IllegalArgumentException when required information is missing
 */
check_registered_output_params = {
    // TO-DO: Regex/custom validation for standardized fields, like dataset_id
    // Can create a custom schema for dataset registration-specific fields
    def required_information = [
        'dataset_id',
        'patient_id',
        'sample_id',
        'ucla_cds_analyte',
        'ucla_cds_technology',
        'ucla_cds_reference_genome_version',
    ]

    // A field counts as missing when the key is absent or its value is falsy
    def missing_information = required_information.findAll { field ->
        !(params.containsKey(field) && params[field])
    }

    if (missing_information) {
        throw new IllegalArgumentException(" ### ERROR ### Missing params required for registered dataset output directory generation: ${missing_information}.")
    }

    def intermediate_saving_requested = params.containsKey('save_intermediate_files') && params.save_intermediate_files
    if (intermediate_saving_requested) {
        params.save_intermediate_files = false
        System.out.println(" ### WARNING ### Intermediate file saving has been automatically disabled with registered output.")
    }
}
/**
 * Generate a random UUID.
 *
 * @return the string form of a freshly generated java.util.UUID
 */
generate_uuid = {
    UUID fresh_uuid = UUID.randomUUID()
    fresh_uuid.toString()
}
/**
 * Generate the output path for registered output
 *
 * The path has the form
 * <data_dir>/<disease>/<dataset_id>/<patient_id>/<sample_id>/<analyte>/<technology>/aligned/<genome_version>
 * where <disease> is the first 4 characters of params.dataset_id. The required params are
 * validated through methods.check_registered_output_params and the resulting path is checked
 * with schema.check_path in 'w' mode.
 *
 * @param data_dir root directory for registered data
 * @return the registered output directory
 * @throws IllegalArgumentException when data_dir is not a valid string type, when required
 *   params are missing, or when params.dataset_id is shorter than 4 characters
 */
generate_registered_output_directory = { Object data_dir="/hot/data" ->
    def STRING_TYPES = [String, GString]
    if (!STRING_TYPES.any{ data_dir in it }) {
        throw new IllegalArgumentException(" ### ERROR ### Input data_dir for generate_registered_output_directory must be a String or GString!")
    }

    methods.check_registered_output_params()

    // Guard the substring below so a short ID gives a clear error instead of an index error
    if (params.dataset_id.length() < 4) {
        throw new IllegalArgumentException(" ### ERROR ### params.dataset_id `${params.dataset_id}` must be at least 4 characters long; its first 4 characters identify the disease directory.")
    }

    def disease = params.dataset_id.substring(0, 4)

    def registered_output_directory = "${data_dir}/${disease}/${params.dataset_id}/${params.patient_id}/${params.sample_id}/${params.ucla_cds_analyte}/${params.ucla_cds_technology}/aligned/${params.ucla_cds_reference_genome_version}"

    schema.check_path(registered_output_directory, 'w')

    return registered_output_directory
}
/**
 * Normalize the process-wide publishDir setting into a list of rules.
 *
 * @return an empty list when process.publishDir is unset or empty, a one-element list when
 *   it is a Map, or the list itself when it is a List
 * @throws IllegalArgumentException when process.publishDir is any other type
 */
get_common_publish_dir_rules = {
    def shared_rules = process.publishDir

    if (!shared_rules) {
        return []
    }
    if (shared_rules in Map) {
        return [shared_rules]
    }
    if (shared_rules in List) {
        return shared_rules
    }

    throw new IllegalArgumentException("Unexpected common publishDir type: ${shared_rules.getClass()}. Please define either a Map or a List of Maps")
}
/**
 * Assemble the publishDir rules for a single process.
 *
 * @param aprocess entry whose `key` identifies the process in the process namespace
 * @param common_publish_dir_rules rules shared by all processes
 * @param disable_common_rules when true, the shared rules are left out
 * @return the process's own rules (if any) followed by the shared rules (unless disabled)
 */
get_publish_dir = { Object aprocess, List common_publish_dir_rules, Boolean disable_common_rules ->
    def inherited_rules = disable_common_rules ? [] : common_publish_dir_rules
    def own_rules = process[aprocess.key].publishDir

    if (!own_rules) {
        return inherited_rules
    }
    if (own_rules in Map) {
        return [own_rules] + inherited_rules
    }
    if (own_rules in List) {
        return own_rules + inherited_rules
    }

    // A process-specific publishDir of any other type is not carried over
    return inherited_rules
}
/**
 * Combine the common publishDir rules with the process-specific ones.
 *
 * For every `withName:` entry in the process namespace, the optional `disable_common_rules`
 * flag is read and removed, and the entry's publishDir is replaced with the merged rules
 * from methods.get_publish_dir.
 */
merge_publish_dirs = {
    def shared_rules = methods.get_common_publish_dir_rules()

    for (entry in process) {
        if (!entry.key.startsWith('withName:')) {
            continue
        }

        def selector_config = process[entry.key]

        // The flag is consumed here so it does not stay behind in the process config
        def skip_shared_rules = false
        if (selector_config.containsKey('disable_common_rules')) {
            skip_shared_rules = selector_config.disable_common_rules
            selector_config.remove('disable_common_rules')
        }

        selector_config.publishDir = methods.get_publish_dir(entry, shared_rules, skip_shared_rules)
    }
}
/**
 * Load the publishDir config file, reporting progress on stdout.
 *
 * @param file_path config file to include; defaults to config/publish_dir.config under projectDir
 * @throws NoSuchFileException if config file cannot be located.
 */
load_publish_dirs = { String file_path="${projectDir}/config/publish_dir.config" ->
    System.out.println("Attempting to load publishDir config: ${file_path}")

    try {
        includeConfig file_path
    } catch(java.nio.file.NoSuchFileException missing_config) {
        // Report the missing file, then let the original exception propagate
        System.out.println(" ### ERROR ### The publishDir config file: ${file_path} does not exist.")
        throw missing_config
    }

    System.out.println("Loaded ${file_path}")
}
/**
 * Pick the default workDir from the Slurm environment and path permissions.
 *
 * @return "/scratch/<SLURM_JOB_ID>" when SLURM_JOB_ID is set and that path passes
 *   schema.check_path in 'w' mode; "/scratch" otherwise
 */
get_default_workdir = {
    String scratch_dir = "/scratch"
    def job_id = System.getenv("SLURM_JOB_ID")

    if (!job_id) {
        return scratch_dir
    }

    String job_scratch_dir = "${scratch_dir}/${job_id}"
    try {
        schema.check_path(job_scratch_dir, 'w')
    } catch(Exception ignored) {
        // Job-specific directory failed the path check: fall back to the plain scratch dir
        return scratch_dir
    }

    return job_scratch_dir
}
/**
 * Set workDir according to given parameters
 *
 * With `params.ucla_cds` enabled, `params.work_dir` (or the default from
 * methods.get_default_workdir when unset) is validated and used as workDir. Otherwise workDir
 * is only set when `params.work_dir` was provided; if not, `params.work_dir` is filled in with
 * the location Nextflow uses by default and workDir is left untouched.
 */
set_env = {
    // Guard with containsKey, as set_resources_allocation does, so an undefined
    // params.ucla_cds is treated as disabled without being touched
    if (params.containsKey('ucla_cds') && params.ucla_cds) {
        /**
         * By default, if the /scratch directory exists, set it as the Nextflow working directory
         * along with a Slurm job-specific annotation if available
         * If config file specified work_dir, set it as the Nextflow working directory
         *
         * WARNING: changing this directory can lead to high server latency and
         * potential disk space limitations. Change with caution! The 'workDir'
         * in Nextflow determines the location of intermediate and temporary files.
         */
        params.work_dir = (params.containsKey('work_dir') && params.work_dir) ? params.work_dir : methods.get_default_workdir()
        schema.check_path(params.work_dir, 'w')
        workDir = params.work_dir
    } else {
        // If work_dir was specified as a param and exists or can be created, set workDir. Otherwise, let Nextflow's default behavior dictate workDir
        // Default Nextflow behavior: NXF_WORK environment variable if set, otherwise "${launchDir}/work"
        if (params.containsKey("work_dir") && params.work_dir) {
            schema.check_path(params.work_dir, 'w')
            workDir = params.work_dir
        } else {
            // Set the work_dir to match the default Nextflow behavior for parity
            params.work_dir = System.getenv("NXF_WORK") ?: "${launchDir}/work"
        }
    }
}
/**
 * Strip disallowed characters from a string, usually a sample or patient ID.
 *
 * Only letters, digits, '/', '_', '.' and '-' are kept; every '_' is then turned into '-'.
 *
 * @param raw the String or GString to sanitize
 * @return the sanitized string
 * @throws Exception when raw is neither a String nor a GString
 */
sanitize_uclahs_cds_id = { raw ->
    boolean is_string = (raw instanceof String) || (raw instanceof GString)
    if (!is_string) {
        throw new Exception("Input to sanitize is either empty or not a string! Provide a non-empty string.")
    }

    def disallowed_characters = /[^a-zA-Z\d\/_.-]/
    def allowed_only = raw.replaceAll(disallowed_characters, '')

    return allowed_only.replace('_', '-')
}
/**
 * Make Docker treat a process's allocated cpus as a CPU count rather than a CPU share.
 *
 * Sets process.containerOptions to a closure producing the Docker CPU options.
 */
setup_docker_cpus = {
    int docker_default_shares = 1024 // Default from Docker

    // Nextflow by default adds "--cpu-shares <1024 * task.cpus>" to the docker run command
    // This resets that value to the default of 1024 and adds the "--cpus" option to go along with it
    process.containerOptions = { ->
        if (task.containsKey('cpus')) {
            return "--cpu-shares ${docker_default_shares} --cpus ${task.cpus}"
        }
        return "--cpu-shares ${docker_default_shares}"
    }
}
/**
* Configure all processes to save their command files in the output
* directory.
*
* This adds a custom process directive that, when used as the afterScript,
* will copy all of the process's .command.* files into the output
* directory.
* Processes can customize the output directory by setting
* `process.ext.log_dir` and `process.ext.log_dir_suffix`. Both may be
* closures.
* Setting `process.ext.capture_logs` to a falsy value makes the generated
* script empty.
*
* Inspired by https://github.com/nextflow-io/nextflow/issues/1166#issuecomment-502467562
*/
setup_process_afterscript = {
// Default log directory: the task's process name with ':' replaced by '/'
process.ext.log_dir = {
"${task.process.replace(':', '/')}"
}
// Log capture is on by default
process.ext.capture_logs = true
// Closure that builds the shell snippet copying the .command.* files
process.ext.commonAfterScript = {
if (!task.ext.capture_logs) {
return ""
}
// Destination: <params.log_output_dir>/process-log/<log_dir><log_dir_suffix>
// NOTE(review): process_log_dir is assigned without `def`, so it is not a local
// variable of this closure - confirm that is intended.
process_log_dir = [
"${params.log_output_dir}",
"process-log",
"${task.ext.log_dir}${task.ext.log_dir_suffix ?: ''}"
].join("/")
// Handle relative paths
if (process_log_dir.substring(0, 1) != "/") {
process_log_dir = "${launchDir}/${process_log_dir}"
}
// Each .command.* file is copied into LOG_DIR with a "log" prefix on its name
return """\
readonly LOG_DIR="${process_log_dir}"
mkdir -p "\${LOG_DIR}"
for filename in .command.*; do
[ -e "\${filename}" ] || continue
cp "\${filename}" "\${LOG_DIR}/log\${filename}"
done
""".stripIndent()
}
/*
Set the default afterScript. If individual processes override
afterScript, they can restore this functionality like so (this is safe
to include even if setup_process_afterscript is not called):
afterScript {
[
"echo 'Before the common'",
task.ext.commonAfterScript ?: "",
"echo 'After the common'"
].join("\n")
}
*/
process.afterScript = process.ext.commonAfterScript
}
/**
 * Resolve the absolute path of a file relative to the current config file.
 *
 * @param relativePath path to resolve; absolute paths are returned as-is
 * @return the path resolved as a sibling of the config file currently on top of the
 *   config stack, or the input path unchanged when it is absolute or no config file
 *   is on the stack
 */
get_absolute_path = { String relativePath ->
    // Nextflow has a ConfigBase class with a private `configStack` member
    // holding the currently-loading config files. Do some skullduggery to
    // get access to it.
    def stack_field = ConfigBase.class.getDeclaredField("configStack")
    boolean was_accessible = stack_field.isAccessible()
    stack_field.setAccessible(true)

    // Keep the stack in a local variable so nothing is assigned outside this closure,
    // and always restore the field's original accessibility
    def config_stack
    try {
        config_stack = stack_field.get(this)
    } finally {
        stack_field.setAccessible(was_accessible)
    }

    // Adapted from Nextflow code
    // https://github.com/nextflow-io/nextflow/blob/9f1b68900acb29098effd5a57998a15364958207/modules/nextflow/src/main/groovy/nextflow/config/ConfigBase.groovy#L73
    Path currentConfig = config_stack ? config_stack.peek() : null
    Path resolvedPath = FileHelper.asPath(relativePath.toString())
    if (!resolvedPath.isAbsolute() && currentConfig) {
        resolvedPath = currentConfig.resolveSibling(relativePath.toString())
    }

    return resolvedPath.toString()
}
}