
Commit 4d3d812

Merge remote-tracking branch 'origin/main' into dependabot/submodules/pipeline-submodules-d6234f9053

nwiltsie committed Jul 3, 2024
2 parents: 7d7bed0 + 76b343d
Showing 19 changed files with 243 additions and 63 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to the call-gSV pipeline.
---

## [Unreleased]
### Added
- Add `methods.modify_base_allocations()` to update resource allocation

### Changed
- Use `methods.setup_process_afterscript()` for process logs
- Increase CPU allocation for Manta

---

47 changes: 47 additions & 0 deletions README.md
@@ -203,6 +203,53 @@ input:

An example of the NextFlow Input Parameters Config file can be found [here](config/template.config).

### Base resource allocation updaters
To optionally update the base resource (`cpus` or `memory`) allocations for processes, add the necessary parts of the following structure to the [input.config](config/template.config) file. The default allocations can be found in the [node-specific config files](./config/).

```Nextflow
base_resource_update {
    memory = [
        [['process_name', 'process_name2'], <multiplier for resource>],
        [['process_name3', 'process_name4'], <different multiplier for resource>]
    ]
    cpus = [
        [['process_name', 'process_name2'], <multiplier for resource>],
        [['process_name3', 'process_name4'], <different multiplier for resource>]
    ]
}
```
> **Note** Resource updates are applied in the order they are provided, so if a process is included twice in the `memory` list, it will be updated twice, in the order given (see the final example below).

Examples:

- To double memory of all processes:
```Nextflow
base_resource_update {
    memory = [
        [[], 2]
    ]
}
```
- To double memory for `call_gSV_Delly` and triple memory for `run_validate_PipeVal` and `call_gSV_Manta`:
```Nextflow
base_resource_update {
    memory = [
        ['call_gSV_Delly', 2],
        [['run_validate_PipeVal', 'call_gSV_Manta'], 3]
    ]
}
```
- To double CPUs and memory for `call_gSV_Manta` and double memory for `run_validate_PipeVal`:
```Nextflow
base_resource_update {
    cpus = [
        ['call_gSV_Manta', 2]
    ]
    memory = [
        [['call_gSV_Manta', 'run_validate_PipeVal'], 2]
    ]
}
```
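- To illustrate the ordering note above (a hypothetical sketch, assuming multipliers compound): `call_gSV_Manta` appears in two `memory` entries, so its memory is first doubled and then multiplied by 1.5, for an effective 3x, while `call_gSV_Delly` is only doubled:
```Nextflow
base_resource_update {
    memory = [
        [['call_gSV_Delly', 'call_gSV_Manta'], 2],
        ['call_gSV_Manta', 1.5]
    ]
}
```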

---

## Outputs
2 changes: 1 addition & 1 deletion config/F16.config
@@ -24,7 +24,7 @@ process {
}
}
withName: call_gSV_Manta {
-        cpus = 1
+        cpus = 6
memory = 8.GB
retry_strategy {
memory {
2 changes: 1 addition & 1 deletion config/F32.config
@@ -24,7 +24,7 @@ process {
}
}
withName: call_gSV_Manta {
-        cpus = 1
+        cpus = 12
memory = 15.GB
retry_strategy {
memory {
2 changes: 1 addition & 1 deletion config/F72.config
@@ -24,7 +24,7 @@ process {
}
}
withName: call_gSV_Manta {
-        cpus = 1
+        cpus = 24
memory = 30.GB
retry_strategy {
memory {
2 changes: 1 addition & 1 deletion config/M64.config
@@ -24,7 +24,7 @@ process {
}
}
withName: call_gSV_Manta {
-        cpus = 1
+        cpus = 30
memory = 60.GB
retry_strategy {
memory {
68 changes: 65 additions & 3 deletions config/custom_schema_types.config
@@ -9,6 +9,37 @@ custom_schema_types {
'normal',
'tumor'
]
allowed_resource_types = [
'memory',
'cpus'
]

/**
* Check if given input is a number
*/

check_if_number = { val, String name ->
if (!(val in Integer || val in Float || val in BigDecimal)) {
throw new Exception("${name} should be an Integer, Float or BigDecimal, not ${val.getClass()}")
}
}

/**
* Check if given input is valid process list
*/
check_if_process_list = { val, String name ->
if (custom_schema_types.is_string(val)) {
if (val.isEmpty()) {
throw new Exception("Empty string specified for ${name}. Please provide valid input.")
}
} else {
try {
custom_schema_types.check_if_list(val, name)
} catch(Exception e) {
throw new Exception("${name} should be either a string or a list. Please provide valid input.")
}
}
}

/**
* Check that input types are in allowed list
@@ -83,9 +114,40 @@
}
}

/**
* Check namespace for resource updates
*/
check_resource_update_namespace = { Map options, String name, Map properties ->
custom_schema_types.check_if_namespace(options[name], name)
def given_keys = options[name].keySet() as ArrayList
if (given_keys.size() <= 0) {
return
}
custom_schema_types.check_input_type_keys(given_keys, name, custom_schema_types.allowed_resource_types)

options[name].each { entry ->
def entry_as_map = [:]
entry_as_map[entry.key] = entry.value
schema.validate_parameter(entry_as_map, entry.key, properties.elements[entry.key])
}
}

/**
* Check list of resource updates
*/
check_resource_update_list = { Map options, String name, Map properties ->
custom_schema_types.check_if_list(options[name], name)
for (item in options[name]) {
custom_schema_types.check_if_process_list(item[0], name)
custom_schema_types.check_if_number(item[1], name)
}
}

types = [
'InputNamespace': custom_schema_types.check_input_namespace,
'InputBAMNamespace': custom_schema_types.check_bam_namespace,
-        'BAMEntryList': custom_schema_types.check_bam_list
-    ]
-}
+        'BAMEntryList': custom_schema_types.check_bam_list,
+        'ResourceUpdateNamespace': custom_schema_types.check_resource_update_namespace,
+        'ResourceUpdateList': custom_schema_types.check_resource_update_list
+    ]
+}
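Taken together, the new types mean a `ResourceUpdateList` entry pairs either a single process name (a string) or a list of names with a numeric multiplier, and a `ResourceUpdateNamespace` may only contain the `memory` and `cpus` keys. A minimal sketch of values the new checks would accept or reject (process names taken from the README examples; the commented-out entries are hypothetical failures):
```Nextflow
base_resource_update {
    memory = [
        ['call_gSV_Delly', 2],                              // String + Integer: passes
        [['call_gSV_Manta', 'run_validate_PipeVal'], 1.5],  // List + BigDecimal: passes
        [[], 3]                                             // Empty list (all processes) + Integer: passes
        // ['', 2]         would fail check_if_process_list (empty string)
        // [['x'], 'two']  would fail check_if_number (String multiplier)
    ]
    // runtime = [ ... ]  would fail check_input_type_keys (not in allowed_resource_types)
}
```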
15 changes: 15 additions & 0 deletions config/methods.config
@@ -72,17 +72,32 @@ methods {
process.cache = params.cache_intermediate_pipeline_steps
}

modify_base_allocations = {
if (!(params.containsKey('base_resource_update') && params.base_resource_update)) {
return
}

params.base_resource_update.each { resource, updates ->
updates.each { processes, multiplier ->
def processes_to_update = (custom_schema_types.is_string(processes)) ? [processes] : processes
methods.update_base_resource_allocation(resource, multiplier, processes_to_update)
}
}
}

// Set up env, timeline, trace, and report above.
setup = {
methods.set_env()
schema.load_custom_types("${projectDir}/config/custom_schema_types.config")
schema.validate()
methods.set_ids_from_bams()
methods.set_resources_allocation()
methods.modify_base_allocations()
methods.set_output_dir()
methods.set_log_output_dir()
methods.set_pipeline_logs()
methods.set_process()
methods.setup_process_afterscript()
retry.setup_retry()
methods.setup_docker_cpus()
}
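As a sketch of how the new `modify_base_allocations` closure consumes this namespace (process names taken from the README examples; `update_base_resource_allocation` is assumed to apply the multiplier to each named process's base allocation):
```Nextflow
// Given this params block ...
base_resource_update {
    cpus = [
        ['call_gSV_Manta', 2]
    ]
    memory = [
        [['call_gSV_Manta', 'run_validate_PipeVal'], 2]
    ]
}

// ... modify_base_allocations() normalizes the lone string to a list and makes two calls:
//   methods.update_base_resource_allocation('cpus',   2, ['call_gSV_Manta'])
//   methods.update_base_resource_allocation('memory', 2, ['call_gSV_Manta', 'run_validate_PipeVal'])
```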
13 changes: 13 additions & 0 deletions config/schema.yaml
@@ -84,6 +84,19 @@ save_intermediate_files:
    help: 'Save intermediate files from the pipeline'
    default:
        - false
base_resource_update:
    type: 'ResourceUpdateNamespace'
    required: false
    help: 'User-defined modifications for adjusting base resource allocations for processes'
    elements:
        memory:
            type: 'ResourceUpdateList'
            required: false
            help: 'List of memory updates'
        cpus:
            type: 'ResourceUpdateList'
            required: false
            help: 'List of CPU updates'
input:
    type: 'InputNamespace'
    required: true
2 changes: 2 additions & 0 deletions config/template.config
@@ -37,6 +37,8 @@ params {

map_qual = 20 // min. paired-end (PE) mapping quality for Delly

// Base resource allocation updater
// See README for adding parameters to update the base resource allocations
}

methods.setup()
6 changes: 0 additions & 6 deletions module/bcftools.nf
@@ -16,19 +16,13 @@ process convert_BCF2VCF_BCFtools {
pattern: "*.vcf",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
path bcf_file
val bam_sample_name
val variant_type

output:
path "DELLY-${params.delly_version}_${variant_type}_${params.dataset_id}_${bam_sample_name}.vcf", emit: vcf_file
path ".command.*"

"""
set -euo pipefail
24 changes: 0 additions & 24 deletions module/delly.nf
@@ -15,11 +15,6 @@ process call_gSV_Delly {
pattern: "*.bcf*",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
tuple val(bam_sample_name), path(input_bam), path(input_bam_bai)
path(reference_fasta)
@@ -29,7 +24,6 @@
output:
path "${params.output_filename}_${params.GSV}.bcf", emit: bcf_sv_file
path "${params.output_filename}_${params.GSV}.bcf.csi", emit: bcf_sv_file_csi
path ".command.*"
val bam_sample_name, emit: bam_sample_name

script:
@@ -52,11 +46,6 @@
pattern: "*.bcf*",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
tuple val(bam_sample_name), path(input_bam), path(input_bam_bai)
path(reference_fasta)
@@ -67,7 +56,6 @@
output:
path "${params.output_filename}_${params.RGSV}.bcf", emit: regenotyped_sv_bcf
path "${params.output_filename}_${params.RGSV}.bcf.csi", emit: regenotyped_sv_bcf_csi
path ".command.*"

script:
"""
@@ -90,11 +78,6 @@
pattern: "*.bcf*",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
tuple val(bam_sample_name), path(input_bam), path(input_bam_bai)
path(delly_sv_file)
@@ -105,7 +88,6 @@
output:
path "${params.output_filename}_${params.GCNV}.bcf", emit: bcf_cnv_file
path "${params.output_filename}_${params.GCNV}.bcf.csi", emit: bcf_cnv_file_csi
path ".command.*"
val bam_sample_name, emit: bam_sample_name

script:
@@ -128,11 +110,6 @@
pattern: "*.bcf*",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
tuple val(bam_sample_name), path(input_bam), path(input_bam_bai)
path(reference_fasta)
@@ -143,7 +120,6 @@
output:
path "${params.output_filename}_${params.RGCNV}.bcf", emit: regenotyped_cnv_bcf
path "${params.output_filename}_${params.RGCNV}.bcf.csi", emit: regenotyped_cnv_bcf_csi
path ".command.*"

script:
"""
8 changes: 1 addition & 7 deletions module/manta.nf
@@ -20,11 +20,6 @@ process call_gSV_Manta {
mode: "copy",
saveAs: { "${params.output_filename}_${file(it).getName()}" }

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
tuple val(bam_sample_name), path(input_bam), path(input_bam_bai)
path(reference_fasta)
@@ -38,7 +33,6 @@
path("${params.output_filename}_candidateSV.vcf.gz"), emit: vcf_candidate_sv_file
path("${params.output_filename}_candidateSV.vcf.gz.tbi"), emit: vcf_candidate_sv_tbi
path "*Stats*"
path ".command.*"
val bam_sample_name, emit: bam_sample_name

script:
@@ -49,7 +43,7 @@
--normalBam $input_bam \
--referenceFasta $reference_fasta \
--runDir MantaWorkflow
-    MantaWorkflow/runWorkflow.py
+    MantaWorkflow/runWorkflow.py -j ${task.cpus}
# re-name Manta outputs based on output file name standardization - `params.output_filename`
for variant_file in `ls MantaWorkflow/results/variants/*`
6 changes: 0 additions & 6 deletions module/rtgtools.nf
@@ -16,19 +16,13 @@ process run_vcfstats_RTGTools {
pattern: "*_stats.txt",
mode: "copy"

-    publishDir "${params.workflow_log_dir}",
-        pattern: ".command.*",
-        mode: "copy",
-        saveAs: { "${task.process.replace(':', '/')}/log${file(it).getName()}" }

input:
path vcf_sv_file
val bam_sample_name
val variant_type

output:
path "DELLY-${params.delly_version}_${variant_type}_${params.dataset_id}_${bam_sample_name}_stats.txt"
path ".command.*"

"""
set -euo pipefail