Squashed 'nevermore/' changes from 37e095d..4f788dd
4f788dd Import/from nevermore 20221125x (#15)
9d4b6f0 Import/from nevermore 20221124 (#12)
583cad1 Import/from krazy 20221124 (#13)
d0c3514 Import/from vknight 20221114c (#10)
8859aff Import/from metaphlow 20221102 (#9)
f07fbd0 Import/from krazy 20221018 (#8)
c76b946 Import/from krazy 20220930 (#7)
86dd73d Import/from krazy 20220929 (#6)
f6ddfd4 Import/from gqf 20220919 (#5)

git-subtree-dir: nevermore
git-subtree-split: 4f788dd1242a6af1cae5f36ed33271809c2bf606
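(These footers are what git subtree adds when squash-merging a subproject; a commit like this would typically be produced by something like `git subtree pull --prefix=nevermore <remote> <ref> --squash`, where the remote and ref are assumptions, not recorded in the commit.)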
cschu committed Jan 13, 2023
1 parent 37e095d commit 5a57864
Showing 23 changed files with 658 additions and 194 deletions.
2 changes: 1 addition & 1 deletion bin/collate_stats.py
@@ -212,7 +212,7 @@ def write_output(readcounts, flagstats, outfile):
     # line = [sample, readcounts[sample].get("paired_end", False), readcounts[sample].get("single_end", False), sum(v for k, v in readcounts[sample].items() if k[0] == ".")]
     # line += (readcounts[sample].get(key, 0) for key in (".main", ".singles", ".orphans", ".chimeras"))
     line += tuple(
-        flagstats[sample].get(key, 0)
+        flagstats.get(sample, {}).get(key, 0)
         for key in (
             "n_alignments", "primary_alignments",
             "secondary_alignments", "npaired_mapped", "nsingles_mapped"
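The one-line fix above guards collate_stats.py against samples that have read counts but no flagstat entry: indexing with flagstats[sample] raises a KeyError for such samples, while chaining dict.get calls falls back to the default. A minimal sketch with hypothetical data (not taken from the repository):

    flagstats = {"sampleA": {"n_alignments": 100}}

    # old behaviour: flagstats["sampleB"].get("n_alignments", 0) raises KeyError
    # new behaviour: a missing sample falls back to an empty dict, then to 0
    print(flagstats.get("sampleA", {}).get("n_alignments", 0))  # 100
    print(flagstats.get("sampleB", {}).get("n_alignments", 0))  # 0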
308 changes: 243 additions & 65 deletions bin/prepare_fastqs.py
@@ -2,119 +2,297 @@

 import argparse
 import itertools
+import logging
 import os
 import pathlib
 import re
+import shutil
+import subprocess
 import sys
 
+from collections import Counter
+
-def check_pairwise(r1, r2):
-    pairs = {}
-    for p1, p2 in itertools.product(
-        tuple(p[:-1] for p, _ in r1),
-        tuple(p[:-1] for p, _ in r2)
-    ):
-        pairs.setdefault(p1, set()).add(1)
-        pairs.setdefault(p2, set()).add(2)
-
-    for p, counts in pairs.items():
-        if len(counts) < 2:
-            raise ValueError(f"Missing mates for prefix {p}, files={str(counts)}")
-        elif len(counts) > 2:
-            raise ValueError(f"Too many files for prefix {p}, files={str(counts)}")
-        else:
-            ...
-
+logging.basicConfig(
+    level=logging.DEBUG,
+    format='[%(asctime)s] %(message)s'
+)
+
+logger = logging.getLogger(__name__)

-def process_sample(input_dir, sample_id, fastqs, output_dir, remove_suffix=None):
-    print("#!/usr/bin/bash")
-    print("RESUF", remove_suffix, file=sys.stderr)
-
-    if len(fastqs) == 1:
-        sample_id = re.sub(r"[._]singles?", "", sample_id) + ".singles"
-        pathlib.Path(os.path.join(output_dir, sample_id)).mkdir(parents=True, exist_ok=True)
-        print(f"cp {os.path.join(input_dir, fastqs[0])} {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz")
+def check_pairwise(r1, r2):
+    """ Checks if two sets of read files contain the same prefixes.
+    Input:
+     - r1/r2: lists of tuples (prefix, filename) of R1/R2 paired-end read files
+    Raises an error if a prefix is not found in both r1 and r2.
+    """
+    r1_r2 = tuple(prefix[:-1] for prefix, _ in itertools.chain(r1, r2))
+    for prefix in set(r1_r2):
+        if r1_r2.count(prefix) != 2:
+            raise ValueError(f"Missing mates for prefix {prefix}.")

+def transfer_file(source, dest, remote_input=False):
+    """ Transfers a file depending on its location and compressed state.
+    Input:
+     - path to source file
+     - path to destination file
+     - whether the source file is considered to be located on a remote file system
+    """
+    resolved_src = pathlib.Path(source).resolve()
+
+    if os.path.splitext(source)[1][1:] in ("gz", "bz2"):
+        if remote_input:
+            # if the file is on a remote file system, copy it to the destination
+            logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=copy', source, dest, remote_input)
+            shutil.copyfile(resolved_src, dest)
+        else:
+            # if the file is compressed and on the local fs, just symlink it
+            logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=symlink', source, dest, remote_input)
+            pathlib.Path(dest).symlink_to(resolved_src)
-    else:
-        prefixes = [re.sub(r"\.(fastq|fq).gz$", "", f) for f in fastqs]
+    else:
+        # if the file is not compressed, gzip it to the destination
+        logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=gzip', source, dest, remote_input)
+        with open(dest, "wt") as _out:
+            subprocess.run(("gzip", "-c", resolved_src), stdout=_out)


+def transfer_multifiles(files, dest, remote_input=False, compression=None):
+    """ Transfers a set of files depending on their location and compressed state.
+    Input:
+     - list of source file paths
+     - path to destination file
+     - whether the source files are considered to be located on a remote file system
+     - the compression type of the files (supported: None, gz, bz2)
+    """
+
+    if len(files) > 1:
+        src_files = tuple(os.path.abspath(f) for f in files)  # tuple(f.resolve() for f in files)
+        cat_cmd = ("cat", ) + src_files
+
+        if compression in ("gz", "bz2"):
+            # multiple compressed files can just be concatenated
+            logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate', compression, remote_input)
+            with open(dest, "wt") as _out:
+                subprocess.run(cat_cmd, stdout=_out)
+        else:
+            # multiple uncompressed files will be cat | gzipped
+            logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate+gzip', compression, remote_input)
+            cat_pr = subprocess.Popen(cat_cmd, stdout=subprocess.PIPE)
+            with open(dest, "wt") as _out:
+                subprocess.run(("gzip", "-c", "-"), stdin=cat_pr.stdout, stdout=_out)
+
+    else:
+        logging.debug('transfer_multifiles: single file, source=%s, dest=%s, remote_input=%s, action=defer->transfer_file', files[0], dest, remote_input)
+        transfer_file(files[0], dest, remote_input=remote_input)


+def process_sample(
+    sample, fastqs, output_dir,
+    fastq_suffix_pattern,
+    remove_suffix=None, remote_input=False,
+):
+    """ Checks if a set of fastq files in a directory is a valid collection
+    and transfers the files to a destination dir upon success.
+    Input:
+     - sample: sample id
+     - list of fastq files
+     - path to output directory
+     - regex pattern matching valid fastq file suffixes
+     - suffix to strip off from filenames (e.g. _001)
+     - whether fastq files are located on a remote file system
+    """

+    if not fastqs:
+        ...
+    elif len(fastqs) == 1:
+        # remove potential "single(s)" string from single fastq file name prefix
+        sample_sub = re.sub(r"[._]singles?", "", sample)
+        # 20221018: and attach it at the end of the sample name
+        # - this might be a temporary fix, but @93a73d0
+        #   single-end samples without .singles-suffix cause problems
+        #   with fastqc results in the collate step
+        sample = sample_sub + ".singles"
+        sample_dir = os.path.join(output_dir, sample)
+        pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True)
+
+        dest_compression = fastqs[0][fastqs[0].rfind(".") + 1:]
+
+        dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
+        transfer_file(fastqs[0], dest, remote_input=remote_input)
+    elif fastqs:
+
+        # check if all fastq files are compressed the same way
+        suffixes = Counter(
+            f[f.rfind("."):] in (".gz", ".bz2") for f in fastqs
+        )
+
+        if len(suffixes) > 1:
+            raise ValueError(f"sample: {sample} has mixed compressed and uncompressed input files. Please check.")
+
+        if suffixes.most_common()[0][0]:
+            # all compressed
+            suffixes = Counter(
+                f[f.rfind(".") + 1:] for f in fastqs
+            )
+            if len(suffixes) > 1:
+                raise ValueError(f"sample: {sample} has mixed gzip and bzip2 files. Please check.")
+            dest_compression = compression = suffixes.most_common()[0][0]
+        else:
+            dest_compression, compression = "gz", None
+
+        # extract the file name prefixes
+        prefixes = [re.sub(fastq_suffix_pattern, "", os.path.basename(f)) for f in fastqs]
         if remove_suffix:
+            # remove suffix pattern if requested
             prefixes = [re.sub(remove_suffix + r"$", "", p) for p in prefixes]

print("PRE", prefixes, file=sys.stderr)

r1 = [(p, f) for p, f in zip(prefixes, fastqs) if p.endswith("1")]
r2 = [(p, f) for p, f in zip(prefixes, fastqs) if p.endswith("2")]
others = set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2})
# partition fastqs into R1, R2, and 'other' sets
r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)]
r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)]
others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2})))

assert len(r2) == 0 or len(r1) == len(r2), "R1/R2 sets are not of the same length"
check_pairwise(r1, r2)
# check if R1/R2 sets have equal sizes or are empty
# R1 empty: potential scRNAseq (or any protocol with barcode reads in R1)
# R2 empty: typical single end reads with (R?)1 suffix
assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length"

-        r1 = [os.path.join(input_dir, f) for f in sorted(f for _, f in r1)]
-        r2 = [os.path.join(input_dir, f) for f in sorted(f for _, f in r2)]
+        # if R1 and R2 are of equal size, check if the prefixes match
+        if len(r1) == len(r2) and r1:
+            check_pairwise(r1, r2)
+
+        # sort R1/R2 for concatenation, get rid of the prefixes
+        r1 = sorted(f for _, f in r1)
+        r2 = sorted(f for _, f in r2)
 
         print("R1", r1, file=sys.stderr)
         print("R2", r2, file=sys.stderr)
         print("others", others, file=sys.stderr, flush=True)

r1_cmd = " ".join(["cat"] + r1) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz"
print(r1_cmd)

if r2:
r2_cmd = " ".join(["cat"] + r2) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R2.fastq.gz"
print(r2_cmd)

if others:
sample_id = sample_id + ".singles"
pathlib.Path(os.path.join(output_dir, sample_id)).mkdir(parents=True, exist_ok=True)
other_cmd = " ".join(["cat"] + list(others)) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz"
print(other_cmd)


# ['HD_Path_4_S81_L001_R1_001', 'HD_Path_4_S81_L001_R2_001', 'HD_Path_4_S81_L002_R1_001', 'HD_Path_4_S81_L002_R2_001']
# ['HD_Path_4_S81_L001_R1', 'HD_Path_4_S81_L001_R2', 'HD_Path_4_S81_L002_R1', 'HD_Path_4_S81_L002_R2']

+        sample_dir = os.path.join(output_dir, sample)
+
+        if r1 or r2:
+
+            pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True)
+
+            if r1:
+                # if R1 is not empty, transfer the R1 files
+                dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
+                transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression)
+            if r2:
+                # if R2 is not empty, transfer the R2 files;
+                # if R1 is empty, rename R2 to R1 so that the files can be processed as normal single-end reads
+                target_r = "R2" if r1 else "R1"
+                dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}")
+                transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression)
+
+        if others:
+            # if single-end reads exist,
+            # transfer them to <sample>.singles
+            # these will be processed independently and merged with the paired-end reads
+            # at a later stage
+            sample_dir = sample_dir + ".singles"
+            pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True)
+            dest = os.path.join(sample_dir, f"{sample}.singles_R1.fastq.{dest_compression}")
+            transfer_multifiles(others, dest, remote_input=remote_input, compression=compression)


+def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes):
+    """ Checks if a file is a fastq file (compressed or uncompressed).
+    Input:
+     - filename
+    Output:
+     - True if the file is a fastq file, else False
+    """
+    prefix, suffix = os.path.splitext(f)
+    if suffix in valid_fastq_suffixes:
+        return True
+    if suffix in valid_compression_suffixes:
+        _, suffix = os.path.splitext(prefix)
+        return suffix in valid_fastq_suffixes
+    return False


 def main():
 
     ap = argparse.ArgumentParser()
     ap.add_argument("-i", "--input_dir", type=str, default=".")
     ap.add_argument("-o", "--output_dir", type=str, default="prepared_samples")
-    ap.add_argument("-s", "--sample_id", type=str, required=True)
-    ap.add_argument("-p", "--prefix", type=str, required=True)
+    ap.add_argument("--remote-input", action="store_true")
     ap.add_argument("--remove-suffix", type=str, default=None)
+    ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq")
+    ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2")
 
     args = ap.parse_args()
 
-    pathlib.Path(os.path.join(args.output_dir, args.sample_id)).mkdir(parents=True, exist_ok=True)
+    valid_fastq_suffixes = tuple(f".{suffix}" for suffix in args.valid_fastq_suffixes.split(","))
+    print(valid_fastq_suffixes)
+    valid_compression_suffixes = tuple(f".{suffix}" for suffix in args.valid_compression_suffixes.split(","))
+    print(valid_compression_suffixes)
+
+    fastq_file_suffix_pattern = r"[._](" + \
+        args.valid_fastq_suffixes.replace(",", "|") + \
+        ")([._](" + \
+        args.valid_compression_suffixes.replace(",", "|") + \
+        "))?$"

-    walk = os.walk(args.input_dir)
-    try:
-        cwd, dirs, files = next(walk)
-    except StopIteration:
-        raise StopIteration(f"Invalid directory: {args.input_dir}")
-
-    fastqs = sorted(f for f in files if f.endswith(".fq.gz") or f.endswith(".fastq.gz"))
-    assert fastqs, "Could not find any fastq files."
-
-    print(cwd, fastqs, file=sys.stderr)
+    fastq_mate_pattern = r"([._R][12])$"
+
+    def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffixes):
+        return sorted(
+            os.path.join(input_dir, f)
+            for f in os.listdir(input_dir)
+            if is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes)
+        )
+
     try:
-        process_sample(cwd, args.sample_id, fastqs, args.output_dir, remove_suffix=args.remove_suffix)
-    except Exception as e:
-        raise ValueError(f"Encountered problems processing sample '{args.sample_id}': {e}.\nPlease check your file names.")
+        pwd, dirs, _ = next(os.walk(args.input_dir))
+    except StopIteration:
+        raise ValueError(f"Could not find input directory {args.input_dir} ({os.path.abspath(args.input_dir)})")

+    samples = {}
+    pathlib.Path(args.output_dir).mkdir(parents=True, exist_ok=True)
+
+    for sample_dir in dirs:
+        sample, sample_dir = sample_dir, os.path.join(pwd, sample_dir)
+
+        samples.setdefault(sample, []).extend(
+            collect_fastq_files(sample_dir, valid_fastq_suffixes, valid_compression_suffixes)
+        )
+
+    root_fastqs = collect_fastq_files(args.input_dir, valid_fastq_suffixes, valid_compression_suffixes)
+
+    if samples and root_fastqs:
+        raise ValueError(f"Found {len(root_fastqs)} fastq files in the input directory together with {len(samples)} sample directories. Please check the input data.")
+    elif root_fastqs:
+        for f in root_fastqs:
+            sample = re.sub(fastq_file_suffix_pattern, "", os.path.basename(f))
+            sample = re.sub(fastq_mate_pattern, "", sample)
+            samples.setdefault(sample, []).append(f)
+
+    # check and transfer the files
+    for sample, fastqs in samples.items():
+        try:
+            process_sample(
+                sample, fastqs, args.output_dir,
+                fastq_file_suffix_pattern,
+                remove_suffix=args.remove_suffix, remote_input=args.remote_input
+            )
+        except Exception as e:
+            raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.")
 
 
 if __name__ == "__main__":
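For orientation, the sample names in the rewritten prepare_fastqs.py come from two regexes built in main(); the sketch below shows what they do with the default --valid-fastq-suffixes/--valid-compression-suffixes values (the file names are made up for illustration):

    import re

    # patterns as constructed in main() with the default arguments
    fastq_file_suffix_pattern = r"[._](fastq|fq)([._](gz|bz2))?$"
    fastq_mate_pattern = r"([._R][12])$"

    for fname in ("sampleA_1.fastq.gz", "sampleA_2.fastq.gz", "sampleB.fq.bz2"):
        prefix = re.sub(fastq_file_suffix_pattern, "", fname)  # strip fastq/compression suffix
        sample = re.sub(fastq_mate_pattern, "", prefix)        # strip the mate indicator, if any
        print(fname, "->", prefix, "->", sample)
    # sampleA_1.fastq.gz -> sampleA_1 -> sampleA
    # sampleA_2.fastq.gz -> sampleA_2 -> sampleA
    # sampleB.fq.bz2 -> sampleB -> sampleB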
6 changes: 3 additions & 3 deletions modules/align/bwa.nf
@@ -17,14 +17,14 @@ process bwa_mem_align {
     def sort_reads2 = (sample.is_paired) ? "sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R2.fastq.gz out=${sample.id}_R2.sorted.fastq.gz" : ""
     def blocksize = "-K 10000000" // shamelessly taken from NGLess
 
-    def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam -" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -"
+    def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -"
 
     """
     set -e -o pipefail
     mkdir -p tmp/
     sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=${sample.id}_R1.sorted.fastq.gz
     ${sort_reads2}
     bwa mem -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${sample.id}_R1.sorted.fastq.gz ${reads2} | samtools view -F 4 -buSh - | ${sort_cmd}
     rm -rvf tmp/ *.sorted.fastq.gz
     """
     // samtools sort -@ ${sort_cpus} -o ${sample.id}.bam
 
 }
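The trailing tmp/collated_bam in the collate branch is samtools collate's optional temporary-file prefix (per samtools usage, `samtools collate [options] <in.bam> [<prefix>]` writes its intermediates as <prefix>.nnnn.bam), so the temporary files land in the tmp/ directory the script creates up front and removes at the end, rather than wherever samtools would otherwise place them.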