Skip to content

Commit

Permalink
convert fromSamplesheet to an operator
Browse files Browse the repository at this point in the history
  • Loading branch information
nvnieuwk committed Mar 20, 2024
1 parent 18ae17c commit 4977ec0
Showing 1 changed file with 27 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import java.nio.file.Path
import java.util.regex.Matcher
import java.util.regex.Pattern
import nextflow.extension.CH
import nextflow.extension.DataflowHelper
import nextflow.Channel
import nextflow.Global
import nextflow.Nextflow
Expand Down Expand Up @@ -138,73 +139,36 @@ class SchemaValidator extends PluginExtensionPoint {
public DataflowWriteChannel fromSamplesheet(
final DataflowReadChannel source,
final Path schema,
final Map options = null,
final Map options = null
) {
def Map params = session.params

// Set defaults for optional inputs
def String schemaFilename = options?.containsKey('parameters_schema') ? options.parameters_schema as String : 'nextflow_schema.json'
def String baseDir = session.baseDir.toString()

// Get the samplesheet schema from the parameters schema
def slurper = new JsonSlurper()
def Map parsed = (Map) slurper.parse( Path.of(Utils.getSchemaPath(baseDir, schemaFilename)) )
def Map samplesheetValue = (Map) findDeep(parsed, samplesheetParam)
def Path samplesheetFile = params[samplesheetParam] as Path

// Some safeguard to make sure the channel factory runs correctly
if (samplesheetValue == null) {
log.error """
Parameter '--$samplesheetParam' was not found in the schema ($schemaFilename).
Unable to create a channel from it.
Please make sure you correctly specified the inputs to `.fromSamplesheet`:
--------------------------------------------------------------------------------------
Channel.fromSamplesheet("input")
--------------------------------------------------------------------------------------
This would create a channel from params.input using the schema specified in the parameters JSON schema for this parameter.
"""
throw new SchemaValidationException("", [])
}
else if (samplesheetFile == null) {
log.error "Parameter '--$samplesheetParam' was not provided. Unable to create a channel from it."
throw new SchemaValidationException("", [])
}
else if (!samplesheetValue.containsKey('schema')) {
log.error "Parameter '--$samplesheetParam' does not contain a schema in the parameter schema ($schemaFilename). Unable to create a channel from it."
throw new SchemaValidationException("", [])
}

// Convert to channel
final channel = CH.create()
def List arrayChannel = []
try {
def Path schemaFile = Path.of(Utils.getSchemaPath(baseDir, samplesheetValue['schema'].toString()))
def SamplesheetConverter converter = new SamplesheetConverter(samplesheetFile, schemaFile)
arrayChannel = converter.convertToList()
} catch (Exception e) {
log.error(
""" Following error has been found during samplesheet conversion:
${e}
${e.getStackTrace().join("\n\t")}
Please run validateParameters() first before trying to convert a samplesheet to a channel.
Reference: https://nextflow-io.github.io/nf-schema/parameters/validation/
Also make sure that the same schema is used for validation and conversion of the samplesheet
""" as String
)
}

session.addIgniter {
def params = session.params
final target = CH.createBy(source)
final validator = new JsonSchemaValidator()
final next = {
def JSONArray samplesheet = Utils.fileToJsonArray(it as Path, schema)
def List<String> validationErrors = validator.validate(samplesheet, schema.text)
this.errors.addAll(validationErrors)
if (validationErrors) {
def Boolean useMonochromeLogs = options?.containsKey('monochrome_logs') ? options.monochrome_logs as Boolean :
params.monochrome_logs ? params.monochrome_logs as Boolean :
params.monochromeLogs ? params.monochromeLogs as Boolean :
false
def colors = logColours(useMonochromeLogs)
def msg = "${colors.red}The following errors have been detected in ${it.toString()}:\n\n" + validationErrors.join('\n').trim() + "\n${colors.reset}\n"
log.error("Validation of samplesheet failed!")
throw new SchemaValidationException(msg, this.getErrors())
}
def SamplesheetConverter converter = new SamplesheetConverter(it as Path, schema)
def List arrayChannel = converter.convertToList()
arrayChannel.each {
channel.bind(it)
target.bind(it)
}
channel.bind(Channel.STOP)
}
return channel
final done = {
target.bind(Channel.STOP)
}
DataflowHelper.subscribeImpl(source, [onNext: next, onComplete: done])
return target
}


Expand Down

0 comments on commit 4977ec0

Please sign in to comment.