Skip to content

Commit

Permalink
feat: single end support with conditional inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
rroutsong committed Dec 9, 2023
1 parent 62c3582 commit b8d1b0d
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 61 deletions.
7 changes: 7 additions & 0 deletions scripts/samplesheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, samplesheet, end=None):
self.path = Path(samplesheet).absolute()
self.sheet = self.parse_sheet(samplesheet)
self.force_endedness = end
self.validate_sheet(self.sheet)

def parse_sheet(self, sheet):
sheet_sections = dict()
Expand Down Expand Up @@ -117,6 +118,12 @@ def process_csv_section(self, section):
del row['index2']
setattr(self, 'data', csv_data)


def validate_sheet(self, sheet):
# this is the space we can do any type of sample sheet validation for running
# in our snakemake pipelines
# check values in self.sheet and continue or raise appropriate error
return

@property
def samples(self):
Expand Down
23 changes: 7 additions & 16 deletions scripts/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ def exec_snakemake(popen_cmd, local=False, dry_run=False, env=None, cwd=None):


def mk_sbatch_script(wd, cmd):
if not Path(wd, 'logs', 'masterjob').exists():
Path(wd, 'logs', 'masterjob').mkdir(mode=0o755, parents=True)
master_job_script = \
"""
f"""
#!/bin/bash
#SBATCH --job-name=weave_masterjob
#SBATCH --output=./slurm/%x_%j.out
#SBATCH --error=./slurm/%x_%j.err
#SBATCH --output={wd}/logs/masterjob/%x_%j.out
#SBATCH --error={wd}/logs/masterjob/%x_%j.err
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2
#SBATCH --time=02-00:00:00
Expand All @@ -138,7 +140,7 @@ def mk_sbatch_script(wd, cmd):
master_job_script += get_mods(init=True) + "\n"
master_job_script += cmd
master_job_script = '\n'.join([x.lstrip() for x in master_job_script.split('\n')])
master_script_location = Path(wd, 'slurm', 'master_jobscript.sh').absolute()
master_script_location = Path(wd, 'logs', 'masterjob', 'master_jobscript.sh').absolute()
master_script_location.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
with open(master_script_location, 'w') as fo:
fo.write(master_job_script)
Expand Down Expand Up @@ -202,17 +204,6 @@ def get_mounts(*extras):
return "\'-B " + ','.join(mounts) + "\'"


def ensure_pe_adapters(samplesheets):
    """Return True when every sample sheet appears fully paired-end.

    A collection passes when each sheet's ``is_paired_end`` flag is truthy
    and every sample on it carries non-empty ``index`` and ``index2``
    values (an empty string or the string ``'nan'`` counts as missing).
    An empty ``samplesheets`` iterable vacuously passes.
    """
    def _index_present(value):
        # str() of a blank or pandas-NaN cell renders as '' or 'nan'
        return str(value) not in ('', None, 'nan')

    checks = []
    for sheet in samplesheets:
        checks.append(sheet.is_paired_end)
        for sample in sheet.samples:
            checks.append(_index_present(sample.index))
            checks.append(_index_present(sample.index2))
    return all(checks)


