diff --git a/conf/base.config b/conf/base.config index 8640cad463..8b72554083 100644 --- a/conf/base.config +++ b/conf/base.config @@ -10,9 +10,7 @@ */ process { - cpus = {check_resource(params.cpus * task.attempt)} - memory = {check_resource((params.singleCPUMem as nextflow.util.MemoryUnit) * task.attempt)} - time = {check_resource(24.h * task.attempt)} + shell = ['/bin/bash', '-euo', 'pipefail'] errorStrategy = {task.exitStatus in [143,137,104,134,139] ? 'retry' : 'finish'} @@ -21,9 +19,11 @@ process { withLabel:cpus_1 { cpus = {check_resource(1)} + memory = 7.5.GB } withLabel:cpus_2 { cpus = {check_resource(2)} + memory = 15.GB } withLabel:cpus_4 { cpus = {check_resource(4)} @@ -37,18 +37,28 @@ process { withLabel:cpus_max { cpus = {params.max_cpus} } - - withLabel:memory_singleCPU_2_task { - memory = {check_resource((params.singleCPUMem as nextflow.util.MemoryUnit) * 2 * task.attempt)} - } - withLabel:memory_singleCPU_task_sq { - memory = {check_resource((params.singleCPUMem as nextflow.util.MemoryUnit) * task.attempt * task.attempt)} - } - withLabel:memory_max { memory = {params.max_memory} } - + withName: MarkDuplicates { + maxForks = 2 + } + withName: BaseRecalibrator { + maxForks = 32 + } + withName: ApplyBQSR { + maxForks = 32 + } + withName: ScatterIntervalList { + container = 'us.gcr.io/broad-gotc-prod/genomes-in-the-cloud:2.4.1-1540490856' + } + withName: HaplotypeCaller { + cpus = {check_resource(1)} + memory = 7.5.GB + maxForks = 30 + maxRetries = params.preemptible_tries + container = 'us.gcr.io/broad-gatk/gatk:4.0.10.1' + } withName:ConcatVCF { // For unknown reasons, ConcatVCF sometimes fails with SIGPIPE // (exit code 141). Rerunning the process will usually work. diff --git a/main.nf b/main.nf index de1c2f690a..6a9e1b1fc7 100644 --- a/main.nf +++ b/main.nf @@ -589,78 +589,37 @@ ch_intervals = params.no_intervals ? 
"null" : params.intervals && !('annotate' i // STEP 0: CREATING INTERVALS FOR PARALLELIZATION (PREPROCESSING AND VARIANT CALLING) -process CreateIntervalBeds { - tag {intervals.fileName} +// This task calls picard's IntervalListTools to scatter the input interval list into scatter_count sub interval lists +// Note that the number of sub interval lists may not be exactly equal to scatter_count. There may be slightly more or less. +// Thus we have the block of python to count the number of generated sub interval lists. - input: - file(intervals) from ch_intervals - - output: - file '*.bed' into bedIntervals mode flatten - - when: (!params.no_intervals) && step != 'annotate' +process ScatterIntervalList { + tag "$interval_list" - script: - // If the interval file is BED format, the fifth column is interpreted to - // contain runtime estimates, which is then used to combine short-running jobs - if (hasExtension(intervals, "bed")) - """ - awk -vFS="\t" '{ - t = \$5 # runtime estimate - if (t == "") { - # no runtime estimate in this row, assume default value - t = (\$3 - \$2) / ${params.nucleotidesPerSecond} - } - if (name == "" || (chunk > 600 && (chunk + t) > longest * 1.05)) { - # start a new chunk - name = sprintf("%s_%d-%d.bed", \$1, \$2+1, \$3) - chunk = 0 - longest = 0 - } - if (t > longest) - longest = t - chunk += t - print \$0 > name - }' ${intervals} - """ - else if (hasExtension(intervals, "interval_list")) - """ - grep -v '^@' ${intervals} | awk -vFS="\t" '{ - name = sprintf("%s_%d-%d", \$1, \$2, \$3); - printf("%s\\t%d\\t%d\\n", \$1, \$2-1, \$3) > name ".bed" - }' - """ - else - """ - awk -vFS="[:-]" '{ - name = sprintf("%s_%d-%d", \$1, \$2, \$3); - printf("%s\\t%d\\t%d\\n", \$1, \$2-1, \$3) > name ".bed" - }' ${intervals} - """ -} + input: + file(interval_list) from ch_intervals -bedIntervals = bedIntervals - .map { intervalFile -> - def duration = 0.0 - for (line in intervalFile.readLines()) { - final fields = line.split('\t') - if (fields.size() >= 5) 
duration += fields[4].toFloat() - else { - start = fields[1].toInteger() - end = fields[2].toInteger() - duration += (end - start) / params.nucleotidesPerSecond - } - } - [duration, intervalFile] - }.toSortedList({ a, b -> b[0] <=> a[0] }) - .flatten().collate(2) - .map{duration, intervalFile -> intervalFile} + output: + file('out/*/*.interval_list') into scattered_interval_list mode flatten -bedIntervals = bedIntervals.dump(tag:'bedintervals') + script: + """ + mkdir out + java -Xms1g -jar /usr/gitc/picard.jar \ + IntervalListTools \ + SCATTER_COUNT=${params.scatter_count} \ + SUBDIVISION_MODE=BALANCING_WITHOUT_INTERVAL_SUBDIVISION_WITH_OVERFLOW \ + UNIQUE=true \ + SORT=true \ + BREAK_BANDS_AT_MULTIPLES_OF=${params.break_bands_at_multiples_of} \ + INPUT=${interval_list} \ + OUTPUT=out -if (params.no_intervals && step != 'annotate') bedIntervals = Channel.from(file("no_intervals.bed")) + count_intervals.py +""" +} -(intBaseRecalibrator, intApplyBQSR, intHaplotypeCaller, intMpileup, bedIntervals) = bedIntervals.into(5) +(intBaseRecalibrator, intApplyBQSR, intHaplotypeCaller, intMpileup, scattered_interval_list) = scattered_interval_list.into(5) // PREPARING CHANNELS FOR PREPROCESSING AND QC @@ -768,7 +727,6 @@ else inputPairReadsSentieon.close() process MapReads { label 'cpus_max' label 'memory_max' - echo true tag {idPatient + "-" + idRun} @@ -1053,7 +1011,7 @@ process SentieonDedup { // STEP 3: CREATING RECALIBRATION TABLES process BaseRecalibrator { - label 'med_resources' + label 'cpus_1' tag {idPatient + "-" + idSample + "-" + intervalBed.baseName} @@ -1105,8 +1063,8 @@ if (params.no_intervals) { // STEP 3.5: MERGING RECALIBRATION TABLES process GatherBQSRReports { - label 'memory_singleCPU_2_task' - label 'cpus_2' + + label 'cpus_1' tag {idPatient + "-" + idSample} @@ -1174,8 +1132,7 @@ bamApplyBQSR = bamApplyBQSR.dump(tag:'BAM + BAI + RECAL TABLE + INT') process ApplyBQSR { - label 'memory_singleCPU_2_task' - label 'cpus_2' + label 'cpus_1' tag {idPatient + 
"-" + idSample + "-" + intervalBed.baseName} @@ -1296,7 +1253,8 @@ bamRecalSentieonSampleTSV // STEP 4.5.1: MERGING THE RECALIBRATED BAM FILES process MergeBamRecal { - label 'med_resources' + label 'cpus_max' + label 'memory_max' tag {idPatient + "-" + idSample} @@ -1415,7 +1373,7 @@ bamRecalSampleTSV // STEP 5: QC process SamtoolsStats { - label 'cpus_2' + label 'cpus_1' tag {idPatient + "-" + idSample} @@ -1440,8 +1398,7 @@ samtoolsStatsReport = samtoolsStatsReport.dump(tag:'SAMTools') bamBamQC = bamMappedBamQC.mix(bamRecalBamQC) process BamQC { - label 'memory_max' - label 'cpus_max' + label 'med_resources' tag {idPatient + "-" + idSample} @@ -1508,10 +1465,9 @@ bamHaplotypeCaller = bamRecalAllTemp.combine(intHaplotypeCaller) process HaplotypeCaller { - label 'forks_max' - label 'cpus_1' + label 'cpus_1' - tag {idSample + "-" + intervalBed.baseName} + tag {idSample + "-" + intervalBed.baseName} input: set idPatient, idSample, file(bam), file(bai), file(intervalBed) from bamHaplotypeCaller @@ -1548,6 +1504,8 @@ else gvcfHaplotypeCaller = gvcfHaplotypeCaller.dump(tag:'GVCF HaplotypeCaller') // STEP GATK HAPLOTYPECALLER.2 process GenotypeGVCFs { + label 'memory_max' + tag {idSample + "-" + intervalBed.baseName} input: @@ -1561,7 +1519,7 @@ process GenotypeGVCFs { output: set val("HaplotypeCaller"), idPatient, idSample, file("${intervalBed.baseName}_${idSample}.vcf") into vcfGenotypeGVCFs - when: 'haplotypecaller' in tools + when: !(params.noGVCF) && ('haplotypecaller' in tools) script: // Using -L is important for speed and we have to index the interval files also @@ -1838,7 +1796,7 @@ pairBam = pairBam.dump(tag:'BAM Somatic Pair') // Manta, Strelka, Mutect2 (pairBamManta, pairBamStrelka, pairBamStrelkaBP, pairBamCalculateContamination, pairBamFilterMutect2, pairBamTNscope, pairBam) = pairBam.into(7) -intervalPairBam = pairBam.spread(bedIntervals) +intervalPairBam = pairBam.spread(scattered_interval_list) bamMpileup = bamMpileup.spread(intMpileup) @@ -1849,7 
+1807,6 @@ bamMpileup = bamMpileup.spread(intMpileup) process FreeBayes { - label 'forks_max' label 'cpus_1' tag {idSampleTumor + "_vs_" + idSampleNormal + "-" + intervalBed.baseName} @@ -1889,7 +1846,6 @@ vcfFreeBayes = vcfFreeBayes.groupTuple(by:[0,1,2]) process Mutect2 { tag {idSampleTumor + "_vs_" + idSampleNormal + "-" + intervalBed.baseName} - label 'forks_max' label 'cpus_1' @@ -2019,7 +1975,6 @@ vcfConcatenated = vcfConcatenated.dump(tag:'VCF') process PileupSummariesForMutect2 { tag {idSampleTumor + "_vs_" + idSampleNormal + "_" + intervalBed.baseName } - label 'forks_max' label 'cpus_1' input: @@ -2052,7 +2007,6 @@ pileupSummaries = pileupSummaries.groupTuple(by:[0,1]) process MergePileupSummaries { - label 'forks_max' label 'cpus_1' tag {idPatient + "_" + idSampleTumor} @@ -2082,7 +2036,6 @@ process MergePileupSummaries { process CalculateContamination { - label 'forks_max' label 'cpus_1' tag {idSampleTumor + "_vs_" + idSampleNormal} @@ -2112,7 +2065,6 @@ process CalculateContamination { process FilterMutect2Calls { - label 'forks_max' label 'cpus_1' tag {idSampleTN} @@ -2388,7 +2340,7 @@ vcfStrelkaBP = vcfStrelkaBP.dump(tag:'Strelka BP') // Run commands and code from Malin Larsson // Based on Jesper Eisfeldt's code process AlleleCounter { - label 'memory_singleCPU_2_task' + label 'cpus_2' tag {idSample} @@ -2711,7 +2663,6 @@ vcfKeep = Channel.empty().mix( process BcftoolsStats { - label 'forks_max' label 'cpus_1' tag {"${variantCaller} - ${vcf}"} @@ -2736,7 +2687,6 @@ bcftoolsReport = bcftoolsReport.dump(tag:'BCFTools') process Vcftools { - label 'forks_max' label 'cpus_1' tag {"${variantCaller} - ${vcf}"} @@ -3055,8 +3005,7 @@ compressVCFOutVEP = compressVCFOutVEP.dump(tag:'VCF') process MultiQC { - label 'cpus_max' - label 'memory_max' + label 'cpus_2' publishDir "${params.outdir}/MultiQC", mode: params.publishDirMode diff --git a/nextflow.config b/nextflow.config index df7843b3fa..fdef49dc5d 100644 --- a/nextflow.config +++ b/nextflow.config @@ -10,7 
+10,7 @@ params { // Workflow flags annotateTools = null // Only with --step annotate - genome = 'GRCh38_PGP_UK' + genome = 'GRCh38' input = null // No default input noGVCF = null // g.vcf are produced by HaplotypeCaller noStrelkaBP = null // Strelka will use Manta candidateSmallIndels if available @@ -32,6 +32,12 @@ params { saveGenomeIndex = null // Built Indexes not saved sequencing_center = null // No sequencing center to be written in BAM header in MapReads process sentieon = null // Not using Sentieon by default + scatter_count = 30 + break_bands_at_multiples_of = 100000 + preemptible_tries = 3 + contamination = 0 + compression_level = 2 + make_bamout = false // Optional files/directory cadd_InDels = false // No CADD InDels file @@ -95,66 +101,6 @@ params { // Developmental code should specify dev process.container = 'nfcore/sarek:2.5.2' - -process { - - withLabel: forks_max { - cpus = {params.max_forks} - maxForks = 96 - } - withLabel: cpus_max { - cpus = {params.max_cpus} - maxForks = 2 - } - withLabel: memory_max { - memory = {params.max_memory} - } - - withLabel: med_resources { - cpus = { alloc_med_resource(params.max_cpus) } - //maxForks = { alloc_med_resource(params.max_forks) } - memory = { alloc_med_resource(params.max_memory) } - } - - withName: BaseRecalibratorSpark { - container = "broadinstitute/gatk:4.1.4.0" - maxForks = 64 - } - - withName: MarkDuplicatesSpark { - container = "broadinstitute/gatk:4.1.3.0" - maxForks = 2 - } - - withName: RunGenomeChronicler { - container = "lifebitai/genomechronicler:pgp-uk-5513c6f" - } - - withName: ApplyBQSRSpark { - container = "broadinstitute/gatk:4.1.4.0" - maxForks = 96 - } - - withName: HaplotypeCaller { - container = "broadinstitute/gatk:4.1.4.0" - cpus = 1 - maxForks = 96 - } - - withName: Mutect2 { - container = "broadinstitute/gatk:4.1.4.0" - } - - withName: PileupSummariesForMutect2 { - container = "broadinstitute/gatk:4.1.4.0" - } - - withName: MultiQC { - errorStrategy = 'retry' - maxRetries = 4 - 
} -} - // Load base.config by default for all pipelines includeConfig 'conf/base.config'