From fb080cf8b6b0c11d435a2a9dffac76dcdc78005b Mon Sep 17 00:00:00 2001
From: Ryan Routsong
Date: Wed, 6 Dec 2023 12:11:33 -0700
Subject: [PATCH] feat: working bclconvert rule

---
 scripts/config.py      |  2 +-
 scripts/files.py       | 11 +++++--
 scripts/samplesheet.py | 38 +++++++++++++++---------
 scripts/utils.py       | 16 ++++++++++
 weave                  | 12 ++++----
 workflow/Snakefile     |  4 +--
 workflow/demux.smk     | 66 +++++++++++++++++++++++++++++-------------
 workflow/qc.smk        |  5 ++--
 8 files changed, 109 insertions(+), 45 deletions(-)

diff --git a/scripts/config.py b/scripts/config.py
index a1737a8..3c3cfd9 100644
--- a/scripts/config.py
+++ b/scripts/config.py
@@ -93,7 +93,7 @@ def get_resource_config():
 
 def base_config(keys=None, qc=True):
     base_keys = ('runs', 'run_ids', 'project', 'rnums', 'bcl_files', \
-                 'sample_sheet', 'samples', 'sids', 'out_to', 'demux_input_dir')
+                 'sample_sheet', 'samples', 'sids', 'out_to', 'demux_input_dir', 'bclconvert')
     this_config = {k: [] for k in base_keys}
     this_config['resources'] = get_resource_config()
     this_config['runqc'] = qc