def exec_pipeline(configs, dry_run=False, local=False):
"""
Execute the BCL->FASTQ pipeline.
Expand Down Expand Up @@ -248,13 +239,13 @@ def exec_pipeline(configs, dry_run=False, local=False):
top_env = {}
top_env['PATH'] = os.environ["PATH"]
top_env['SNK_CONFIG'] = str(config_file.absolute())
# top_env['LOAD_MODULES'] = get_mods()
top_env['SINGULARITY_CACHEDIR'] = str(Path(this_config['out_to'], '.singularity').absolute())
this_cmd = [
"snakemake",
"-pr",
"--use-singularity",
"--rerun-incomplete",
"--keep-incomplete",
"--rerun-triggers", "mtime",
"--verbose",
"-s", snake_file,
Expand Down
14 changes: 13 additions & 1 deletion weave
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import argparse
import subprocess
from pathlib import Path
from scripts import utils, files, config, cache

Expand Down Expand Up @@ -29,7 +30,7 @@ def run(args):
# ~~~ general run configuration ~~~
exec_config['bclconvert'].append(utils.is_bclconvert(sample_sheet))
exec_config['run_ids'].append(rundir.name)
exec_config['demux_input_dir'].append(rundir.resolve())
exec_config['demux_input_dir'].append(rundir.absolute())
exec_config['sids'].append([x['sid'] for x in sample_list])
exec_config['project'].append(project_list[0])
exec_config['rnums'].append(pairs)
Expand Down Expand Up @@ -60,9 +61,15 @@ def get_cache(sub_args):
"""
Main frontend for cache execution
"""
skele_config = {k: v if not isinstance(v, list) else "" for k, v in config.base_config(qc=True).items()}
cache.download(sub_args.cachedir, local=sub_args.local)


def unlock_dir(sub_args):
    """Unlock a snakemake working directory left locked by an interrupted run.

    NOTE(review): ``workflow`` is assigned but never used, and the
    ``snakemake --unlock`` call runs in the current working directory —
    ``sub_args.unlockdir`` is never passed — and the Popen handle is not
    waited on. Presumably this should run with ``cwd=sub_args.unlockdir``
    (or ``--directory``) and block until completion; confirm intent.
    """
    workflow = config.SNAKEFILE['Illumnia']
    subprocess.Popen(['snakemake', '--unlock'])


# ~~~~ argument parsing commands ~~~~
if __name__ == '__main__':
main_parser = argparse.ArgumentParser(prog='weave')
Expand Down Expand Up @@ -94,8 +101,13 @@ if __name__ == '__main__':
help='Relative or absolute path to directory for cache storage')
parser_cache.add_argument('-l', '--local', action='store_true',
help='Execute pipeline locally without a dispatching executor')

parser_unlock = sub_parsers.add_parser('unlock')
parser_unlock.add_argument('unlockdir', metavar='<directory to unlock>', type=cache.valid_dir,
help='Relative or absolute path to directory for cache storage')

parser_run.set_defaults(func = run)
parser_cache.set_defaults(func = get_cache)
parser_unlock.set_defaults(func = unlock_dir)
args = main_parser.parse_args()
args.func(args)
4 changes: 2 additions & 2 deletions workflow/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ qa_qc_outputs = flatten(
bcl2fastq_outputs = flatten(
[
        # ~~ All other Illumina demultiplexing ~~
expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
expand("{out_to}/demux/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
expand("{out_to}/demux/Stats/Stats.json", **demux_expand_args),
expand("{out_to}/demux/.B2F_DEMUX_COMPLETE", **demux_expand_args),
]
Expand All @@ -104,7 +104,7 @@ bcl2fastq_outputs = flatten(
bclconvert_outputs = flatten(
[
# ~~ NextSeq2k demultiplexing ~~
expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
expand("{out_to}/demux/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args),
expand("{out_to}/demux/Reports/Demultiplex_Stats.csv", **demux_expand_args),
expand("{out_to}/demux/Reports/Adapter_Metrics.csv", **demux_expand_args),
expand("{out_to}/demux/.BC_DEMUX_COMPLETE", **demux_expand_args),
Expand Down
49 changes: 25 additions & 24 deletions workflow/demux.smk
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,35 @@ rule bcl2fastq:
Copyright (c) 2007-2017 Illumina, Inc.
"""
input:
run_dir = config['demux_input_dir'],
run_dir = config['demux_input_dir'] if not config['bclconvert'] else [],
binary_base_calls = expand("{files}", files=config['bcl_files'] if not config['bclconvert'] else demux_noop_args),
samplesheets = expand("{run}/SampleSheet.csv", run=config['demux_input_dir'] if not config['bclconvert'] else demux_noop_args)
samplesheet = config["sample_sheet"] if not config['bclconvert'] else [],
output:
seq_data = expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
seq_data = expand("{out_to}/demux/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
stats = expand("{out_to}/demux/Stats/Stats.json", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
breadcrumb = expand("{out_to}/demux/.B2F_DEMUX_COMPLETE", **demux_expand_args if not config['bclconvert'] else demux_noop_args),
params:
out_dir = config["out_to"] + "/demux/",
run = config["run_ids"],
project = config["project"],
out_dir = config["out_to"] + "/demux",
container: config["resources"]["sif"] + "bcl2fastq.sif",
threads: 24
threads: 26
resources:
mem_mb = "32g",
slurm_partition = "quick",
runtime = 60*4,
tasks = 1,
disk_mb = 5*1024
shell:
r"""
mkdir -p {params.out_dir}
"""
bcl2fastq \
--sample-sheet {input.samplesheet} \
--runfolder-dir {input.run_dir} \
--min-log-level=INFO \
-r {threads} -p {threads} -w {threads} \
--no-lane-splitting -o {params.out_dir}
echo "run dir: {params.out_dir}/{params.run}/{params.project}"
echo "proj dir: {params.out_dir}/{params.project}"
mkdir -p {params.out_dir}/{params.run}
mv {params.out_dir}/{params.project} {params.out_dir}/{params.run}
--min-log-level=TRACE \
-r 8 -p 8 -w 8 \
--fastq-compression-level 9 \
--no-lane-splitting \
-o {params.out_dir}
find . > .fqlist
touch {output.breadcrumb}
"""

Expand Down Expand Up @@ -76,17 +79,17 @@ rule bclconvert:
params:
out_dir = config["out_to"] + "/demux/",
proj_dir = config["out_to"] + "/demux/" + config["project"],
mv_dir = config["out_to"] + "/demux/" + config["run_ids"] + "/" + config["project"],
output:
seq_data = expand("{out_to}/demux/{rid}/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if config['bclconvert'] else demux_noop_args),
seq_data = expand("{out_to}/demux/{project}/{sids}_R{rnums}_001.fastq.gz", **demux_expand_args if config['bclconvert'] else demux_noop_args),
undetermined = expand("{out_to}/demux/Undetermined_S0_R{rnums}_001.fastq.gz", **demux_expand_args if config['bclconvert'] else demux_noop_args),
stats = expand("{out_to}/demux/Reports/Demultiplex_Stats.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
ametrics = expand("{out_to}/demux/Reports/Quality_Metrics.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
qmetrics = expand("{out_to}/demux/Reports/Adapter_Metrics.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
top_unknown = expand("{out_to}/demux/Reports/Top_Unknown_Barcodes.csv", **demux_expand_args if config['bclconvert'] else demux_noop_args),
breadcrumb = expand("{out_to}/demux/.BC_DEMUX_COMPLETE", **demux_expand_args if config['bclconvert'] else demux_noop_args),
container: config["resources"]["sif"] + "weave_bclconvert_0.0.3.sif",
threads: 28
log: config["out_to"] + "/logs/bclconvert/" + config["run_ids"] + "_" + config["project"] + ".log",
threads: 75
resources: mem_mb = int(64e3)
shell:
"""
Expand All @@ -96,12 +99,10 @@ rule bclconvert:
--output-directory {params.out_dir} \
--fastq-gzip-compression-level 9 \
--bcl-sampleproject-subdirectories true \
--bcl-num-conversion-threads 8 \
--bcl-num-compression-threads 8 \
--bcl-num-decompression-threads 8 \
--bcl-num-conversion-threads 24 \
--bcl-num-compression-threads 24 \
--bcl-num-decompression-threads 24 \
--bcl-num-parallel-tiles 3 \
--no-lane-splitting true
mkdir -p {params.mv_dir}
mv {params.proj_dir}/*.fastq.gz {params.mv_dir}
touch {output.breadcrumb}
"""
34 changes: 17 additions & 17 deletions workflow/fastq.smk
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
rule trim_w_fastp:
input:
in_read1 = config["out_to"] + "/demux/" + config["run_ids"] + "/" + config["project"] + "/{sids}_R1_001.fastq.gz",
in_read2 = config["out_to"] + "/demux/" + config["run_ids"] + "/" + config["project"] + "/{sids}_R2_001.fastq.gz",
in_read1 = config["out_to"] + "/demux/" + config["project"] + "/{sids}_R1_001.fastq.gz",
in_read2 = config["out_to"] + "/demux/" + config["project"] + "/{sids}_R2_001.fastq.gz" if len(config['rnums']) == 2 else [],
output:
html = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastp/{sids}.html",
json = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastp/{sids}_fastp.json",
out_read1 = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R1.fastq.gz",
out_read2 = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz",
out_read2 = config["out_to"] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz" if len(config['rnums']) == 2 else [],
containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.1.sif"
threads: 4,
params:
read_args = lambda _, output, input: f"--in2 {input.in_read2} --out2 {_output.out_read2} --detect_adapter_for_pe""" if len(config['rnums']) == 2 else ""
resources: mem_mb = 8192,
log: config["out_to"] + "/logs/" + config["run_ids"] + "/" + config["project"] + "/fastp/{sids}.log",
shell:
"""
fastp \
--detect_adapter_for_pe \
--in1 {input.in_read1} --in2 {input.in_read2} \
--out1 {output.out_read1} \
--out2 {output.out_read2} \
--in1 {input.in_read1} --out1 {output.out_read1} {params.read_args} \
--html {output.html} \
--json {output.json} \
"""
Expand Down Expand Up @@ -54,50 +53,52 @@ rule fastq_screen:
rule kaiju_annotation:
input:
read1 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R1.fastq.gz",
read2 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz",
read2 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz" if len(config['rnums']) == 2 else [],
output:
kaiju_report = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}.tsv",
kaiju_order = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}_order.tsv",
kaiju_family = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}_family.tsv",
kaiju_species = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}_species.tsv",
kaiju_phylum = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}_phylum.tsv",
kaiju_genus = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kaiju/{sids}_genus.tsv",
params:
nodes = config["resources"]["mounts"]["kaiju"]["to"] + "/nodes.dmp",
names = config["resources"]["mounts"]["kaiju"]["to"] + "/names.dmp",
database = config["resources"]["mounts"]["kaiju"]["to"] + "/kaiju_db_nr_euk.fmi",
reads_in_arg = lambda wc, input, output: f"-j {input.read1} -i {input.read2}" if input.read2 else f"-j {input.read1}",
containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.1.sif"
log: config['out_to'] + "/logs/" + config['run_ids'] + "/" + config["project"] + "/kaiju/{sids}.log",
threads: 24
resources:
mem_mb = 220000,
mem_mb = 220000,
runtime = 60*24*2
shell:
"""
kaiju \
-t {params.nodes} \
-f {params.database} \
-i {input.read2} \
-j {input.read1} \
{params.reads_in_arg} \
-z {threads} \
-o {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r species -o {output.kaiju_species} {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r phylum -o {output.kaiju_phylum} {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r family -o {output.kaiju_order} {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r order -o {output.kaiju_family} {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r genus -o {output.kaiju_family} {output.kaiju_report}
kaiju2table -t {params.nodes} -n {params.names} -r genus -o {output.kaiju_genus} {output.kaiju_report}
"""


rule kraken_annotation:
input:
read1 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R1.fastq.gz",
read2 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz",
read2 = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/fastp/{sids}_trimmed_R2.fastq.gz" if len(config['rnums']) == 2 else [],
output:
kraken_report = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kraken/{sids}.tsv",
kraken_log = config['out_to'] + "/" + config['run_ids'] + "/" + config["project"] + "/{sids}/kraken/{sids}.log",
params:
kraken_db = config["resources"]["mounts"]["kraken2"]["to"]
containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.1.sif"
kraken_db = config["resources"]["mounts"]["kraken2"]["to"],
reads_in_arg = lambda wc, input, output: f"{input.read1} {input.read2}" if input.read2 else f"{input.read1}",
containerized: config["resources"]["sif"] + "weave_ngsqc_0.0.1.sif",
log: config['out_to'] + "/logs/" + config['run_ids'] + "/" + config["project"] + "/kraken/{sids}.log",
threads: 24
resources:
Expand All @@ -111,6 +112,5 @@ rule kraken_annotation:
--gzip-compressed --paired \
--report {output.kraken_report} \
--output {output.kraken_log} \
{input.read1} \
{input.read2}
{params.reads_in_arg}
"""
2 changes: 1 addition & 1 deletion workflow/qc.smk
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ demux_stats = config['out_to'] + "/demux/Reports/Demultiplex_Stats.csv" if confi

rule fastqc_untrimmed:
input:
samples = config['out_to'] + "/demux/" + config["run_ids"] + "/" + config["project"] + "/{sids}_R{rnums}_001.fastq.gz",
samples = config['out_to'] + "/demux/" + config["project"] + "/{sids}_R{rnums}_001.fastq.gz",
output:
html = config['out_to'] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastqc_untrimmed/{sids}_R{rnums}_001_fastqc.html",
fqreport = config['out_to'] + "/" + config["run_ids"] + "/" + config["project"] + "/{sids}/fastqc_untrimmed/{sids}_R{rnums}_001_fastqc.zip",
Expand Down

0 comments on commit b8d1b0d

Please sign in to comment.