From 40f572dc7cd57996f03ed2990d83fb2b2cf9084d Mon Sep 17 00:00:00 2001 From: Edmund Miller Date: Tue, 22 Aug 2023 20:44:17 -0500 Subject: [PATCH] feat(nf-validation): Use fromSamplesheet Borrowed code from rnaseq, had to fix the group pulling for whatever reason Removes old check_samplesheet code as well. --- assets/schema_input.json | 10 +- bin/check_samplesheet.py | 259 ----------------------------- lib/WorkflowNascent.groovy | 21 +++ modules/local/samplesheet_check.nf | 31 ---- nextflow_schema.json | 1 + subworkflows/local/input_check.nf | 44 ----- workflows/nascent.nf | 40 +++-- 7 files changed, 56 insertions(+), 350 deletions(-) delete mode 100755 bin/check_samplesheet.py delete mode 100644 modules/local/samplesheet_check.nf delete mode 100644 subworkflows/local/input_check.nf diff --git a/assets/schema_input.json b/assets/schema_input.json index 762ac215..464527ac 100644 --- a/assets/schema_input.json +++ b/assets/schema_input.json @@ -10,22 +10,26 @@ "sample": { "type": "string", "pattern": "^\\S+$", - "errorMessage": "Sample name must be provided and cannot contain spaces" + "errorMessage": "Sample name must be provided and cannot contain spaces", + "meta": ["id"] }, "fastq_1": { "type": "string", + "format": "file-path", + "exists": true, "pattern": "^\\S+\\.f(ast)?q\\.gz$", "errorMessage": "FastQ file for reads 1 must be provided, cannot contain spaces and must have extension '.fq.gz' or '.fastq.gz'" }, "fastq_2": { "errorMessage": "FastQ file for reads 2 cannot contain spaces and must have extension '.fq.gz' or '.fastq.gz'", + "type": "string", + "format": "file-path", + "exists": true, "anyOf": [ { - "type": "string", "pattern": "^\\S+\\.f(ast)?q\\.gz$" }, { - "type": "string", "maxLength": 0 } ] diff --git a/bin/check_samplesheet.py b/bin/check_samplesheet.py deleted file mode 100755 index 4a758fe0..00000000 --- a/bin/check_samplesheet.py +++ /dev/null @@ -1,259 +0,0 @@ -#!/usr/bin/env python - - -"""Provide a command line tool to validate and transform tabular samplesheets.""" - - -import argparse -import csv -import logging -import sys -from collections import Counter -from pathlib import Path - -logger = logging.getLogger() - - -class RowChecker: - """ - Define a service that can validate and transform each given row. - - Attributes: - modified (list): A list of dicts, where each dict corresponds to a previously - validated and transformed row. The order of rows is maintained. - - """ - - VALID_FORMATS = ( - ".fq.gz", - ".fastq.gz", - ) - - def __init__( - self, - sample_col="sample", - first_col="fastq_1", - second_col="fastq_2", - single_col="single_end", - **kwargs, - ): - """ - Initialize the row checker with the expected column names. - - Args: - sample_col (str): The name of the column that contains the sample name - (default "sample"). - first_col (str): The name of the column that contains the first (or only) - FASTQ file path (default "fastq_1"). - second_col (str): The name of the column that contains the second (if any) - FASTQ file path (default "fastq_2"). - single_col (str): The name of the new column that will be inserted and - records whether the sample contains single- or paired-end sequencing - reads (default "single_end"). - - """ - super().__init__(**kwargs) - self._sample_col = sample_col - self._first_col = first_col - self._second_col = second_col - self._single_col = single_col - self._seen = set() - self.modified = [] - - def validate_and_transform(self, row): - """ - Perform all validations on the given row and insert the read pairing status. - - Args: - row (dict): A mapping from column headers (keys) to elements of that row - (values). - - """ - self._validate_sample(row) - self._validate_first(row) - self._validate_second(row) - self._validate_pair(row) - self._seen.add((row[self._sample_col], row[self._first_col])) - self.modified.append(row) - - def _validate_sample(self, row): - """Assert that the sample name exists and convert spaces to underscores.""" - if len(row[self._sample_col]) <= 0: - raise AssertionError("Sample input is required.") - # Sanitize samples slightly. - row[self._sample_col] = row[self._sample_col].replace(" ", "_") - - def _validate_first(self, row): - """Assert that the first FASTQ entry is non-empty and has the right format.""" - if len(row[self._first_col]) <= 0: - raise AssertionError("At least the first FASTQ file is required.") - self._validate_fastq_format(row[self._first_col]) - - def _validate_second(self, row): - """Assert that the second FASTQ entry has the right format if it exists.""" - if len(row[self._second_col]) > 0: - self._validate_fastq_format(row[self._second_col]) - - def _validate_pair(self, row): - """Assert that read pairs have the same file extension. Report pair status.""" - if row[self._first_col] and row[self._second_col]: - row[self._single_col] = False - first_col_suffix = Path(row[self._first_col]).suffixes[-2:] - second_col_suffix = Path(row[self._second_col]).suffixes[-2:] - if first_col_suffix != second_col_suffix: - raise AssertionError("FASTQ pairs must have the same file extensions.") - else: - row[self._single_col] = True - - def _validate_fastq_format(self, filename): - """Assert that a given filename has one of the expected FASTQ extensions.""" - if not any(filename.endswith(extension) for extension in self.VALID_FORMATS): - raise AssertionError( - f"The FASTQ file has an unrecognized extension: {filename}\n" - f"It should be one of: {', '.join(self.VALID_FORMATS)}" - ) - - def validate_unique_samples(self): - """ - Assert that the combination of sample name and FASTQ filename is unique. - - In addition to the validation, also rename all samples to have a suffix of _T{n}, where n is the - number of times the same sample exist, but with different FASTQ files, e.g., multiple runs per experiment. - - """ - if len(self._seen) != len(self.modified): - raise AssertionError("The pair of sample name and FASTQ must be unique.") - seen = Counter() - for row in self.modified: - sample = row[self._sample_col] - seen[sample] += 1 - row[self._sample_col] = f"{sample}_T{seen[sample]}" - - -def read_head(handle, num_lines=10): - """Read the specified number of lines from the current position in the file.""" - lines = [] - for idx, line in enumerate(handle): - if idx == num_lines: - break - lines.append(line) - return "".join(lines) - - -def sniff_format(handle): - """ - Detect the tabular format. - - Args: - handle (text file): A handle to a `text file`_ object. The read position is - expected to be at the beginning (index 0). - - Returns: - csv.Dialect: The detected tabular format. - - .. _text file: - https://docs.python.org/3/glossary.html#term-text-file - - """ - peek = read_head(handle) - handle.seek(0) - sniffer = csv.Sniffer() - dialect = sniffer.sniff(peek) - return dialect - - -def check_samplesheet(file_in, file_out): - """ - Check that the tabular samplesheet has the structure expected by nf-core pipelines. - - Validate the general shape of the table, expected columns, and each row. Also add - an additional column which records whether one or two FASTQ reads were found. - - Args: - file_in (pathlib.Path): The given tabular samplesheet. The format can be either - CSV, TSV, or any other format automatically recognized by ``csv.Sniffer``. - file_out (pathlib.Path): Where the validated and transformed samplesheet should - be created; always in CSV format. - - Example: - This function checks that the samplesheet follows the following structure, - see also the `viral recon samplesheet`_:: - - sample,fastq_1,fastq_2 - SAMPLE_PE,SAMPLE_PE_RUN1_1.fastq.gz,SAMPLE_PE_RUN1_2.fastq.gz - SAMPLE_PE,SAMPLE_PE_RUN2_1.fastq.gz,SAMPLE_PE_RUN2_2.fastq.gz - SAMPLE_SE,SAMPLE_SE_RUN1_1.fastq.gz, - - .. _viral recon samplesheet: - https://raw.githubusercontent.com/nf-core/test-datasets/viralrecon/samplesheet/samplesheet_test_illumina_amplicon.csv - - """ - required_columns = {"sample", "fastq_1", "fastq_2"} - # See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`. - with file_in.open(newline="") as in_handle: - reader = csv.DictReader(in_handle, dialect=sniff_format(in_handle)) - # Validate the existence of the expected header columns. - if not required_columns.issubset(reader.fieldnames): - req_cols = ", ".join(required_columns) - logger.critical(f"The sample sheet **must** contain these column headers: {req_cols}.") - sys.exit(1) - # Validate each row. - checker = RowChecker() - for i, row in enumerate(reader): - try: - checker.validate_and_transform(row) - except AssertionError as error: - logger.critical(f"{str(error)} On line {i + 2}.") - sys.exit(1) - checker.validate_unique_samples() - header = list(reader.fieldnames) - header.insert(1, "single_end") - # See https://docs.python.org/3.9/library/csv.html#id3 to read up on `newline=""`. - with file_out.open(mode="w", newline="") as out_handle: - writer = csv.DictWriter(out_handle, header, delimiter=",") - writer.writeheader() - for row in checker.modified: - writer.writerow(row) - - -def parse_args(argv=None): - """Define and immediately parse command line arguments.""" - parser = argparse.ArgumentParser( - description="Validate and transform a tabular samplesheet.", - epilog="Example: python check_samplesheet.py samplesheet.csv samplesheet.valid.csv", - ) - parser.add_argument( - "file_in", - metavar="FILE_IN", - type=Path, - help="Tabular input samplesheet in CSV or TSV format.", - ) - parser.add_argument( - "file_out", - metavar="FILE_OUT", - type=Path, - help="Transformed output samplesheet in CSV format.", - ) - parser.add_argument( - "-l", - "--log-level", - help="The desired log level (default WARNING).", - choices=("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"), - default="WARNING", - ) - return parser.parse_args(argv) - - -def main(argv=None): - """Coordinate argument parsing and program execution.""" - args = parse_args(argv) - logging.basicConfig(level=args.log_level, format="[%(levelname)s] %(message)s") - if not args.file_in.is_file(): - logger.error(f"The given input file {args.file_in} was not found!") - sys.exit(2) - args.file_out.parent.mkdir(parents=True, exist_ok=True) - check_samplesheet(args.file_in, args.file_out) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/lib/WorkflowNascent.groovy b/lib/WorkflowNascent.groovy index 40aecd91..c6aad331 100755 --- a/lib/WorkflowNascent.groovy +++ b/lib/WorkflowNascent.groovy @@ -20,6 +20,27 @@ class WorkflowNascent { } } + // + // Function to validate channels from input samplesheet + // + public static ArrayList validateInput(input) { + def (metas, fastqs) = input[1..2] + + // Check that multiple runs of the same sample are of the same strandedness + def strandedness_ok = metas.collect{ it.strandedness }.unique().size == 1 + if (!strandedness_ok) { + Nextflow.error("Please check input samplesheet -> Multiple runs of a sample must have the same strandedness!: ${metas[0].id}") + } + + // Check that multiple runs of the same sample are of the same datatype i.e. single-end / paired-end + def endedness_ok = metas.collect{ it.single_end }.unique().size == 1 + if (!endedness_ok) { + Nextflow.error("Please check input samplesheet -> Multiple runs of a sample must be of the same datatype i.e. single-end or paired-end: ${metas[0].id}") + } + + return [ metas[0], fastqs ] + } + // // Get workflow summary for MultiQC // diff --git a/modules/local/samplesheet_check.nf b/modules/local/samplesheet_check.nf deleted file mode 100644 index e882f946..00000000 --- a/modules/local/samplesheet_check.nf +++ /dev/null @@ -1,31 +0,0 @@ -process SAMPLESHEET_CHECK { - tag "$samplesheet" - label 'process_single' - - conda "conda-forge::python=3.8.3" - container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? - 'https://depot.galaxyproject.org/singularity/python:3.8.3' : - 'biocontainers/python:3.8.3' }" - - input: - path samplesheet - - output: - path '*.csv' , emit: csv - path "versions.yml", emit: versions - - when: - task.ext.when == null || task.ext.when - - script: // This script is bundled with the pipeline, in nf-core/nascent/bin/ - """ - check_samplesheet.py \\ - $samplesheet \\ - samplesheet.valid.csv - - cat <<-END_VERSIONS > versions.yml - "${task.process}": - python: \$(python --version | sed 's/Python //g') - END_VERSIONS - """ -} diff --git a/nextflow_schema.json b/nextflow_schema.json index 66896979..d17c5f61 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -17,6 +17,7 @@ "format": "file-path", "exists": true, "mimetype": "text/csv", + "schema": "assets/schema_input.json", "pattern": "^\\S+\\.csv$", "description": "Path to comma-separated file containing information about the samples in the experiment.", "help_text": "You will need to create a design file with information about the samples in your experiment before running the pipeline. Use this parameter to specify its location. It has to be a comma-separated file with 3 columns, and a header row. See [usage docs](https://nf-co.re/nascent/usage#samplesheet-input).", diff --git a/subworkflows/local/input_check.nf b/subworkflows/local/input_check.nf deleted file mode 100644 index 0aecf87f..00000000 --- a/subworkflows/local/input_check.nf +++ /dev/null @@ -1,44 +0,0 @@ -// -// Check input samplesheet and get read channels -// - -include { SAMPLESHEET_CHECK } from '../../modules/local/samplesheet_check' - -workflow INPUT_CHECK { - take: - samplesheet // file: /path/to/samplesheet.csv - - main: - SAMPLESHEET_CHECK ( samplesheet ) - .csv - .splitCsv ( header:true, sep:',' ) - .map { create_fastq_channel(it) } - .set { reads } - - emit: - reads // channel: [ val(meta), [ reads ] ] - versions = SAMPLESHEET_CHECK.out.versions // channel: [ versions.yml ] -} - -// Function to get list of [ meta, [ fastq_1, fastq_2 ] ] -def create_fastq_channel(LinkedHashMap row) { - // create meta map - def meta = [:] - meta.id = row.sample - meta.single_end = row.single_end.toBoolean() - - // add path(s) of the fastq file(s) to the meta map - def fastq_meta = [] - if (!file(row.fastq_1).exists()) { - exit 1, "ERROR: Please check input samplesheet -> Read 1 FastQ file does not exist!\n${row.fastq_1}" - } - if (meta.single_end) { - fastq_meta = [ meta, [ file(row.fastq_1) ] ] - } else { - if (!file(row.fastq_2).exists()) { - exit 1, "ERROR: Please check input samplesheet -> Read 2 FastQ file does not exist!\n${row.fastq_2}" - } - fastq_meta = [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ] - } - return fastq_meta -} diff --git a/workflows/nascent.nf b/workflows/nascent.nf index f139e710..104a1c45 100644 --- a/workflows/nascent.nf +++ b/workflows/nascent.nf @@ -4,7 +4,7 @@ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ -include { paramsSummaryLog; paramsSummaryMap } from 'plugin/nf-validation' +include { paramsSummaryLog; paramsSummaryMap; fromSamplesheet } from 'plugin/nf-validation' def logo = NfcoreTemplate.logo(workflow, params.monochrome_logs) def citation = '\n' + WorkflowMain.citation(workflow) + '\n' @@ -91,31 +91,43 @@ workflow NASCENT { ch_versions = ch_versions.mix(PREPARE_GENOME.out.versions.first()) // - // SUBWORKFLOW: Read in samplesheet, validate and stage input files + // Create input channel from input file provided through params.input // - INPUT_CHECK ( - file(params.input) - ) - ch_versions = ch_versions.mix(INPUT_CHECK.out.versions) - // TODO: OPTIONAL, you can use nf-validation plugin to create an input channel from the samplesheet with Channel.fromSamplesheet("input") - // See the documentation https://nextflow-io.github.io/nf-validation/samplesheets/fromSamplesheet/ - // ! There is currently no tooling to help you write a sample sheet schema + Channel + .fromSamplesheet("input") + .map { + meta, fastq_1, fastq_2 -> + if (!fastq_2) { + return [ meta.id, meta + [ single_end:true ], [ fastq_1 ] ] + } else { + return [ meta.id, meta + [ single_end:false ], [ fastq_1, fastq_2 ] ] + } + } + .groupTuple() + .map { + WorkflowNascent.validateInput(it) + } + .map { + meta, fastqs -> + return [ meta, fastqs.flatten() ] + } + .set { ch_fastq } // // MODULE: Run FastQC // FASTQC ( - INPUT_CHECK.out.reads + ch_fastq ) ch_versions = ch_versions.mix(FASTQC.out.versions.first()) ch_reads = Channel.empty() if(!params.skip_trimming) { - FASTP ( INPUT_CHECK.out.reads, [], false, false ) + FASTP ( ch_fastq, [], false, false ) ch_reads = FASTP.out.reads ch_versions = ch_versions.mix(FASTP.out.versions.first()) } else { - ch_reads = INPUT_CHECK.out.reads + ch_reads = ch_fastq } // @@ -202,7 +214,9 @@ workflow NASCENT { ch_genome_bam.map { meta, bam -> fmeta = meta.findAll { it.key != 'read_group' } - fmeta.id = fmeta.id.split('_')[0..-3].join('_') + println fmeta + // Split and take the first element + fmeta.id = fmeta.id.split('_')[0] [ fmeta, bam ] } .groupTuple(by: [0]) .map { it -> [ it[0], it[1].flatten() ] }