diff --git a/scripts/files.py b/scripts/files.py
index 105ca77..ff03581 100644
--- a/scripts/files.py
+++ b/scripts/files.py
@@ -89,8 +89,15 @@ def is_dir_staged(server, run_dir):
     """
         filter check for wheter or not a directory has the appropriate breadcrumbs or not
 
-        RTAComplete.txt - file transfer from instrument breadcrumb, CSV file with values:
-            Run Date, Run time, Instrument ID
+        CopyComplete.txt - file transfer from instrument breadcrumb, blank (won't be there on instruments != NextSeq2k)
+
+        RTAComplete.txt - sequencing breadcrumb, CSV file with values:
+            Run Date, Run time, Instrument ID
+
+        RunInfo.xml - XML metainformation
+
+        Fastq Markers for DRAGEN demultiplexing
+            /Analysis/*/Data/fastq/*.fastq.gz
     """
     analyzed_checks = [
         Path(run_dir, 'RTAComplete.txt').exists(),
diff --git a/scripts/samplesheet.py b/scripts/samplesheet.py
index 61c879f..0627e16 100644
--- a/scripts/samplesheet.py
+++ b/scripts/samplesheet.py
@@ -1,5 +1,6 @@
 from csv import DictReader
 from io import StringIO
+from pathlib import Path
 from datetime import datetime
 from operator import itemgetter
 from dateutil import parser as dateparser
@@ -22,6 +23,7 @@ class IllumniaSampleSheet():
     """
 
     def __init__(self, samplesheet, end=None):
+        self.path = Path(samplesheet).absolute()
         self.sheet = self.parse_sheet(samplesheet)
         self.force_endedness = end
 
@@ -47,14 +49,19 @@ def parse_sheet(self, sheet):
 
         if 'Reads' in sheet_sections:
             self.process_simple_section(sheet_sections['Reads'])
 
+        if 'BCLConvert_Settings' in sheet_sections:
+            norm_names = {'AdapterRead1': 'Read01', 'AdapterRead2': 'Read02'}
+            self.process_simple_section(sheet_sections['BCLConvert_Settings'], rename=norm_names)
+
         if 'Data' not in sheet_sections and 'BCLConvert_Data' in sheet_sections:
             sheet_sections['Data'] = sheet_sections['BCLConvert_Data']
 
         assert 'Data' in sheet_sections, 'No sample data within this sample sheet'
-        self.process_csv_section(sheet_sections['Data'])
+        filtered_data = list(filter(lambda x: len(set(x)) > 1, sheet_sections['Data']))
+        self.process_csv_section(filtered_data)
 
-    def process_simple_section(self, section):
+    def process_simple_section(self, section, rename=None):
         """Simple section processing for Illumnia sample sheet.
 
         Objective:
@@ -76,6 +83,8 @@ def process_simple_section(self, section):
             if index == 'Date':
                 setattr(self, index, dateparser.parse(second))
             else:
+                if rename and index in rename:
+                    index = rename[index]
                 setattr(self, index, second)
         return
 
@@ -99,17 +108,24 @@ def project(self):
     def instrument(self):
         return getattr(self, 'Instrument', None)
 
+    @property
+    def platform(self):
+        return getattr(self, 'InstrumentPlatform', None)
+
+    @staticmethod
+    def intorlen(s):
+        "Cast to int if possible, otherwise get length"
+        try:
+            v = int(s)
+        except ValueError:
+            v = len(s)
+        return v
+
     @property
     def adapters(self):
         r1 = getattr(self, 'Read01', None)
         r2 = getattr(self, 'Read02', None)
-        return list(map(int, filter(None, [r1, r2])))
-
-    @property
-    def indexes(self):
-        i1 = getattr(self, 'Index01', None)
-        i2 = getattr(self, 'Index02', None)
-        return list(map(int, filter(None, [i1, i2])))
+        return list(map(self.intorlen, filter(None, [r1, r2])))
 
     @property
     def is_paired_end(self):
@@ -128,7 +144,3 @@ def is_single_end(self):
             return False
         else:
             raise ValueError('Unknown endedness from sample sheet')
-
-
-
-
diff --git a/scripts/utils.py b/scripts/utils.py
index 0a61734..1aca451 100644
--- a/scripts/utils.py
+++ b/scripts/utils.py
@@ -174,6 +174,8 @@ def get_mounts(*extras):
 
     if extras:
         for extra in extras:
+            if ':' in str(extra):
+                extra = str(extra).split(':')[0]
            if not Path(extra).exists():
                raise FileNotFoundError(f"Can't mount {str(extra)}, it doesn't exist!")
         mount_binds.extend(extras)
@@ -233,7 +235,13 @@ def exec_pipeline(configs, dry_run=False, local=False):
 
     for i in range(0, len(configs['run_ids'])):
         this_config = {k: (v[i] if k not in skip_config_keys else v) for k, v in configs.items() if v}
         this_config.update(profile_config)
+        extra_to_mount = [this_config['out_to'], this_config['demux_input_dir']]
+        if this_config['bclconvert']:
+            bclcon_log_dir = Path(this_config['out_to'], "logs", "bclconvert_demux")
+            if not bclcon_log_dir.exists():
+                bclcon_log_dir.mkdir(mode=0o755, parents=True)
+            extra_to_mount.append(str(bclcon_log_dir) + ":" + "/var/log/bcl-convert:rw")
         singularity_binds = get_mounts(*extra_to_mount)
         config_file = Path(this_config['out_to'], '.config', f'config_job_{str(i)}.json').absolute()
         json.dump(this_config, open(config_file, 'w'), cls=PathJSONEncoder, indent=4)
@@ -269,3 +277,11 @@
             print(' '.join(map(str, this_cmd)))
         exec_snakemake(this_cmd, local=local, dry_run=dry_run, env=top_env, cwd=str(Path(this_config['out_to']).absolute()))
 
+
+def is_bclconvert(samplesheet):
+    BCLCONVERT_INSTRUMENTS = ('VH01716',)
+    BCLCONVERT_PLATFORMS = ('NextSeq1k2k',)
+    check = False
+    if samplesheet.instrument in BCLCONVERT_INSTRUMENTS or samplesheet.platform in BCLCONVERT_PLATFORMS:
+        check = True
+    return check
\ No newline at end of file
diff --git a/weave b/weave
index 87dbf9d..984929b 100755
--- a/weave
+++ b/weave
@@ -14,12 +14,12 @@ def run(args):
 
     for (rundir, run_infos) in runs:
         sample_sheet = run_infos['samplesheet']
-        import ipdb; ipdb.set_trace()
         sample_list = [
             dict(sid=sample.Sample_ID+'_S'+str(i), r1_adapter=sample.Index, r2_adapter=sample.Index2)
             for i, sample in enumerate(sample_sheet.samples, start=1)
         ]
         project_list = list(set([_sample.Sample_Project for _sample in sample_sheet.samples]))
+
         if len(project_list) > 1:
             raise NotImplementedError("Unable to process multiple projects currently.\n" +
                                       "Please file issue if this message is blocking: https://github.com/OpenOmics/weave/issues")
@@ -27,6 +27,7 @@ def run(args):
         pairs = ['1', '2'] if sample_sheet.is_paired_end else ['1']
 
         # ~~~ general run configuration ~~~
+        exec_config['bclconvert'].append(utils.is_bclconvert(sample_sheet))
         exec_config['run_ids'].append(rundir.name)
         exec_config['demux_input_dir'].append(rundir.resolve())
         exec_config['sids'].append([x['sid'] for x in sample_list])
@@ -45,11 +46,12 @@ def run(args):
         exec_config['samples'].append(sample_list)
 
         # ~~~ output verification ~~~
-        files.valid_run_output(args.output, dry_run=args.dry_run)
-        exec_config['out_to'].append(str(Path(args.output).absolute()))
-
+        opdir = Path(args.output, rundir.name).absolute() \
+            if args.output is not None \
+            else Path(Path.cwd(), 'output').absolute()
+        files.valid_run_output(opdir, dry_run=args.dry_run)
+        exec_config['out_to'].append(opdir)
 
-    import ipdb; ipdb.set_trace()
     utils.exec_pipeline(exec_config, dry_run=args.dry_run, local=args.local)
 
diff --git a/workflow/Snakefile b/workflow/Snakefile
index 0e7c094..b079c91 100644
--- a/workflow/Snakefile
+++ b/workflow/Snakefile
@@ -104,7 +104,7 @@ bcl2fastq_outputs = flatten(
 bclconvert_outputs = flatten(
     [
         # ~~ NextSeq2k demultiplexing ~~
-        expand("{out_to}/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
+        expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
         expand("{out_to}/demux/Reports/Demultiplex_Stats.csv", **demux_expand_args),
         expand("{out_to}/demux/Reports/Adapter_Metrics.csv", **demux_expand_args),
         expand("{out_to}/demux/.BC_DEMUX_COMPLETE", **demux_expand_args),
@@ -114,7 +114,7 @@ bclconvert_outputs = flatten(
 if config['bclconvert']:
     all_outputs = bclconvert_outputs
 else:
-    all_outputs = bcl2fastq
+    all_outputs = bcl2fastq_outputs
 
 
 if config["runqc"]:
diff --git a/workflow/demux.smk b/workflow/demux.smk
index a413765..20f4220 100644
--- a/workflow/demux.smk
+++ b/workflow/demux.smk
@@ -5,7 +5,7 @@ demux_expand_args = {
     "rnums": config["rnums"],
     "sids": config['sids'],
 }
-
+demux_noop_args = dict.fromkeys(demux_expand_args.keys(), [])
 
 rule bcl2fastq:
     """
     ...
     """
     input:
         run_dir = config['demux_input_dir'],
-        binary_base_calls = expand("{files}", files=config['bcl_files']),
-        samplesheets = expand("{run}/SampleSheet.csv", run=config['demux_input_dir'])
+        binary_base_calls = expand("{files}", files=config['bcl_files'] if not config['bclconvert'] else demux_noop_args),
+        samplesheets = expand("{run}/SampleSheet.csv", run=config['demux_input_dir'] if not config['bclconvert'] else demux_noop_args)
     output:
-        seq_data = expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
-        undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args),
-        stats = expand("{out_to}/demux/Stats/Stats.json", **demux_expand_args),
-        breadcrumb = expand("{out_to}/demux/.B2F_DEMUX_COMPLETE", **demux_expand_args),
+        seq_data = expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
+        undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
+        stats = expand("{out_to}/demux/Stats/Stats.json", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
+        breadcrumb = expand("{out_to}/demux/.B2F_DEMUX_COMPLETE", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
     params:
         out_dir = config["out_to"] + "/demux/",
         run = config["run_ids"],
@@ -46,33 +46,59 @@
 
 rule bclconvert:
     """
-    bcl-convert Version 00.000.000.3.6.3
-    Copyright (c) 2014-2018 Illumina, Inc.
+    bcl-convert Version 00.000.000.4.2.4
     Run BCL Conversion (BCL directory to *.fastq.gz)
 
         bcl-convert --bcl-input-directory --output-directory [options]
+
+    --bcl-input-directory arg               Input BCL directory for BCL conversion (must be specified)
+    --sample-sheet arg                      Path to SampleSheet.csv file (default searched for in --bcl-input-directory)
+    --first-tile-only arg                   Only convert first tile of input (for testing & debugging)
+    --bcl-sampleproject-subdirectories arg  Output to subdirectories based upon sample sheet 'Sample_Project' column
+    --sample-name-column-enabled arg        Use sample sheet 'Sample_Name' column when naming fastq files & subdirectories
+    --fastq-gzip-compression-level arg      Set fastq output compression level 0-9 (default 1)
+    --bcl-num-parallel-tiles arg            # of tiles to process in parallel (default 1)
+    --bcl-num-conversion-threads arg        # of threads for conversion (per tile, default # cpu threads)
+    --bcl-num-compression-threads arg       # of threads for fastq.gz output compression (per tile, default # cpu threads,
+                                            or HW+12)
+    --bcl-num-decompression-threads arg     # of threads for bcl/cbcl input decompression (per tile, default half # cpu
+                                            threads, or HW+8. Only applies when preloading files)
+    --bcl-only-matched-reads arg            For pure BCL conversion, do not output files for 'Undetermined' [unmatched]
+                                            reads (output by default)
+    --no-lane-splitting arg                 Do not split FASTQ file by lane (false by default)
+
     """
     input:
         run_dir = config['demux_input_dir'],
-        binary_base_calls = expand("{files}", files=config['bcl_files']),
-        samplesheets = expand("{run}/SampleSheet.csv", run=config['demux_input_dir'])
-        samplesheets = expand("{run}/RunInfo.xml", run=config['demux_input_dir'])
+        binary_base_calls = expand("{files}", files=config['bcl_files'] if config['bclconvert'] else demux_noop_args),
+        samplesheets = expand("{run}/SampleSheet.csv", run=config['demux_input_dir'] if config['bclconvert'] else demux_noop_args),
+        runinfo = expand("{run}/RunInfo.xml", run=config['demux_input_dir'] if config['bclconvert'] else demux_noop_args),
     params:
         out_dir = config["out_to"] + "/demux/",
         run = config["run_ids"],
         project = config["project"],
-        mv_dir = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"]
+        mv_dir = config["out_to"] + "/demux/" + config["run_ids"] + "/" + config["project"]
     output:
-        seq_data = expand("{out_to}/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
-        undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args),
-        stats = expand("{out_to}/demux/Reports/Demultiplex_Stats.csv", **demux_expand_args),
-        metrics = expand("{out_to}/demux/Reports/Adapter_Metrics.csv", **demux_expand_args),
-        breadcrumb = expand("{out_to}/demux/.BC_DEMUX_COMPLETE", **demux_expand_args),
-    container: config["resources"]["sif"] + "weave_bclconvert_0.0.2.sif",
+        seq_data = expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if config['bclconvert'] else demux_noop_args),
+        undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args if config['bclconvert'] else demux_noop_args),
+        stats = expand("{out_to}/demux/Reports/Demultiplex_Stats.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
+        metrics = expand("{out_to}/demux/Reports/Adapter_Metrics.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
+        breadcrumb = expand("{out_to}/demux/.BC_DEMUX_COMPLETE", **demux_expand_args if config['bclconvert'] else demux_noop_args),
+    container: config["resources"]["sif"] + "weave_bclconvert_0.0.3.sif",
     threads: 24
     shell:
         """
-        bcl-convert --bcl-input-directory {input.run_dir} --output-directory {params.out_dir}
+        bcl-convert \
+            --bcl-input-directory {input.run_dir} \
+            --force \
+            --output-directory {params.out_dir} \
+            --fastq-gzip-compression-level 9 \
+            --bcl-sampleproject-subdirectories true \
+            --bcl-num-conversion-threads 8 \
+            --bcl-num-compression-threads 8 \
+            --bcl-num-decompression-threads 8 \
+            --bcl-num-parallel-tiles 3 \
+            --no-lane-splitting true
         mkdir -p {params.mv_dir}
         mv {params.out_dir}/*.fastq.gz {params.mv_dir}
         touch {output.breadcrumb}
diff --git a/workflow/qc.smk b/workflow/qc.smk
index 08a775f..79fc2d6 100644
--- a/workflow/qc.smk
+++ b/workflow/qc.smk
@@ -2,6 +2,7 @@ qc_expand_args = {
     "rnums": config["rnums"],
     "sids": config['sids'],
 }
+demux_stats = config['out_to'] + "/demux/Reports/Demultiplex_Stats.csv" if config['bclconvert'] else config['out_to'] + "/demux/Stats/Stats.json"
 
 
 rule fastqc_untrimmed:
@@ -44,8 +45,8 @@ rule fastqc_trimmed:
 
 rule multiqc_report:
     input:
-        # bcl2fastq
-        config['out_to'] + "/demux/Stats/Stats.json",
+        # demux status
+        demux_stats,
         # fastqc on untrimmed reads
         expand(config['out_to'] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastqc_untrimmed/{sids}_R{rnums}_001_fastqc.zip", **qc_expand_args),
         # fastqc on trimmed reads