From 4977ec097afac7fdc047c87fd4358f3858fa2320 Mon Sep 17 00:00:00 2001 From: Nicolas Vannieuwkerke Date: Wed, 20 Mar 2024 16:01:36 +0100 Subject: [PATCH] convert fromSamplesheet to an operator --- .../validation/SchemaValidator.groovy | 90 ++++++------------- 1 file changed, 27 insertions(+), 63 deletions(-) diff --git a/plugins/nf-schema/src/main/nextflow/validation/SchemaValidator.groovy b/plugins/nf-schema/src/main/nextflow/validation/SchemaValidator.groovy index 5ab0573f..646ea620 100644 --- a/plugins/nf-schema/src/main/nextflow/validation/SchemaValidator.groovy +++ b/plugins/nf-schema/src/main/nextflow/validation/SchemaValidator.groovy @@ -12,6 +12,7 @@ import java.nio.file.Path import java.util.regex.Matcher import java.util.regex.Pattern import nextflow.extension.CH +import nextflow.extension.DataflowHelper import nextflow.Channel import nextflow.Global import nextflow.Nextflow @@ -138,73 +139,36 @@ class SchemaValidator extends PluginExtensionPoint { public DataflowWriteChannel fromSamplesheet( final DataflowReadChannel source, final Path schema, - final Map options = null, + final Map options = null ) { - def Map params = session.params - - // Set defaults for optional inputs - def String schemaFilename = options?.containsKey('parameters_schema') ? options.parameters_schema as String : 'nextflow_schema.json' - def String baseDir = session.baseDir.toString() - - // Get the samplesheet schema from the parameters schema - def slurper = new JsonSlurper() - def Map parsed = (Map) slurper.parse( Path.of(Utils.getSchemaPath(baseDir, schemaFilename)) ) - def Map samplesheetValue = (Map) findDeep(parsed, samplesheetParam) - def Path samplesheetFile = params[samplesheetParam] as Path - - // Some safeguard to make sure the channel factory runs correctly - if (samplesheetValue == null) { - log.error """ -Parameter '--$samplesheetParam' was not found in the schema ($schemaFilename). -Unable to create a channel from it. - -Please make sure you correctly specified the inputs to `.fromSamplesheet`: - --------------------------------------------------------------------------------------- -Channel.fromSamplesheet("input") --------------------------------------------------------------------------------------- - -This would create a channel from params.input using the schema specified in the parameters JSON schema for this parameter. -""" - throw new SchemaValidationException("", []) - } - else if (samplesheetFile == null) { - log.error "Parameter '--$samplesheetParam' was not provided. Unable to create a channel from it." - throw new SchemaValidationException("", []) - } - else if (!samplesheetValue.containsKey('schema')) { - log.error "Parameter '--$samplesheetParam' does not contain a schema in the parameter schema ($schemaFilename). Unable to create a channel from it." - throw new SchemaValidationException("", []) - } - - // Convert to channel - final channel = CH.create() - def List arrayChannel = [] - try { - def Path schemaFile = Path.of(Utils.getSchemaPath(baseDir, samplesheetValue['schema'].toString())) - def SamplesheetConverter converter = new SamplesheetConverter(samplesheetFile, schemaFile) - arrayChannel = converter.convertToList() - } catch (Exception e) { - log.error( - """ Following error has been found during samplesheet conversion: - ${e} - ${e.getStackTrace().join("\n\t")} - -Please run validateParameters() first before trying to convert a samplesheet to a channel. -Reference: https://nextflow-io.github.io/nf-schema/parameters/validation/ - -Also make sure that the same schema is used for validation and conversion of the samplesheet -""" as String - ) - } - - session.addIgniter { + def params = session.params + final target = CH.createBy(source) + final validator = new JsonSchemaValidator() + final next = { + def JSONArray samplesheet = Utils.fileToJsonArray(it as Path, schema) + def List validationErrors = validator.validate(samplesheet, schema.text) + this.errors.addAll(validationErrors) + if (validationErrors) { + def Boolean useMonochromeLogs = options?.containsKey('monochrome_logs') ? options.monochrome_logs as Boolean : + params.monochrome_logs ? params.monochrome_logs as Boolean : + params.monochromeLogs ? params.monochromeLogs as Boolean : + false + def colors = logColours(useMonochromeLogs) + def msg = "${colors.red}The following errors have been detected in ${it.toString()}:\n\n" + validationErrors.join('\n').trim() + "\n${colors.reset}\n" + log.error("Validation of samplesheet failed!") + throw new SchemaValidationException(msg, this.getErrors()) + } + def SamplesheetConverter converter = new SamplesheetConverter(it as Path, schema) + def List arrayChannel = converter.convertToList() arrayChannel.each { - channel.bind(it) + target.bind(it) } - channel.bind(Channel.STOP) } - return channel + final done = { + target.bind(Channel.STOP) + } + DataflowHelper.subscribeImpl(source, [onNext: next, onComplete: done]) + return target }