From b664503f9dbe89c282b0800ee4197f64d35f8094 Mon Sep 17 00:00:00 2001 From: Alastair Smith <49727900+alsmith151@users.noreply.github.com> Date: Tue, 19 Sep 2023 13:44:05 +0100 Subject: [PATCH] chore: clean-up repo (#196) * chore: removed sm file * chore: removed sh requirement * chore: removed natsort * chore: removed more-itertools * chore: removed pyyamml * chore: updated environment.yml minimum versions * docs: updated readme docs badge * docs: updated readme notes * chore: removed setup.cfg * fix: added dynamic dependencies to pyproject.toml * fix: added back more-itertools requirement * chore(packaging): added entry point * chore: updated pyarrow requirement * feat: added full install option * chore: updated requirements * docs: "full" install instruction added to docs * fix(docs): cluster_config.md missing from index * chore: workflow now installs with "full" option * fix: cookiecutter missing from requirements * fix: cookiecutter added to environment.yml * chore(tests): remove old tests * fix(packaging): lowered pyarrow max version * feat(api): removed deduplicate and digest * feat: deduplicate now a single command * feat(cli-fastq): removed deduplicate subcommands * feat(cli): corrected fastq dedup for cct * feat(cli-fastq-dedup): updated dedup function * feat: corrected stats for digest_fastq * fix: corrected genome digestion tests * fix: corrected fastq deduplicate * chore: capcruncher tools and tracknado now deps * fix(tests): corrected deduplicate test * fix(fastq_deduplicate): removed old code * fix: corrected digestion stats output * chore: capcruncher_tools now >= 0.1.8 * feat: integrated capcruncher_tools count * fix: removed unwanted code * fix: typo in output path * fix: corrected small integration pipeline issues * chore(pipeline): not checking for cct install * fix: corrected digestion read_stats * chore: cct to >= 0.1.9 * ci: added macos to test os * fix: removed cct specific branch in stats * fix: added os specific dependency install * ci: removed ray from environment. Not on conda for macos * fix: removed fastq duplicate testing * fix(ci): installing coreutils for macos * fix(macos): using gsplit and gzcat vs split/zcat * fix(fastq digest): missing restriction site Using get_restriction_site() to correct * fix(genome-digestion): added rs identification * fix(tests): corrected deduplication test * fix(ci): ignoring macos for now * fix(tests): removed concurrent futures * feat(fastq): make output directory first * chore(coverage): delete files to boost coverage --- .github/workflows/CI.yml | 13 +- README.md | 33 +- capcruncher/api/count.py | 125 ---- capcruncher/api/deduplicate.py | 247 ------- capcruncher/api/digest.py | 427 ------------ capcruncher/api/io.py | 431 ------------ capcruncher/cli/cli_fastq.py | 188 ++---- capcruncher/cli/cli_genome.py | 3 + capcruncher/cli/cli_interactions.py | 55 +- capcruncher/cli/cli_utilities.py | 111 ++- capcruncher/cli/fastq_deduplicate.py | 226 +------ capcruncher/cli/fastq_digest.py | 207 ++---- capcruncher/cli/fastq_split.py | 7 + capcruncher/cli/genome_digest.py | 60 +- capcruncher/cli/interactions_count.py | 182 +---- capcruncher/pipeline/workflow/Snakefile | 8 - .../pipeline/workflow/rules/digest.smk | 3 +- capcruncher/pipeline/workflow/rules/fastq.smk | 220 ++---- .../pipeline/workflow/rules/pileup.smk | 98 +-- .../pipeline/workflow/rules/statistics.smk | 36 +- capcruncher/utils.py | 47 +- codecov.yml | 2 +- conftest.py | 24 + docs/installation.md | 8 +- environment.yml | 13 +- mkdocs.yml | 1 + pyproject.toml | 9 +- requirements.txt | 7 +- setup.cfg | 45 -- sm | 91 --- tests/data/fastq_digestion/chrom_to_digest.fa | 2 - .../fastq_digestion/chrom_to_digest.fa.gz | Bin 0 -> 79 bytes tests/old/need_fixing/test_cli.py | 633 ------------------ tests/old/need_fixing/test_pileup.py | 34 - tests/old/need_fixing/test_plotting.py | 0 tests/old/need_fixing/test_slice_filtering.py | 53 -- tests/old/need_fixing/test_storage.py | 147 ---- tests/old/need_fixing/test_utils.py | 65 -- tests/old/test_pipeline.py | 292 -------- tests/test_cli.py | 99 +-- tests/test_deduplication.py | 117 ---- tests/test_digest.py | 213 +----- tests/test_io.py | 104 --- tests/test_pipeline.py | 2 - tests/test_plotting.py | 3 +- 45 files changed, 529 insertions(+), 4162 deletions(-) delete mode 100644 capcruncher/api/count.py delete mode 100644 capcruncher/api/deduplicate.py delete mode 100644 capcruncher/api/digest.py delete mode 100644 setup.cfg delete mode 100644 sm delete mode 100644 tests/data/fastq_digestion/chrom_to_digest.fa create mode 100644 tests/data/fastq_digestion/chrom_to_digest.fa.gz delete mode 100644 tests/old/need_fixing/test_cli.py delete mode 100644 tests/old/need_fixing/test_pileup.py delete mode 100644 tests/old/need_fixing/test_plotting.py delete mode 100644 tests/old/need_fixing/test_slice_filtering.py delete mode 100644 tests/old/need_fixing/test_storage.py delete mode 100644 tests/old/need_fixing/test_utils.py delete mode 100644 tests/old/test_pipeline.py delete mode 100644 tests/test_deduplication.py delete mode 100644 tests/test_io.py diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index ca3e270b..0059e836 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -7,9 +7,10 @@ env: jobs: install-and-test: - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} strategy: matrix: + os: [ubuntu-latest] python-version: ["3.10"] steps: @@ -20,11 +21,17 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Install OS dependencies + - name: Install Linux dependencies + if: matrix.os == 'ubuntu-latest' run: | sudo apt-get update sudo apt-get install libcurl4-openssl-dev + # - name: Install Mac dependencies + # if: matrix.os == 'macos-latest' + # run: | + # brew install curl-openssl coreutils + - name: Restore bowtie2 cache uses: actions/cache@v3 with: @@ -69,7 +76,7 @@ jobs: - name: Install the package shell: bash -el {0} run: | - pip install .[stats,plotting,experimental] + pip install .[full] - name: Test with pytest and generate report shell: bash -el {0} diff --git a/README.md b/README.md index 5d40c808..065739b2 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,26 @@ # CapCruncher -[![Documentation](https://github.com/sims-lab/CapCruncher/actions/workflows/docs.yml/badge.svg?branch=master)](https://github.com/sims-lab/CapCruncher/actions/workflows/docs.yml) +[![Documentation](https://github.com/sims-lab/CapCruncher/actions/workflows/docs.yml/badge.svg?branch=master)](https://sims-lab.github.io/CapCruncher/) [![Codecov](https://codecov.io/gh/sims-lab/CapCruncher/branch/master/graph/badge.svg?token=RHIGNMGX09)](https://codecov.io/gh/sims-lab/CapCruncher) [![Anaconda-Server Badge](https://anaconda.org/bioconda/capcruncher/badges/version.svg)](https://anaconda.org/bioconda/capcruncher) [![Anaconda-Server Badge License](https://anaconda.org/bioconda/capcruncher/badges/license.svg)](https://anaconda.org/bioconda/capcruncher) [![DOI](https://zenodo.org/badge/224631087.svg)](https://zenodo.org/badge/latestdoi/224631087) [![Downloads](https://pepy.tech/badge/capcruncher)](https://pepy.tech/project/capcruncher) -![CapCruncher Logo](https://github.com/sims-lab/CapCruncher/blob/68a91cea502a8623c71919c5f8d85febd6acef06/docs/img/capcruncher_logo.png) +![CapCruncher Logo](https://raw.githubusercontent.com/sims-lab/CapCruncher/master/docs/img/capcruncher_logo.png) The CapCruncher package is designed to process Capture-C, Tri-C and Tiled-C data. Unlike other pipelines that are designed to process Hi-C or Capture-HiC data, the filtering steps in CapCruncher are specifically optimized for these datasets. The package consists of a configurable data processing pipeline and a supporting command line interface to enable fine-grained control over the analysis. The pipeline is fast, robust and scales from a single workstation to a large HPC cluster. It is designed to be run on an HPC cluster and can be configured to use a variety of package management systems, such as conda and singularity. For more information, see the [documentation](https://sims-lab.github.io/CapCruncher/). -**Note:** The current version of CapCruncher is in beta. Please report any issues you encounter to the [issue tracker](https://github.com/sims-lab/CapCruncher/issues/new/choose) +> **Note:** +> The current version of CapCruncher is in beta. Please report any issues you encounter to the [issue tracker](https://github.com/sims-lab/CapCruncher/issues/new/choose) ## Quick Start ### Installation -**Warning:** CapCruncher is currently only availible for linux with MacOS support planned in the future. +> **Warning:** +> +> CapCruncher is currently only availible for linux with MacOS support planned in the future. CapCruncher is available on conda and PyPI. To install the latest version, run: @@ -77,15 +80,17 @@ capcruncher pipeline --cores capcruncher pipeline --cores 8 --profile slurm --use-singularity ``` -**Note:** In order to avoid disconnecting from the cluster, it is recommended to run the pipeline in a [tmux](https://linuxize.com/post/getting-started-with-tmux/) session. Alternatively, [nohup](https://linuxize.com/post/linux-nohup-command/) can be used to run the pipeline in the background. For example: - -``` bash -# tmux example -tmux new -s capcruncher -capcruncher pipeline --cores 8 --profile slurm --use-singularity - -# nohup example -nohup capcruncher pipeline --cores 8 --profile slurm --use-singularity & -``` +> **Note:** +> In order to avoid disconnecting from the cluster, it is recommended to run the pipeline in a [tmux](https://linuxize.com/post/getting-started-with-tmux/) +> session. Alternatively, [nohup](https://linuxize.com/post/linux-nohup-command/) can be used to run the pipeline in the background. For example: +> +> ``` bash +> # tmux example +>tmux new -s capcruncher +> capcruncher pipeline --cores 8 --profile slurm --use-singularity +> +># nohup example +>nohup capcruncher pipeline --cores 8 --profile slurm --use-singularity & +>``` See the [pipeline guide](https://sims-lab.github.io/CapCruncher/pipeline/) for more detailed instructions. diff --git a/capcruncher/api/count.py b/capcruncher/api/count.py deleted file mode 100644 index cf61a2f6..00000000 --- a/capcruncher/api/count.py +++ /dev/null @@ -1,125 +0,0 @@ -import itertools -import os -from collections import defaultdict - -import pandas as pd -from loguru import logger -from tqdm import tqdm - - -def get_fragment_combinations(df: pd.DataFrame): - return [ - sorted(comb) for comb in itertools.combinations(df["restriction_fragment"], 2) - ] - - -def subsample_reporters_from_df(df: pd.DataFrame, subsample: float): - - logger.info("Subsampling data") - if isinstance(subsample, float): - subsample_options = {"frac": subsample} - elif isinstance(subsample, int): - subsample_options = {"n": subsample} - - # Generate a subsample of fragments and slice these from the reporter dataframe - df_reporters = df[df["parent_id"].isin(df["parent_id"].sample(**subsample_options))] - - return df_reporters - - -def remove_exclusions_from_df(df: pd.DataFrame): - # TODO: remove this slight dtype hack - df = df.astype({"viewpoint": str}) - # df.loc[:, "viewpoint"] = df.loc[:, "viewpoint"].astype(str) - return df.query("viewpoint != exclusion") - - -def preprocess_reporters_for_counting(df: pd.DataFrame, **kwargs): - - # Need to remove any restiction fragments that are not in the digested genome - df_reporters = df.query("restriction_fragment != -1") - - if kwargs.get("remove_exclusions"): - logger.info("Removing excluded regions") - df_reporters = remove_exclusions_from_df(df_reporters) - - # Remove the capture site - if kwargs.get("remove_viewpoints"): - logger.info("Removing viewpoints") - df_reporters = df_reporters.query("capture_count == 0") - - # Subsample at the fragment level - if kwargs.get("subsample"): - df_reporters = subsample_reporters_from_df(df_reporters, kwargs["subsample"]) - - return df_reporters - - -def count_re_site_combinations( - groups: pd.core.groupby.GroupBy, - column: str = "restriction_fragment", - counts: defaultdict = None, -) -> defaultdict: - """ - Counts the number of unique combinations bewteen groups in a column. - - Args: - groups (pd.core.groupby.GroupBy): A groupby object - column (str, optional): The column to count combinations from. Defaults to "restriction_fragment". - counts (defaultdict, optional): A defaultdict to store counts in. Defaults to None. - - Returns: - defaultdict: A defaultdict containing counts of combinations. - """ - - if not counts: - counts = defaultdict(int) # Store counts in a default dict - - # For each set of ligated fragments - for ii, (group_name, frag) in enumerate(tqdm(groups)): - - for rf1, rf2 in itertools.combinations( - frag[column], 2 - ): # Get fragment combinations - # TODO: Notice a high amount of multicaptures (same oligo) not being removed. - # Need to track this down but for now will explicitly prevent the same bin appearing twice. - if not rf1 == rf2: - rf1, rf2 = sorted([rf1, rf2]) # Sort them to ensure consistency - counts[rf1, rf2] += 1 - - return counts - - -def get_counts_from_tsv_by_batch(reporters: os.PathLike, chunksize: int, **kwargs): - - df_reporters_iterator = pd.read_csv(reporters, sep="\t", chunksize=chunksize) - - ligated_rf_counts = defaultdict(int) - for ii, df_reporters in enumerate(df_reporters_iterator): - - logger.info(f"Processing chunk #{ii+1} of {chunksize} slices") - - reporters = preprocess_reporters_for_counting(df_reporters, **kwargs) - fragments = reporters.groupby("parent_id") - - logger.info("Counting") - ligated_rf_counts = count_re_site_combinations( - fragments, column="restriction_fragment", counts=ligated_rf_counts - ) - - return ligated_rf_counts - - -def get_counts_from_tsv(reporters: os.PathLike, **kwargs): - - df_reporters = pd.read_csv(reporters, sep="\t") - - reporters = preprocess_reporters_for_counting(df_reporters, **kwargs) - fragments = reporters.groupby("parent_id") - - logger.info("Counting") - ligated_rf_counts = count_re_site_combinations( - fragments, column="restriction_fragment" - ) - - return ligated_rf_counts diff --git a/capcruncher/api/deduplicate.py b/capcruncher/api/deduplicate.py deleted file mode 100644 index a69f383f..00000000 --- a/capcruncher/api/deduplicate.py +++ /dev/null @@ -1,247 +0,0 @@ -import functools -from loguru import logger -import multiprocessing -import os -import queue -from collections import namedtuple -from multiprocessing import Process -from typing import Iterable, Tuple - -import pandas as pd -import ujson -import xxhash -from capcruncher.utils import get_file_type, save_dict - - -class ReadDeduplicationParserProcess(Process): - """ - Process subclass for parsing fastq file(s) into a hashed {id:sequence} json format. - - Attributes: - inq: Input read queue - outq: Output read queue (Not currently used) - hash_seed: Seed for xxhash64 algorithm to ensure consistency - save_hash_dict_path: Path to save hashed dictionary - """ - - def __init__( - self, - inq: multiprocessing.Queue, - hash_seed: int = 42, - output_path: os.PathLike = "parsed.json", - ): - """ - Args: - inq (multiprocessing.SimpleQueue): Input queue for fastq reads. - hash_seed (int, optional): Seed to use for hashing. Defaults to 42. - output_path (os.PathLike, optional): Path to save hashed reads. - """ - - self.inq = inq - self.hash_seed = hash_seed - self.output_path = output_path - - super(ReadDeduplicationParserProcess, self).__init__() - - def run(self): - """Processes fastq reads from multiple files and generates a hashed json dict. - - Dictionary is hashed and in the format {(read 1 name + read 2 name): (s1 + s2)} - - Output path is specified by save_hashed_dict_path. - - """ - - hash_seed = self.hash_seed - hash_function = functools.partial(xxhash.xxh64_intdigest, seed=hash_seed) - records = dict() - - while True: - - try: - reads = self.inq.get(block=True, timeout=0.01) - - if reads: - - for read_set in reads: - hash_sequence = hash_function( - "".join([r.sequence for r in read_set]) - ) - hash_id = hash_function("".join([r.name for r in read_set])) - records[hash_id] = hash_sequence - - else: - break - - except queue.Empty: - continue - - output_format = get_file_type(self.output_path) - save_dict(records, self.output_path, output_format) - - -RemovalStatistics = namedtuple( - "RemovalStatistics", ["reads_total", "reads_unique", "reads_removed"] -) - - -class ReadDuplicateRemovalProcess(Process): - """ - Process subclass for parsing fastq file(s) and removing identified duplicates. - - Attributes: - inq: Input read queue - outq: Output queue for deduplicated reads. - duplicated_ids: Concatenated read ids to remove from input fastq files. - statq: Output queue for statistics. - reads_total: Number of fastq reads processed. - reads_unique: Number of non-duplicated reads output. - hash_seed: Seed for xxhash algorithm. Same as ReadDuplicationParserProcess. - """ - - def __init__( - self, - inq: multiprocessing.Queue, - outq: multiprocessing.Queue, - stats_tx: multiprocessing.Pipe, - duplicated_ids: set, - hash_seed: int = 42, - hash_read_name: bool = True, - ): - """ - Args: - inq (multiprocessing.SimpleQueue): Input queue for fastq reads. - outq (multiprocessing.SimpleQueue): Output queue for deduplicated reads. - stats_tx (multiprocessing.Pipe): Pipe for sending statistics. - duplicated_ids (set): Set of read ids to remove. - hash_seed (int, optional): Seed to use for hashing. Defaults to 42. - hash_read_name (bool, optional): Whether to hash read names. Defaults to True. - """ - - self.inq = inq - self.outq = outq - self.hash_seed = hash_seed - self.duplicated_ids = duplicated_ids - - # Misc - self.hash_read_name = hash_read_name - - # Stats - self.stats_tx = stats_tx - self.reads_total = 0 - self.reads_unique = 0 - - super(ReadDuplicateRemovalProcess, self).__init__() - - def run(self): - - """Performs read deduplication based on sequence. - - Unique reads are placed on outq and deduplication stats are placed on statq. - - """ - - hash_seed = self.hash_seed - hash_read_name = self.hash_read_name - hash_function = functools.partial(xxhash.xxh64_intdigest, seed=hash_seed) - duplicated_ids = self.duplicated_ids - reads_unique = list() - - while True: - - try: - reads = self.inq.get(block=True, timeout=0.01) - - if reads: - for read_glob in reads: - - hash_id = hash_function("".join([r.name for r in read_glob])) - - if hash_id not in duplicated_ids: - if hash_read_name: - for r in read_glob: - r.name = str(hash_function(r.name)) - - reads_unique.append(read_glob) - - self.reads_total += len(reads) - self.reads_unique += len(reads_unique) - self.outq.put(reads_unique.copy()) - reads_unique.clear() - - else: - break - - except queue.Empty: - continue - - stats = RemovalStatistics( - self.reads_total, self.reads_unique, self.reads_total - self.reads_unique - ) - self.stats_tx.send(stats) - - -def remove_duplicates_from_parquet( - slices: Iterable, duplicated_ids: pd.Series, output: os.PathLike -) -> Tuple[int, int]: - - import dask.dataframe as dd - import pyarrow.dataset as ds - - if not duplicated_ids.empty: - duplicates = set(duplicated_ids.values) - else: - duplicates = set() - - n_reads_total = ( - dd.read_parquet(slices, columns=["parent_id"], engine="pyarrow")["parent_id"] - .nunique() - .compute() - ) - - logger.info("Loading and filtering slices") - - # Load and filter data - slice_dataset = ds.dataset( - list(slices), - format="parquet", - ) - - slice_dataset_scanner = slice_dataset.scanner( - filter=~ds.field("parent_id").isin(duplicates) - ) - - logger.info("Writing unique slices") - ds.write_dataset( - slice_dataset_scanner, output, format="parquet", partitioning_flavor="hive" - ) - - n_reads_unique = ( - dd.read_parquet(output, columns=["parent_id"], engine="pyarrow")["parent_id"] - .nunique() - .compute() - ) - return (n_reads_total, n_reads_unique) - - -def read_duplicated_ids(path: os.PathLike): - - from xopen import xopen - - file_type = get_file_type(path) - - if file_type == "json": - with xopen.xopen(path, "r") as r: - ids_duplicated = {int(x) for x in ujson.load(r)} - - elif file_type == "hdf5": - - try: - ids_duplicated = pd.read_hdf(path, key="/duplicated_ids") - except KeyError: - ids_duplicated = pd.Series(data=["NO_DATA"], name="/duplicated_ids") - - elif file_type == "pickle": - ids_duplicated = pd.read_pickle(path) - - return ids_duplicated diff --git a/capcruncher/api/digest.py b/capcruncher/api/digest.py deleted file mode 100644 index b70da074..00000000 --- a/capcruncher/api/digest.py +++ /dev/null @@ -1,427 +0,0 @@ -from loguru import logger -import queue -import re -import pysam -import multiprocessing -import numpy as np -from typing import Iterable, List -from capcruncher.api.statistics import DigestionStats -import pandas as pd - - -def get_re_site(recognition_site: str = None) -> str: - - """ - Obtains the recogniton sequence for a supplied restriction enzyme or correctly - formats a supplied recognition sequence. - - Args: - recognition_site (str): Restriction enzyme name or recognition sequence. - - Returns: - recognition sequence e.g. "GATC" - - Raises: - ValueError: Error if restriction_enzyme is not in known enzymes - - """ - - known_enzymes = { - "dpnii": "GATC", - "mboi": "GATC", - "hindiii": "AAGCTT", - "ecori": "GAATTC", - "nlaiii": "CATG", - } - - if re.match(r"[GgAaTtCc]+", recognition_site): # matches a DNA sequence - cutsite = recognition_site.upper() # Just uppercase convert and return - - elif recognition_site.lower() in known_enzymes: - cutsite = known_enzymes[recognition_site.lower()] - - else: - logger.error("No restriction site or recognised enzyme provided") - raise ValueError("No restriction site or recognised enzyme provided") - - return cutsite - - -class DigestedChrom: - """ - Performs in slico digestion of fasta files. - - Identifies all restriction sites for a supplied restriction enzyme/restriction site - and generates bed style entries. - - Attributes: - chrom (pysam.FastqProxy): Chromosome to digest - recognition_seq (str): Sequence of restriction recognition site - recognition_len (int): Length of restriction recognition site - recognition_seq (re.Pattern): Regular expression for restriction recognition site - fragment_indexes (List[int]): Indexes of fragment(s) start and end positions. - fragment_number_offset (int): Starting fragment number. - fragment_min_len (int): Minimum fragment length required to report fragment - - """ - - def __init__( - self, - chrom: pysam.FastqProxy, - cutsite: str, - fragment_number_offset: int = 0, - fragment_min_len: int = 1, - ): - """ - Args: - chrom (pysam.FastqProxy): Input fastq entry to digest - cutsite (str): Restriction enzyme recognition sequence. - fragment_number_offset (int, optional): Changes the fragment number output. Useful for multiple digests. Defaults to 0. - fragment_min_len (int, optional): Minimum length of a fragment required to output. Defaults to 1. - """ - - self.chrom = chrom - self.recognition_seq = cutsite.upper() - self.recognition_len = len(cutsite) - self.recognition_re = re.compile(self.recognition_seq) - - self.fragment_indexes = self.get_recognition_site_indexes() - self.fragment_number_offset = fragment_number_offset - self.fragment_min_len = fragment_min_len - - def get_recognition_site_indexes(self) -> List[int]: - """ - Gets the start position of all recognition sites present in the sequence. - - Notes: - Also appends the start and end of the sequnece to enable clearer itteration - through the indexes. - - - Returns: - List[int]: Indexes of fragment(s) start and end positions. - """ - - indexes = [ - re_site.start() - for re_site in self.recognition_re.finditer(self.chrom.sequence.upper()) - ] - - indexes.insert(0, 0) - indexes.append(len(self.chrom.sequence)) - - return indexes - - @property - def fragments(self) -> Iterable[str]: - """ - Extracts the coordinates of restriction fragments from the sequence. - - Yields: - str: Bed formatted coordinates. - """ - - indexes = self.fragment_indexes - fragment_no = self.fragment_number_offset - - # Iterate through offset indexes to get correct start and end - for ii, (fragment_start, fragment_end) in enumerate(zip(indexes, indexes[1:])): - - # If this is not the first fragment - if ii > 0: - fragment_start += self.recognition_len - - # Check to see if the fragment is long enough to be recorded (default 1bp) - if (fragment_end - fragment_start) >= self.fragment_min_len: - yield self._prepare_fragment(fragment_start, fragment_end, fragment_no) - fragment_no += 1 - - def _prepare_fragment(self, start: int, end: int, fragment_no: int) -> str: - """Formats fragment into bed style coordinates. - - Args: - start (int): Fragment start. - end (int): Fragment end. - fragment_no (int): Fragment number. - - Returns: - str: Bed formatted coordinates. - """ - return ( - "\t".join([str(x) for x in (self.chrom.name, start, end, fragment_no)]) - + "\n" - ) - - -class DigestedRead: - """ - Performs in slico digestion of fastq files. - - Identifies all restriction sites for a supplied restriction enzyme/restriction site - and generates bed style entries. - - Attributes: - read (pysam.FastqProxy): Read to digest. - recognition_seq (str): Sequence of restriction recognition site. - recognition_len (int): Length of restriction recognition site. - recognition_seq (re.Pattern): Regular expression for restriction recognition site. - slices (List[str]): List of Fastq formatted digested reads (slices). - slice_indexes (List[int]): Indexes of fragment(s) start and end positions. - slice_number_offset (int): Starting fragment number. - min_slice_len (int): Minimum fragment length required to report fragment. - has_slices (bool): Recognition site(s) present within sequence. - - - - """ - - def __init__( - self, - read: pysam.FastqProxy, - cutsite: str, - min_slice_length: int = 18, - slice_number_start: int = 1, - allow_undigested: bool = False, - read_type: str = "flashed", - ): - """ - Args: - read (pysam.FastqProxy): Read to digest. - cutsite (str): Restriction enzyme recognition sequence. - min_slice_length (int, optional): Minimum slice length required for output. Defaults to 18. - slice_number_start (int, optional): Starting slice number. Defaults to 1. - allow_undigested (bool, optional): If True slices without a restriction site are not filtered out. Defaults to False. - read_type (str, optional): Combined (flashed) or non-combined (pe). Choose from (flashed|pe). Defaults to "flashed". - """ - - self.read = read - self.min_slice_length = min_slice_length - self.slice_number_start = slice_number_start - self.allow_undigested = allow_undigested - self.read_type = read_type - - self.recognition_seq = cutsite.upper() - self.recognition_len = len(cutsite) - self.recognition_re = re.compile(self.recognition_seq) - - self.slice_indexes = self.get_recognition_site_indexes() - self.slices_unfiltered = len(self.slice_indexes) - 1 - self.slices_filtered = 0 - self.has_slices = self.slices_unfiltered > 1 - self.slices = self._get_slices() - self.has_valid_slices = True if self.slices else False - - def get_recognition_site_indexes(self) -> List[int]: - indexes = [ - re_site.start() - for re_site in self.recognition_re.finditer(self.read.sequence.upper()) - ] - - indexes.insert(0, 0) - indexes.append(len(self.read.sequence)) - - return indexes - - def _get_slices(self) -> List[str]: - - indexes = self.slice_indexes - slice_no = self.slice_number_start - slices_list = [] - - if self.has_slices or self.allow_undigested: - - # Iterate through offset indexes to get correct start and end - for ii, (slice_start, slice_end) in enumerate(zip(indexes, indexes[1:])): - - # If this is not the first slice - if ii > 0: - slice_start += self.recognition_len - - if self._slice_passes_filter(slice_start, slice_end): - slices_list.append( - self._prepare_slice(slice_start, slice_end, slice_no) - ) - - self.slices_filtered += 1 - slice_no += 1 - - return slices_list - - def _prepare_slice(self, start, end, slice_no): - return "\n".join( - [ - f"@{self.read.name}|{self.read_type}|{slice_no}|{np.random.randint(0,100)}", - self.read.sequence[start:end], - "+", - self.read.quality[start:end], - ] - ) - - def _slice_passes_filter(self, start: int, end: int) -> bool: - """ - Determines if slice exceeds the minimum slice length. - - Args: - start (int): Slice start position. - end (int): Slice end position. - - Returns: - bool: True if greater than minimum slice length - - """ - - if (end - start) >= self.min_slice_length: - return True - - def __str__(self): - return ("\n".join(self.slices) + "\n") if self.slices else "" - - -class ReadDigestionProcess(multiprocessing.Process): - """ - Process subclass for multiprocessing fastq digestion. - - """ - - def __init__( - self, - inq: multiprocessing.Queue, - outq: multiprocessing.Queue, - stats_pipe: multiprocessing.Pipe = None, - **digestion_kwargs, - ) -> None: - - """ - Args: - inq (multiprocessing.SimpleQueue): Queue to hold list of reads to digest. - outq (multiprocessing.SimpleQueue): Queue to hold list of digested reads. - stats_pipe (multiprocessing.Pipe, optional): Pipe to send statistics to. Defaults to None. - **digestion_kwargs Dict[Any, Any]: Kwargs passed to DigestedRead. - - Raises: - KeyError: digestion_kwargs must contain: cutsite - """ - - super(ReadDigestionProcess, self).__init__() - - self.inq = inq - self.outq = outq - self.stats_pipe = stats_pipe - self.digestion_kwargs = digestion_kwargs - self.read_type = digestion_kwargs.get("read_type", "flashed") - self._stat_container = DigestionStats - - if "cutsite" not in digestion_kwargs: - raise KeyError("Cutsite is required to be present in digestion arguments") - - def _digest_reads(self, reads, **digestion_kwargs): - digested = [] - for i, read in enumerate(reads): - if i == 0: - digested.append(DigestedRead(read, **digestion_kwargs)) - else: - digestion_kwargs["slice_number_start"] = digested[i - 1].slices_filtered - digested.append(DigestedRead(read, **digestion_kwargs)) - - return digested - - def _digestion_statistics_to_dataframe(self, stats: List[DigestionStats]): - - if stats: - - df = pd.DataFrame(stats) - return ( - df.groupby(["read_type", "read_number", "unfiltered", "filtered"]) - .size() - .to_frame("count") - .reset_index() - ) - - def run(self): - """ - Performs read digestion. - - Reads to digest are pulled from inq, digested with the DigestedRead class - and the results placed on outq for writing. - - If a statq is provided, read digestion stats are placed into this queue for - aggregation. - - """ - - buffer_stats = list() - buffer_reads = list() - dframes_stats = list() - - while True: - try: - reads = self.inq.get(block=True, timeout=0.01) - - # Make sure that we don't need to terminate - if reads: - - # Accounts for PE as well as flashed - for read in reads: - - # Digest the read - digested = self._digest_reads(read, **self.digestion_kwargs) - - # Parse the digestion results - for read_number, digested_read in enumerate(digested): - - # Only write if there are valid slices present - if digested_read.has_valid_slices: - buffer_reads.append(str(digested_read)) - - # Will record all reads even if these do not digest - digested_read_stats = self._stat_container( - read_type=self.read_type, - read_number=( - read_number + 1 - if not self.read_type == "flashed" - else read_number - ), - unfiltered=digested_read.slices_unfiltered, - filtered=digested_read.slices_filtered, - ) - - # Append stats to the stats buffer - buffer_stats.append(digested_read_stats) - - # Aggregate individual read stats into a dataframe - df_stat_batch = self._digestion_statistics_to_dataframe( - buffer_stats - ) - - # Add this summary to the overall stats - dframes_stats.append(df_stat_batch) - - # Add the digested reads to the output queue - self.outq.put("".join(buffer_reads.copy())) - buffer_reads.clear() - buffer_stats.clear() - - else: - break - - except queue.Empty: - continue - - if self.stats_pipe: - # Merge all dataframes together - - try: - df_stats = pd.concat(dframes_stats) - df_stats_aggregated = ( - df_stats.groupby( - ["read_type", "read_number", "unfiltered", "filtered"] - ) - .sum() - .reset_index() - ) - - # Send the statistics to the main process - self.stats_pipe.send(df_stats_aggregated) - except ValueError: - # Might not actually have got any reads, just return none - self.stats_pipe.send(None) diff --git a/capcruncher/api/io.py b/capcruncher/api/io.py index e6f1be8a..1bdd7087 100644 --- a/capcruncher/api/io.py +++ b/capcruncher/api/io.py @@ -12,10 +12,6 @@ import pandas as pd import pysam import tqdm -from capcruncher.api.count import ( - get_fragment_combinations, - preprocess_reporters_for_counting, -) from capcruncher.utils import get_timing from pysam import FastxFile from xopen import xopen @@ -220,68 +216,6 @@ def run(self): traceback.format_exc() -class FastqWriterProcess(multiprocessing.Process): - def __init__( - self, - inq: multiprocessing.Queue, - output: Union[str, list], - compression_level: int = 5, - ): - super(FastqWriterProcess, self).__init__() - - self.inq = inq - self.output = output - self.compression_level = compression_level - self.file_handles = self._get_filehandles() - self.name = "FastqWriter" - - def _get_filehandles(self): - if isinstance(self.output, str): - return [ - xopen( - self.output, "w", compresslevel=self.compression_level, threads=0 - ), - ] - elif isinstance(self.output, (list, tuple, pd.Series)): - return [ - xopen(fn, "w", compresslevel=self.compression_level, threads=0) - for fn in self.output - ] - - def _inputs_match_number_of_handles(self, reads): - if len(reads[0]) == len(self.output): - return True - - def run(self): - while True: - try: - reads = self.inq.get(block=True, timeout=0.01) - if reads: - is_string_input = True if isinstance(reads, str) else False - - if is_string_input: - for fh in self.file_handles: - fh.write(reads) - - else: - reads_str = [ - "\n".join([str(r) for r in read_glob]) - for read_glob in zip(*reads) - ] - - for fh, read_set in zip(self.file_handles, reads_str): - fh.write((read_set + "\n")) - - else: - for fh in self.file_handles: - fh.close() - - break - - except queue.Empty: - continue - - CCAlignment = namedtuple( "CCAlignment", field_names=[ @@ -437,368 +371,3 @@ def bam_to_parquet( df_bam.to_parquet(output) return output - - -class CCHDF5ReaderProcess(multiprocessing.Process): - def __init__( - self, - path: os.PathLike, - key: str, - inq: multiprocessing.Queue, - outq: multiprocessing.Queue, - ): - # Reading vars - self.path = path - self.key = key - - # Multiprocessing vars - self.inq = inq - self.outq = outq - self.block_period = 0.01 - - super(CCHDF5ReaderProcess, self).__init__() - - def _select_by_viewpoint(self, store, viewpoint): - viewpoint = viewpoint - df = store.select( - self.key, - where="viewpoint in viewpoint", - columns=[ - "parent_id", - "restriction_fragment", - "viewpoint", - "capture", - "exclusion", - ], - ) - return df - - def run(self): - with pd.HDFStore(self.path, "r") as store: - while True: - try: - viewpoint_to_find = self.inq.get( - block=True, timeout=self.block_period - ) - - if viewpoint_to_find is None: - break - - df = self._select_by_viewpoint(store, viewpoint_to_find) - if not df.empty: - self.outq.put((viewpoint_to_find, df)) - - except queue.Empty: - pass - - -class CCParquetReaderProcess(multiprocessing.Process): - def __init__( - self, - path: os.PathLike, - inq: multiprocessing.Queue, - outq: multiprocessing.Queue, - selection_mode: Literal["single", "batch", "partition"], - ): - # Reading vars - self.path = path - self.partitions = glob.glob(f"{path}/*.parquet") or [ - self.path, - ] - self.selection_mode = selection_mode - - # Multiprocessing vars - self.inq = inq - self.outq = outq - self.block_period = 0.1 - - super(CCParquetReaderProcess, self).__init__() - - def _select_by_viewpoint(self, viewpoint): - df = pd.read_parquet( - self.path, - columns=[ - "parent_id", - "restriction_fragment", - "viewpoint", - "capture", - "exclusion", - ], - filters=[[("viewpoint", "==", viewpoint)]], - engine="pyarrow", - ) - return df - - def _select_by_viewpoint_batch(self, viewpoints): - df = pd.read_parquet( - self.path, - columns=[ - "parent_id", - "restriction_fragment", - "viewpoint", - "capture", - "exclusion", - ], - filters=[("viewpoint", "in", viewpoints)], - engine="pyarrow", - ) - return df - - def _select_by_viewpoint_batch_by_partition(self, viewpoints): - for part in self.partitions: - df = pd.read_parquet( - part, - columns=[ - "parent_id", - "restriction_fragment", - "viewpoint", - "capture", - "exclusion", - ], - filters=[("viewpoint", "in", viewpoints)], - engine="pyarrow", - ) - yield df - - def run(self): - while True: - try: - viewpoints_to_find = self.inq.get(block=True, timeout=self.block_period) - - if viewpoints_to_find is None: - break - - elif ( - self.selection_mode == "single" - ): # Slower as need to read all partitions each time - assert isinstance(viewpoints_to_find, str) - df = self._select_by_viewpoint(viewpoints_to_find) - if not df.empty: - self.outq.put((viewpoints_to_find, df)) - - elif ( - self.selection_mode == "batch" - ): # Faster as very low overhead when increasing number of viewpoints - df = self._select_by_viewpoint_batch(viewpoints_to_find) - for vp, df_vp in df.groupby("viewpoint"): - if not df_vp.empty: - logger.info(f"Queuing {vp} for counting") - self.outq.put((vp, df_vp)) - - elif ( - self.selection_mode == "partition" - ): # Low memory counting. Good for data rich viewpoints - vp_partitions = defaultdict(int) - - for df in self._select_by_viewpoint_batch_by_partition( - viewpoints_to_find - ): - for vp, df_vp in df.groupby("viewpoint"): - if not df_vp.empty: - vp_partitions[vp] += 1 - logger.info( - f"Queuing {vp} partition {vp_partitions[vp]} for counting" - ) - self.outq.put((vp, df_vp)) - - except queue.Empty: - pass - - -class FragmentCountingProcess(multiprocessing.Process): - def __init__( - self, - inq: multiprocessing.Queue, - outq: multiprocessing.Queue, - **preprocessing_kwargs, - ): - # Multiprocessing vars - self.inq = inq - self.outq = outq - self.block_period = 0.1 - self.preprocessing_kwargs = preprocessing_kwargs - - super(FragmentCountingProcess, self).__init__() - - def run(self): - while True: - try: - vp, df = self.inq.get(block=True, timeout=self.block_period) - - if df is None: - break - - df = preprocess_reporters_for_counting(df, **self.preprocessing_kwargs) - - counts = ( - df.groupby(["parent_id"]) - .apply(get_fragment_combinations) - .reset_index(drop=True) - .explode() - .to_frame("combinations") - .dropna(axis=0) - .assign( - bin1_id=lambda df: df["combinations"].map(lambda c: c[0]), - bin2_id=lambda df: df["combinations"].map(lambda c: c[1]), - ) - .groupby(["bin1_id", "bin2_id"]) - .size() - .reset_index() - .rename(columns={0: "count"}) - .sort_values(["bin1_id", "bin2_id"]) - ) - - self.outq.put((vp, counts)) - - except queue.Empty: - pass - - -class CCHDF5WriterProcess(multiprocessing.Process): - def __init__( - self, - inq: multiprocessing.Queue, - output_path: os.PathLike, - output_key: str = "/", - output_format: str = "cooler", - single_file: bool = True, - restriction_fragment_map: os.PathLike = None, - viewpoint_path: os.PathLike = None, - n_viewpoints: int = 0, - ): - # Writing vars - self.path = output_path - self.key = output_key - self.output_format = output_format - self.single_file = single_file - self.restriction_fragment_map = restriction_fragment_map - self.viewpoint_path = viewpoint_path - self.n_viewpoints = n_viewpoints - - if self.output_format == "cooler": - self.bins = pd.read_csv( - restriction_fragment_map, - sep="\t", - header=None, - names=["chrom", "start", "end", "name"], - ) - - # Multiprocessing vars - self.inq = inq - self.block_period = 0.01 - - super(CCHDF5WriterProcess, self).__init__() - - def run(self): - with tqdm.tqdm(total=self.n_viewpoints) as pbar: - while True: - try: - vp, df = self.inq.get(block=True, timeout=self.block_period) - - if df is None: - break - - if self.output_format == "tsv" and self.single_file: - df.to_csv(self.path, sep="\t", mode="a") - - elif self.output_format == "hdf5": - df.to_hdf(self.path, self.key, format="table", mode="a") - - elif self.output_format == "cooler": - if self.restriction_fragment_map and self.viewpoint_path: - from capcruncher.api.storage import create_cooler_cc - - logger.info(f"Making temporary cooler for {vp}") - create_cooler_cc( - output_prefix=self.path, - pixels=df, - bins=self.bins, - viewpoint_name=vp, - viewpoint_path=self.viewpoint_path, - ordered=True, - dupcheck=False, - triucheck=False, - ) - else: - raise ValueError( - "Restriction fragment map or path to viewpoints not supplied" - ) - - pbar.update() - - except queue.Empty: - pass - - -class CCCountsWriterProcess(multiprocessing.Process): - def __init__( - self, - inq: multiprocessing.Queue, - output_format: str = "cooler", - restriction_fragment_map: os.PathLike = None, - viewpoint_path: os.PathLike = None, - tmpdir: os.PathLike = ".", - ): - # Writing vars - self.output_format = output_format - self.restriction_fragment_map = restriction_fragment_map - self.viewpoint_path = viewpoint_path - self.tmpdir = tmpdir - - if self.output_format == "cooler": - self.bins = pd.read_csv( - restriction_fragment_map, - sep="\t", - header=None, - names=["chrom", "start", "end", "name"], - ) - - # Multiprocessing vars - self.inq = inq - self.block_period = 0.01 - - super(CCCountsWriterProcess, self).__init__() - - def run(self): - while True: - try: - vp, df = self.inq.get(block=True, timeout=self.block_period) - - if df is None: - break - else: - path = os.path.join( - self.tmpdir, - "".join( - random.choices(string.ascii_uppercase + string.digits, k=6) - ), - ) - - if self.output_format == "tsv" and self.single_file: - df.to_csv(path, sep="\t", mode="a") - - elif self.output_format == "hdf5": - df.to_hdf(path, vp, format="table", mode="a") - - elif self.output_format == "cooler": - if self.restriction_fragment_map and self.viewpoint_path: - from capcruncher.api.storage import create_cooler_cc - - logger.info(f"Making temporary cooler for {vp}") - create_cooler_cc( - output_prefix=path, - pixels=df, - bins=self.bins, - viewpoint_name=vp, - viewpoint_path=self.viewpoint_path, - ordered=True, - dupcheck=False, - triucheck=False, - ) - else: - raise ValueError( - "Restriction fragment map or path to viewpoints not supplied" - ) - - except queue.Empty: - pass diff --git a/capcruncher/cli/cli_fastq.py b/capcruncher/cli/cli_fastq.py index e27811ec..3b13996c 100644 --- a/capcruncher/cli/cli_fastq.py +++ b/capcruncher/cli/cli_fastq.py @@ -1,5 +1,49 @@ import click -from capcruncher.cli import UnsortedGroup +import pathlib +import ast +import re + + +class OptionEatAll(click.Option): + def __init__(self, *args, **kwargs): + self.save_other_options = kwargs.pop("save_other_options", True) + nargs = kwargs.pop("nargs", -1) + assert nargs == -1, "nargs, if set, must be -1 not {}".format(nargs) + super(OptionEatAll, self).__init__(*args, **kwargs) + self._previous_parser_process = None + self._eat_all_parser = None + + def add_to_parser(self, parser, ctx): + def parser_process(value, state): + # method to hook to the parser.process + done = False + value = [value] + if self.save_other_options: + # grab everything up to the next option + while state.rargs and not done: + for prefix in self._eat_all_parser.prefixes: + if state.rargs[0].startswith(prefix): + done = True + if not done: + value.append(state.rargs.pop(0)) + else: + # grab everything remaining + value += state.rargs + state.rargs[:] = [] + value = tuple(value) + + # call the actual process + self._previous_parser_process(value, state) + + retval = super(OptionEatAll, self).add_to_parser(parser, ctx) + for name in self.opts: + our_parser = parser._long_opt.get(name) or parser._short_opt.get(name) + if our_parser: + self._eat_all_parser = our_parser + self._previous_parser_process = our_parser.process + our_parser.process = parser_process + break + return retval @click.group() @@ -57,7 +101,7 @@ def split(*args, **kwargs): @cli.command() -@click.argument("input_fastq", nargs=-1, required=True) +@click.argument("fastqs", nargs=-1, required=True) @click.option( "-r", "--restriction_enzyme", @@ -72,21 +116,7 @@ def split(*args, **kwargs): required=True, ) @click.option("-o", "--output_file", default="out.fastq.gz") -@click.option("-p", "--n_cores", default=1, type=click.INT) @click.option("--minimum_slice_length", default=18, type=click.INT) -@click.option("--keep_cutsite", default=False) -@click.option( - "--compression_level", - help="Level of compression for output files (1=low, 9=high)", - default=5, - type=click.INT, -) -@click.option( - "--read_buffer", - help="Number of reads to process before writing to file to conserve memory.", - default=1e5, - type=click.INT, -) @click.option("--stats-prefix", help="Output prefix for stats file", default="stats") @click.option( "--sample-name", @@ -98,125 +128,53 @@ def digest(*args, **kwargs): Performs in silico digestion of one or a pair of fastq files. """ from capcruncher.cli.fastq_digest import digest + from capcruncher.utils import get_restriction_site - digest(*args, **kwargs) - - -@cli.group(cls=UnsortedGroup) -def deduplicate(): - """ - Identifies PCR duplicate fragments from Fastq files. - - PCR duplicates are very commonly present in Capture-C/Tri-C/Tiled-C data and must be removed - for accurate analysis. These commands attempt to identify and remove duplicate reads/fragments - from fastq file(s) to speed up downstream analysis. + kwargs["restriction_site"] = get_restriction_site(kwargs["restriction_enzyme"]) - """ + digest(*args, **kwargs) -@deduplicate.command(name="parse") -@click.argument("input_files", nargs=-1, required=True) +@cli.command() @click.option( - "-o", - "--output", - help="File to store hashed sequence identifiers", - default="out.json", + "-1", "--fastq1", help="Read 1 FASTQ files", required=True, cls=OptionEatAll ) @click.option( - "--read_buffer", - help="Number of reads to process before writing to file", - default=1e5, - type=click.INT, -) -def deduplicate_parse(*args, **kwargs): - - """ - Parses fastq file(s) into easy to deduplicate format. - - This command parses one or more fastq files and generates a dictionary containing - hashed read identifiers together with hashed concatenated sequences. The hash dictionary - is output in json format and the identify subcommand can be used to determine which read identifiers - have duplicate sequences. - """ - - from capcruncher.cli.fastq_deduplicate import parse - - parse(*args, **kwargs) - - -@deduplicate.command(name="identify") -@click.argument( - "input_files", - nargs=-1, + "-2", "--fastq2", help="Read 2 FASTQ files", required=True, cls=OptionEatAll ) -@click.option( - "-o", "--output", help="Output file", default="duplicates.json", required=True -) -def deduplicate_identify(*args, **kwargs): - - """ - Identifies fragments with duplicated sequences. - - Merges the hashed dictionaries (in json format) generated by the "parse" subcommand and - identifies read with exactly the same sequence (share an identical hash). Duplicated read - identifiers (hashed) are output in json format. The "remove" subcommand uses this dictionary - to remove duplicates from fastq files. - - """ - - from capcruncher.cli.fastq_deduplicate import identify - - identify(*args, **kwargs) - - -@deduplicate.command(name="remove") -@click.argument("input_files", nargs=-1) @click.option( "-o", - "--output_prefix", - help="Output prefix for deduplicated fastq file(s)", - default="", -) -@click.option( - "-d", - "--duplicated_ids", - help="Path to duplicate ids, identified by the identify subcommand", -) -@click.option( - "--read_buffer", - help="Number of reads to process before writing to file", - default=1e5, - type=click.INT, + "--output-prefix", + help="Output prefix for deduplicated FASTQ files", + default="deduped", ) @click.option( - "--gzip/--no-gzip", help="Determines if files are gziped or not", default=False + "--sample-name", help="Name of sample e.g. DOX_treated_1", default="sampleX" ) @click.option( - "--compression_level", - help="Level of compression for output files", - default=5, - type=click.INT, + "-s", "--statistics", help="Statistics output file name", default="stats.csv" ) @click.option( - "--sample-name", help="Name of sample e.g. DOX_treated_1", default="sampleX" -) -@click.option("--stats-prefix", help="Output prefix for stats file", default="stats") -@click.option( - "--hash-read-name/--no-hash-read-name", - help="Hashes the read id to save memory", + "--shuffle", + help="Shuffle reads before deduplication", + is_flag=True, default=False, ) -@click.option("-p", "--n_cores", default=1, type=click.INT) -def deduplicate_remove(*args, **kwargs): +def deduplicate(*args, **kwargs): """ - Removes fragments with duplicated sequences from fastq files. + Identifies PCR duplicate fragments from Fastq files. - Parses input fastq files and removes any duplicates from the fastq file(s) that are - present in the json file supplied. This json dictionary should be produced by the - "identify" subcommand. + PCR duplicates are very commonly present in Capture-C/Tri-C/Tiled-C data and must be removed + for accurate analysis. These commands attempt to identify and remove duplicate reads/fragments + from fastq file(s) to speed up downstream analysis. - Statistics for the number of duplicated and unique reads are also provided. """ - from capcruncher.cli.fastq_deduplicate import remove + from capcruncher.cli.fastq_deduplicate import deduplicate + + fq1 = [pathlib.Path(f) for f in ast.literal_eval(kwargs["fastq1"])] + fq2 = [pathlib.Path(f) for f in ast.literal_eval(kwargs["fastq2"])] + + kwargs["fastq_1"] = fq1 + kwargs["fastq_2"] = fq2 - remove(*args, **kwargs) + deduplicate(*args, **kwargs) diff --git a/capcruncher/cli/cli_genome.py b/capcruncher/cli/cli_genome.py index 91ef621c..a1fb2a81 100644 --- a/capcruncher/cli/cli_genome.py +++ b/capcruncher/cli/cli_genome.py @@ -41,5 +41,8 @@ def digest(*args, **kwargs): generated. """ from capcruncher.cli.genome_digest import digest + from capcruncher.utils import get_restriction_site + + kwargs["recognition_site"] = get_restriction_site(kwargs["recognition_site"]) digest(*args, **kwargs) diff --git a/capcruncher/cli/cli_interactions.py b/capcruncher/cli/cli_interactions.py index 4cd3aa90..6716849a 100644 --- a/capcruncher/cli/cli_interactions.py +++ b/capcruncher/cli/cli_interactions.py @@ -128,17 +128,6 @@ def pileup(*args, **kwargs): help="Subsamples reporters before analysis of interactions", type=float, ) -@click.option( - "--low-memory", - is_flag=True, - default=False, - help="Will perform counting in batches specifed by the chunksize to save memory (less accurate)", -) -@click.option( - "--chunksize", - default=int(2e6), - help="Number of records to process at once", -) @click.option( "-f", "--fragment-map", @@ -149,12 +138,6 @@ def pileup(*args, **kwargs): "--viewpoint-path", help="Path to viewpoints file", ) -@click.option( - "--cooler-output", - "output_as_cooler", - help="Output counts in cooler format", - is_flag=True, -) @click.option( "-p", "--n-cores", @@ -163,49 +146,27 @@ def pileup(*args, **kwargs): type=int, ) @click.option( - "-t", - "--file-type", - help="File format for input", - default="auto", - type=click.Choice(["auto", "tsv", "hdf5", "parquet"], case_sensitive=False), + "--assay", type=click.Choice(["capture", "tri", "tiled"]), default="capture" ) def count(*args, **kwargs): """ Determines the number of captured restriction fragment interactions genome wide. - Parses a reporter slices tsv and counts the number of unique restriction fragment - interaction combinations that occur within each fragment. + Counts the number of interactions between each restriction fragment and all other + restriction fragments in the fragment. - Options to ignore unwanted counts e.g. excluded regions or capture fragments are provided. - In addition the number of reporter fragments can be subsampled if required. - """ + The output is a cooler formatted HDF5 file containing a single group containing + the interactions between restriction fragments. - if kwargs.get("output_as_cooler"): - if not kwargs.get("fragment_map"): - raise ValueError( - "Restriction fragment map must be provided for cooler output" - ) - elif not kwargs.get("viewpoint_path"): - raise ValueError("Viewpoint path must be provided for cooler output") + See `https://cooler.readthedocs.io/en/latest/` for further details. + + """ from capcruncher.cli.interactions_count import count count(*args, **kwargs) -# @cli.group() -# def store(): -# """ -# Store reporter counts. - -# These commands store and manipulate reporter restriction fragment interaction -# counts as cooler formated groups in HDF5 files. - -# See subcommands for details. - -# """ - - @cli.command(name="counts-to-cooler") @click.argument("counts", required=True) @click.option( diff --git a/capcruncher/cli/cli_utilities.py b/capcruncher/cli/cli_utilities.py index 09f2a4cc..9614a6b2 100644 --- a/capcruncher/cli/cli_utilities.py +++ b/capcruncher/cli/cli_utilities.py @@ -253,7 +253,6 @@ def viewpoint_coordinates( recognition_site: str = "dpnii", output: os.PathLike = "viewpoint_coordinates.bed", ): - """ Aligns viewpoints to a genome and returns the coordinates of the viewpoint in the genome. @@ -274,7 +273,6 @@ def viewpoint_coordinates( ValueError: If no bowtie2 indices are supplied """ - import concurrent.futures from capcruncher.cli import genome_digest from pybedtools import BedTool @@ -282,72 +280,63 @@ def viewpoint_coordinates( viewpoints_fasta = NamedTemporaryFile("r+") viewpoints_aligned_bam = NamedTemporaryFile("r+") - with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: - # Digest genome to find restriction fragments - digestion = executor.submit( - genome_digest.digest, - **dict( - input_fasta=genome, - recognition_site=recognition_site, - output_file=digested_genome.name, - sort=True, - ), - ) + genome_digest.digest( + input_fasta=genome, + recognition_site=recognition_site, + output_file=digested_genome.name, + sort=True, + ) - # Generate a fasta file of viewpoints - if ".fa" in viewpoints: - fasta = viewpoints - elif viewpoints.endswith(".tsv") or viewpoints.endswith(".csv"): - df = pd.read_table(viewpoints) - cols = df.columns - fasta = dict_to_fasta( - df.set_index(cols[0])[cols[1]].to_dict(), viewpoints_fasta.name - ) - else: - raise ValueError("Oligos not provided in the correct format (FASTA/TSV)") + # Generate a fasta file of viewpoints + if ".fa" in viewpoints: + fasta = viewpoints + elif viewpoints.endswith(".tsv") or viewpoints.endswith(".csv"): + df = pd.read_table(viewpoints) + cols = df.columns + fasta = dict_to_fasta( + df.set_index(cols[0])[cols[1]].to_dict(), viewpoints_fasta.name + ) + else: + raise ValueError("Oligos not provided in the correct format (FASTA/TSV)") - # Align viewpoints to the genome - # if not genome_indicies or not os.path.exists(os.path.join(genome_indicies, ".1.bt2")): - # raise ValueError("No indices supplied for alignment") + # Align viewpoints to the genome + # if not genome_indicies or not os.path.exists(os.path.join(genome_indicies, ".1.bt2")): + # raise ValueError("No indices supplied for alignment") - p_alignment = subprocess.Popen( - ["bowtie2", "-x", genome_indicies, "-f", "-U", fasta], - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - ) - p_bam = subprocess.Popen( - ["samtools", "view", "-b", "-"], - stdout=viewpoints_aligned_bam, - stdin=p_alignment.stdout, - ) - p_alignment.stdout.close() - aligned_res = p_bam.communicate() - - # Ensure genome has been digested in this time - digestion.result() - - # Intersect digested genome with viewpoints - bt_genome = BedTool(digested_genome.name) - bt_viewpoints = BedTool(viewpoints_aligned_bam.name) - - intersections = bt_genome.intersect(bt_viewpoints, wa=True, wb=True) - - # Write results to file - ( - intersections.to_dataframe() - .drop_duplicates("name") - .assign(oligo_name=lambda df: df["thickEnd"].str.split("_L").str[0])[ - ["chrom", "start", "end", "oligo_name"] - ] - .to_csv(output, index=False, header=False, sep="\t") - ) + p_alignment = subprocess.Popen( + ["bowtie2", "-x", genome_indicies, "-f", "-U", fasta], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + ) + p_bam = subprocess.Popen( + ["samtools", "view", "-b", "-"], + stdout=viewpoints_aligned_bam, + stdin=p_alignment.stdout, + ) + p_alignment.stdout.close() + aligned_res = p_bam.communicate() + + # Intersect digested genome with viewpoints + bt_genome = BedTool(digested_genome.name) + bt_viewpoints = BedTool(viewpoints_aligned_bam.name) + + intersections = bt_genome.intersect(bt_viewpoints, wa=True, wb=True) + + # Write results to file + ( + intersections.to_dataframe() + .drop_duplicates("name") + .assign(oligo_name=lambda df: df["thickEnd"].str.split("_L").str[0])[ + ["chrom", "start", "end", "oligo_name"] + ] + .to_csv(output, index=False, header=False, sep="\t") + ) - for tmp in [digested_genome, viewpoints_fasta, viewpoints_aligned_bam]: - tmp.close() + for tmp in [digested_genome, viewpoints_fasta, viewpoints_aligned_bam]: + tmp.close() def dump_cooler(path: str, viewpoint: str, resolution: int = None) -> pd.DataFrame: - import cooler.api as cooler if resolution: diff --git a/capcruncher/cli/fastq_deduplicate.py b/capcruncher/cli/fastq_deduplicate.py index 5eca3bcc..09ea5a8a 100644 --- a/capcruncher/cli/fastq_deduplicate.py +++ b/capcruncher/cli/fastq_deduplicate.py @@ -4,200 +4,40 @@ Created on Fri Oct 4 13:47:20 2019 @author: asmith """ -import multiprocessing -import os -from typing import Tuple -import numpy as np -import pandas as pd -import tqdm -from capcruncher.api.deduplicate import ( - ReadDeduplicationParserProcess, - ReadDuplicateRemovalProcess, -) -from capcruncher.api.io import FastqReaderProcess, FastqWriterProcess -from capcruncher.utils import get_file_type, load_dict, save_dict - - -def parse(input_files: Tuple, output: os.PathLike = "out.json", read_buffer: int = 1e5): - """ - Parses fastq file(s) into easy to deduplicate format. - - This command parses one or more fastq files and generates a dictionary containing - hashed read identifiers together with hashed concatenated sequences. The hash dictionary - is output in json format and the identify subcommand can be used to determine which read identifiers - have duplicate sequences. - - \f - Args: - input_files (Tuple): One or more fastq files to process - output (os.PathLike, optional): Output for parsed read identifiers and sequences. Defaults to "out.json". - read_buffer (int, optional): Number of reads to process before outputting to file. Defaults to 1e5. - """ - - # Set up multiprocessing variables - inputq = ( - multiprocessing.Queue() - ) # Reads are placed into this queue for deduplication - - reader = FastqReaderProcess( - input_files=input_files, - outq=inputq, - read_buffer=read_buffer, - ) - - parser = ReadDeduplicationParserProcess(inq=inputq, output_path=output) - - reader.start() - parser.start() - - reader.join() - parser.join() - - -def identify(input_files: Tuple, output: os.PathLike = "duplicates.json"): - """ - Identifies fragments with duplicated sequences. - - Merges the hashed dictionaries (in json format) generated by the "parse" subcommand and - identifies read with exactly the same sequence (share an identical hash). Duplicated read - identifiers (hashed) are output in json format. The "remove" subcommand uses this dictionary - to remove duplicates from fastq files. - - - \f - Args: - input_files (Tuple): Paths to json files containing dictionaries with hashed read ids as the keys - and hashed sequences as the values. - output (os.PathLike, optional): Duplicate read ids identified. Defaults to "duplicates.json". - """ - - input_files = np.array(input_files) - np.random.shuffle(input_files) - sequences_dedup = set() - reads_duplicated = set() - - for ii, fn in enumerate(tqdm.tqdm(input_files)): - - input_file_type = get_file_type(fn) - fastq_hashed_dict = load_dict(fn, format=input_file_type) - - for name_hash, sequence_hash in fastq_hashed_dict.items(): - - if sequence_hash not in sequences_dedup: - sequences_dedup.add(sequence_hash) - else: - reads_duplicated.add(name_hash) - - del sequences_dedup - output_format = get_file_type(output) - save_dict(reads_duplicated, output, format=output_format) - - return reads_duplicated - - -def remove( - input_files: Tuple, - duplicated_ids: os.PathLike, - read_buffer: int = 1e5, - output_prefix: os.PathLike = "", - gzip: bool = False, - compression_level: int = 5, - sample_name: str = "", - stats_prefix: os.PathLike = "", - hash_read_name: bool = True, - n_cores: int = 1, +from typing import List, Tuple, Union +from loguru import logger as logging +import tabulate +import pathlib + + +def deduplicate( + fastq_1: List[str], + fastq_2: List[str], + output_prefix: Union[str, pathlib.Path] = "deduplicated_", + statistics: str = "deduplication_statistics.csv", + sample_name: str = "sampleX", + shuffle: bool = False, + **kwargs, ): - """ - Removes fragments with duplicated sequences from fastq files. - - Parses input fastq files and removes any duplicates from the fastq file(s) that are - present in the json file supplied. This json dictionary should be produced by the - "identify" subcommand. - - Statistics for the number of duplicated and unique reads are also provided. - - \f - Args: - input_files (Tuple): Input fastq files (in the same order as used for the parse command). - duplicated_ids (os.PathLike): Duplicated read ids from identify command (hashed and in json format). - read_buffer (int, optional): Number of reads to process before writing to file. Defaults to 1e5. - output_prefix (os.PathLike, optional): Deduplicated fastq output prefix. Defaults to "". - gzip (bool, optional): Determines if output is gzip compressed using pigz. Defaults to False. - compression_level (int, optional): Level of compression if required (1-9). Defaults to 5. - sample_name (str, optional): Name of sample processed e.g. DOX-treated_1. Defaults to "". - stats_prefix (os.PathLike, optional): Output prefix for statistics. Defaults to "". - - """ - - id_format = get_file_type(duplicated_ids) - duplicated_ids_dict = load_dict(duplicated_ids, id_format) - - readq = ( - multiprocessing.Queue() - ) # Reads are placed into this queue for deduplication - writeq = ( - multiprocessing.Queue() - ) # Deduplicated reads are placed into the queue for writing - - output_files = [ - f"{output_prefix}_{ii+1}.fastq{'.gz' if gzip else ''}" - for ii in range(len(input_files)) - ] - - deduplicator = list() - for ii in range(n_cores): - stats_pipe = multiprocessing.Pipe() - removal_process = ReadDuplicateRemovalProcess( - inq=readq, - outq=writeq, - duplicated_ids=duplicated_ids_dict, - stats_tx=stats_pipe[0], - hash_read_name=hash_read_name, - ) - stats_recv = stats_pipe[1] - - deduplicator.append((stats_recv, removal_process)) - - del duplicated_ids_dict # Reduces memory usage before starting (likely by forking) a new process - - reader = FastqReaderProcess( - input_files=input_files, - outq=readq, - read_buffer=read_buffer, - ) - - writer = FastqWriterProcess( - inq=writeq, - output=output_files, - compression_level=compression_level, + from capcruncher_tools.api import deduplicate_fastq + import pandas as pd + import pathlib + + df_stats = deduplicate_fastq( + fastq1=fastq_1, + fastq2=fastq_2, + output_prefix=output_prefix, + sample_name=sample_name, + shuffle=shuffle, ) - reader.start() - for (conn, dedup) in deduplicator: - dedup.start() - - writer.start() - reader.join() - - for _ in range(n_cores): - readq.put_nowait(None) - - stats = list() - for (conn, dedup) in deduplicator: - stats.append(conn.recv()) - dedup.join() - - writeq.put(None) - writer.join() - - # Generate statistics - df_stats = pd.DataFrame(stats) - df_stats = df_stats.sum() - df_stats = df_stats.to_frame("stat").rename_axis(index="stat_type").reset_index() - df_stats["stage"] = "deduplication" - df_stats["sample"] = sample_name - df_stats["read_type"] = "pe" - df_stats["read_number"] = 0 - df_stats.to_csv(f"{stats_prefix}.deduplication.csv", index=False) + logging.info(f"Saving stats to {statistics}") + df_stats.to_csv(statistics, index=False) - return df_stats + logging.info("Printing deduplication statistics to stdout") + # Print stats to stdout + df_vis = df_stats.copy() + df_vis["stat_type"] = df_vis["stat_type"].str.replace("_", " ").str.title() + df_vis = df_vis[["stat_type", "stat"]] + df_vis.columns = ["Stat Type", "Number of Reads"] + print(tabulate.tabulate(df_vis, headers="keys", tablefmt="psql", showindex=False)) diff --git a/capcruncher/cli/fastq_digest.py b/capcruncher/cli/fastq_digest.py index c03ae616..049d720f 100644 --- a/capcruncher/cli/fastq_digest.py +++ b/capcruncher/cli/fastq_digest.py @@ -1,175 +1,74 @@ -import multiprocessing import os -from typing import Tuple -from multiprocessing import Queue -from capcruncher.api.digest import ReadDigestionProcess -from capcruncher.api.io import FastqReaderProcess, FastqWriterProcess -from capcruncher.api.digest import get_re_site +from typing import Literal, Tuple, Dict + import pandas as pd +from loguru import logger as logging +import polars as pl def digest( - input_fastq: Tuple, - restriction_enzyme: str, - mode: str = "pe", + fastqs: Tuple, + restriction_site: str, + mode: Literal["flashed", "pe"] = "pe", output_file: os.PathLike = "out.fastq.gz", minimum_slice_length: int = 18, - compression_level: int = 5, - n_cores: int = 1, - read_buffer: int = 100000, stats_prefix: os.PathLike = "", - keep_cutsite: bool = False, - sample_name: str = "", -): + sample_name: str = "sampleX", + **kwargs, +) -> Dict[str, pl.DataFrame]: """ - Performs in silico digestion of one or a pair of fastq files. + Digest FASTQ files. - \f Args: - input_fastq (Tuple): Input fastq files to process - restriction_enzyme (str): Restriction enzyme name or site to use for digestion. - mode (str, optional): Digest combined(flashed) or non-combined(pe). - Undigested pe reads are output but flashed are not written. Defaults to "pe". - output_file (os.PathLike, optional): Output fastq file path. Defaults to "out.fastq.gz". - minimum_slice_length (int, optional): Minimum allowed length for in silico digested reads. Defaults to 18. - compression_level (int, optional): Compression level for gzip output (1-9). Defaults to 5. - n_cores (int, optional): Number of digestion processes to use. Defaults to 1. - read_buffer (int, optional): Number of reads to process before writing to file. Defaults to 100000. - stats_prefix (os.PathLike, optional): Output prefix for stats file. Defaults to "". - keep_cutsite (bool, optional): Determines if cutsite is removed from the output. Defaults to False. - sample_name (str, optional): Name of sample processed eg. DOX-treated_1. Defaults to ''. + fastqs: Tuple of FASTQ files. + restriction_site: Restriction enzyme name or sequence to use for in silico digestion. + mode: Digestion mode. Combined (Flashed) or non-combined (PE) read pairs. + output_file: Output file path. + minimum_slice_length: Minimum slice length. + stats_prefix: Output prefix for stats file. + sample_name: Name of sample e.g. DOX_treated_1. Required for correct statistics. + + Returns: + A dictionary of stats: stats_read_level, stats_hist_unfilt, stats_hist_filt """ + from capcruncher_tools.api import digest_fastq + from capcruncher.utils import get_restriction_site - # Set up multiprocessing variables - inputq = Queue() # reads are placed into this queue for processing - writeq = Queue() # digested reads are placed into the queue for writing - - cut_site = get_re_site(restriction_enzyme) - n_digestion_processes = (n_cores - 2) if n_cores > 2 else 1 - - reader = FastqReaderProcess( - input_files=input_fastq, - outq=inputq, - read_buffer=read_buffer, - ) - - digestion_processes = list() - for _ in range(n_digestion_processes): - stats_pipe = multiprocessing.Pipe() - - if mode == "flashed" and len(input_fastq) == 1: - digestion_process = ReadDigestionProcess( - inq=inputq, - outq=writeq, - cutsite=cut_site, - min_slice_length=minimum_slice_length, - read_type=mode, - allow_undigested=False, # Prevents outputting undigested reads - stats_pipe=stats_pipe[0], - ) - elif mode == "pe" and len(input_fastq) == 2: - digestion_process = ReadDigestionProcess( - inq=inputq, - outq=writeq, - cutsite=cut_site, - min_slice_length=minimum_slice_length, - read_type=mode, - allow_undigested=True, - stats_pipe=stats_pipe[0], - ) - - else: - raise ValueError("Wrong number of files specified. Flashed == 1, PE == 2") + logging.info("Digesting FASTQ files") - digestion_processes.append((stats_pipe, digestion_process)) + if len(fastqs) > 1 and mode == "flashed": + raise ValueError("Flashed mode can only be used with a single FASTQ file") - # Writer process is common to both - writer = FastqWriterProcess( - inq=writeq, + stats = digest_fastq( + fastqs=fastqs, + restriction_enzyme=get_restriction_site(restriction_site), output=output_file, - compression_level=compression_level, - ) - - # Start all processes - reader.start() - writer.start() - - for (_, process) in digestion_processes: - process.start() - - reader.join() - - for _ in range(n_cores - 2): - inputq.put_nowait(None) - - stats = list() - for (stats_pipe, digestion_process) in digestion_processes: - stats.append(stats_pipe[1].recv()) - digestion_process.join() - - writeq.put_nowait(None) - - writer.join() - - # Collate statistics - df_hist = ( - pd.concat([df for df in stats if df is not None]) - .groupby(["read_type", "read_number", "unfiltered", "filtered"]) - .sum() - .reset_index() + read_type=mode.title(), + sample_name=sample_name, + minimum_slice_length=minimum_slice_length, ) - # Melt dataframe for manipulation - df_hist = ( - df_hist.reset_index() - .melt( - id_vars=["index", "read_type", "read_number"], - value_vars=["filtered", "unfiltered"], - var_name="filtered", - value_name="n_slices", + logging.info("Digestion complete") + df_stats_read = stats["stats_read_level"].to_pandas() + df_stats_read_fmt = pd.DataFrame( + { + "stat_type": ["unfiltered", "filtered"], + "stat": [ + df_stats_read["number_of_read_pairs_unfiltered"].sum(), + df_stats_read["number_of_read_pairs_filtered"].sum(), + ], + } + ).assign(stage="digestion", sample=sample_name, read_type=mode, read_number=1) + + for hist_type in ["unfilt", "filt"]: + df = stats[f"stats_hist_{hist_type}"].to_pandas() + df = df.rename(columns={"slice_number": "n_slices"}) + df = df.replace({r"^One$": 1, r"^Two$": 2}, regex=True) + df = df.assign( + filtered=hist_type == "filt", ) - .replace("filtered", True) - .replace("unfiltered", False) - .set_index("index") - .join(df_hist["count"]) - .groupby(["read_type", "read_number", "filtered", "n_slices"]) - .sum() - .reset_index() - .assign(sample=sample_name) - ) + df.to_csv(f"{stats_prefix}.digestion.{hist_type}.histogram.csv", index=False) - # Unfiltered histogram - df_hist_unfilt = (df_hist.query("filtered == False")).sort_values( - ["n_slices", "count"] - ) - - # Filtered histogram - df_hist_filt = (df_hist.query("filtered == True and n_slices > 0")).sort_values( - ["n_slices", "count"] - ) - - # breakpoint() - - # Read summary - reads that pass the filter - df_stats = ( - df_hist.query("(n_slices >= 1) or (filtered == False) or (read_type == 'pe')") - .query("read_number < 2") - .groupby(["sample", "read_type", "read_number", "filtered"])["count"] - .sum() - .reset_index() - .assign( - stage="digestion", - filtered=lambda df: df["filtered"] - .replace(True, "filtered") - .replace(False, "unfiltered"), - ) - .rename(columns={"filtered": "stat_type", "count": "stat"}) - ) - - df_hist_unfilt.to_csv( - f"{stats_prefix}.digestion.unfiltered.histogram.csv", index=False - ) - df_hist_filt.to_csv(f"{stats_prefix}.digestion.filtered.histogram.csv", index=False) - df_stats.to_csv(f"{stats_prefix}.digestion.read.summary.csv", index=False) + df_stats_read_fmt.to_csv(f"{stats_prefix}.digestion.read.summary.csv", index=False) - return df_stats + return stats diff --git a/capcruncher/cli/fastq_split.py b/capcruncher/cli/fastq_split.py index 0655050d..3291bd12 100644 --- a/capcruncher/cli/fastq_split.py +++ b/capcruncher/cli/fastq_split.py @@ -17,6 +17,9 @@ import re from joblib import Parallel, delayed from typing import Literal +import sys + +PLATFORM = sys.platform def run_unix_split( @@ -42,6 +45,10 @@ def run_unix_split( if ".gz" not in fn: cmd = cmd.replace("zcat", "cat") + if PLATFORM == "darwin": + cmd = cmd.replace("split", "gsplit") + cmd = cmd.replace("zcat", "gzcat") + if gzip: cmd = cmd.replace("FILTER", f"--filter='pigz -p {n_cores} > $FILE.gz'") else: diff --git a/capcruncher/cli/genome_digest.py b/capcruncher/cli/genome_digest.py index 185183f8..579284a4 100644 --- a/capcruncher/cli/genome_digest.py +++ b/capcruncher/cli/genome_digest.py @@ -9,7 +9,6 @@ """ import pysam import xopen -from capcruncher.api.digest import DigestedChrom, get_re_site from typing import Iterator import os from loguru import logger @@ -33,10 +32,9 @@ def parse_chromosomes(fasta: pysam.FastxFile) -> Iterator[pysam.FastqProxy]: def digest( input_fasta: os.PathLike, recognition_site: str, - logfile: os.PathLike = "genome_digest.log", output_file: os.PathLike = "genome_digest.bed", - remove_cutsite: bool = True, sort=False, + **kwargs, ): """ Performs in silico digestion of a genome in fasta format. @@ -51,54 +49,36 @@ def digest( Args: input_fasta (os.PathLike): Path to fasta file containing whole genome sequence, split by chromosome recognition_site (str): Restriction enzyme name/ Sequence of recognition site. - logfile (os.PathLike, optional): Output path of the digestion logfile. Defaults to genome_digest.log. output_file (os.PathLike, optional): Output path for digested chromosome bed file. Defaults to genome_digest.bed. - remove_cutsite (bool, optional): Determines if restriction site is removed. Defaults to True. """ - # TODO: Include option to keep or remove the cutsite. For now will just remove to keep inline with the fastq digestion script + from capcruncher_tools.api import digest_genome + from capcruncher.utils import get_restriction_site + import polars as pl - fragment_stats = dict() - fragment_number = 0 - cut_sequence = get_re_site(recognition_site=recognition_site) + logger.info("Digesting genome") + df_stats = digest_genome( + fasta=input_fasta, + output=output_file, + restriction_enzyme=get_restriction_site(recognition_site), + remove_recognition_site=True, + minimum_slice_length=18, + n_threads=1, + ) - with xopen.xopen(output_file, "w") as output: - - for chrom in parse_chromosomes(input_fasta): - - logger.info(f"Processing chrom {chrom.name}") - - digested_chrom = DigestedChrom( - chrom, - cut_sequence, - fragment_number_offset=fragment_number, - fragment_min_len=1, - ) - - for n_fragments, fragment in enumerate(digested_chrom.fragments): - if n_fragments % 10000 == 0: - logger.info(f"Written {n_fragments} fragments") - - output.write(fragment) - - fragment_stats[chrom.name] = n_fragments - fragment_number += n_fragments + 1 + logger.info("Digestion complete") if sort: logger.info("Sorting output") - df = pd.read_csv(output_file, sep="\t", names=["chrom", "start", "end", "name"]) + df = pl.read_csv( + output_file, separator="\t", new_columns=["chrom", "start", "end", "name"] + ) # If changing the order, also need to change the fragment number df = ( - df.sort_values(["chrom", "start"]) + df.sort(["chrom", "start"]) .drop(columns="name") - .reset_index(drop=True) - .reset_index() - .rename(columns={"index": "name"})[["chrom", "start", "end", "name"]] + .with_row_count("name")[["chrom", "start", "end", "name"]] ) - df.to_csv(output_file, sep="\t", index=False, header=False) - - with xopen.xopen(logfile, "w") as output: - for chrom, n_fragments in fragment_stats.items(): - output.write(f"{chrom}\t{n_fragments}\n") + df.write_csv(output_file, separator="\t", has_header=False) diff --git a/capcruncher/cli/interactions_count.py b/capcruncher/cli/interactions_count.py index 916241b0..936453cc 100644 --- a/capcruncher/cli/interactions_count.py +++ b/capcruncher/cli/interactions_count.py @@ -2,164 +2,52 @@ from loguru import logger import tempfile import glob -import more_itertools -import multiprocessing - -from capcruncher.api.io import ( - CCParquetReaderProcess, - FragmentCountingProcess, - CCCountsWriterProcess, -) -from capcruncher.api.storage import merge_coolers - - -def get_number_of_reader_threads(n_cores): - threads = n_cores // 2 - - if 1 < threads < 4: - threads = threads - elif threads < 1: - threads = 1 - - return threads +from typing import Literal def count( reporters: os.PathLike, - output: os.PathLike = "counts.tsv", - file_type: str = "auto", + output: os.PathLike = "CC_cooler.hdf5", remove_exclusions: bool = False, - remove_capture: bool = False, - subsample: int = 0, - low_memory: bool = False, - chunksize: int = 2e6, - output_as_cooler: bool = False, + remove_viewpoint: bool = False, + subsample: float = 0, fragment_map: os.PathLike = None, viewpoint_path: os.PathLike = None, n_cores: int = 1, -): + assay: Literal["capture", "tri", "tiled"] = "capture", + **kwargs, +) -> os.PathLike: """ - Determines the number of captured restriction fragment interactions genome wide. - - Parses a reporter slices tsv and counts the number of unique restriction fragment - interaction combinations that occur within each fragment. + Counts interactions between the viewpoint and the rest of the genome. - Options to ignore unwanted counts e.g. excluded regions or capture fragments are provided. - In addition the number of reporter fragments can be subsampled if required. - - \f Args: - reporters (os.PathLike): Reporter tsv file path. - output (os.PathLike, optional): Output file path for interaction counts tsv. Defaults to 'counts.tsv'. - remove_exclusions (bool, optional): Removes regions marked as capture proximity exclusions before counting. Defaults to False. - remove_capture (bool, optional): Removes all capture fragments before counting. Defaults to False. - subsample (int, optional): Subsamples the fragments by the specified fraction. Defaults to 0 i.e. No subsampling. - """ + reporters: Path to reporters file. + output: Output file name. + remove_exclusions: Remove excluded regions. + remove_viewpoint: Remove capture regions. + subsample: Subsample reads. + fragment_map: Path to fragment map. + viewpoint_path: Path to viewpoint file. + n_cores: Number of cores. + assay: Assay type. + **kwargs: Additional arguments. + Returns: + Path to the generated cooler file. - import pyarrow - import pandas as pd - - logger.info(f"Examining viewpoints from parquet file: {reporters}") - # Unsure of the best way to do this. Will just load the first partion vp column and extract - - df = pd.read_parquet(reporters, engine="pyarrow", columns=["viewpoint"]) - viewpoints = df["viewpoint"].cat.categories.to_list() - viewpoint_sizes = df["viewpoint"].value_counts() - - logger.info(f"Number of viewpoints: {len(viewpoints)}") - logger.info(f"Number of slices per viewpoint: {viewpoint_sizes.to_dict()}") - - MAX_SLICE_NUMBER = 5e6 - - if ( - viewpoint_sizes[viewpoint_sizes > MAX_SLICE_NUMBER].shape[0] == 0 - ): # Less than MAX_SLICE_NUMBER slices, read in batch - mode = "batch" - else: - # More than MAX_SLICE_NUMBER slices, read by partition - mode = "partition" - - # Multiprocessing queues - viewpoints_queue = multiprocessing.Queue() - slices_queue = multiprocessing.Queue() - counts_queue = multiprocessing.Queue() - - reader = CCParquetReaderProcess( - path=reporters, - inq=viewpoints_queue, - outq=slices_queue, - selection_mode=mode, + """ + from capcruncher_tools.api import count_interactions + + clr = count_interactions( + reporters=reporters, + output=output, + remove_exclusions=remove_exclusions, + remove_viewpoint=remove_viewpoint, + subsample=subsample, + fragment_map=fragment_map, + viewpoint_path=viewpoint_path, + n_cores=n_cores, + assay=assay, + **kwargs, ) - # Multiprocessing set-up - n_reading_threads = get_number_of_reader_threads(n_cores=n_cores) - pyarrow.set_cpu_count(n_reading_threads) - - n_worker_processes = (n_cores - n_reading_threads) // 2 - n_counting_processes = n_worker_processes if n_worker_processes > 1 else 1 - n_writing_processes = n_counting_processes - - logger.info(f"Starting {n_reading_threads} reader threads") - - counters = [ - FragmentCountingProcess( - inq=slices_queue, - outq=counts_queue, - remove_capture=remove_capture, - remove_exclusions=remove_exclusions, - subsample=subsample, - ) - for _ in range(n_counting_processes) - ] - - tmpdir = tempfile.TemporaryDirectory() - writers = [ - CCCountsWriterProcess( - inq=counts_queue, - output_format="cooler" if output_as_cooler else file_type, - restriction_fragment_map=fragment_map, - viewpoint_path=viewpoint_path, - tmpdir=tmpdir.name, - ) - for i in range(n_writing_processes) - ] - - # Start all processes - processes = [*writers, *counters, reader] - for process in processes: - process.start() - - for vp_chunk in more_itertools.chunked(viewpoints, 10): - viewpoints_queue.put(vp_chunk) - - logger.info("Finished loading viewpoints") - viewpoints_queue.put(None) # Sentinel value to signal end of viewpoints - reader.join() - - # End the counting inqueue - for _ in range(n_cores): - slices_queue.put((None, None)) - - # Join the counters - for counter in counters: - counter.join() - - # End the counting queue - for _ in range(n_counting_processes): - counts_queue.put((None, None)) - - logger.info("Finished counting") - - # Join the writers - for writer in writers: - writer.join() - - logger.info("Finished writing") - - # Merge the output files together - # TODO: Allow other than cooler outputs - logger.info(f"Making final cooler at {output}") - output_files = glob.glob(os.path.join(tmpdir.name, "*.hdf5")) - merge_coolers(output_files, output=output) - - tmpdir.cleanup() + return clr diff --git a/capcruncher/pipeline/workflow/Snakefile b/capcruncher/pipeline/workflow/Snakefile index 3396b65f..780665aa 100644 --- a/capcruncher/pipeline/workflow/Snakefile +++ b/capcruncher/pipeline/workflow/Snakefile @@ -82,14 +82,6 @@ PERFORM_BINNING = capcruncher.pipeline.utils.can_perform_binning(config) ASSAY = config["analysis"]["method"] SAMPLE_NAMES = FASTQ_SAMPLES.sample_names_all - -## Check if capcruncher_tools is installed -if importlib.util.find_spec("capcruncher_tools"): - print("CapCruncherTools is installed. Enabling CapCruncherTools functions.") - CAPCRUNCHER_TOOLS = True -else: - CAPCRUNCHER_TOOLS = False - # CLEANUP = "full" if config["analysis"].get("cleanup", False) else "partial" CLEANUP = False diff --git a/capcruncher/pipeline/workflow/rules/digest.smk b/capcruncher/pipeline/workflow/rules/digest.smk index 0ae13820..501791e5 100644 --- a/capcruncher/pipeline/workflow/rules/digest.smk +++ b/capcruncher/pipeline/workflow/rules/digest.smk @@ -3,7 +3,6 @@ rule digest_genome: fasta=config["genome"]["fasta"], output: bed="capcruncher_output/resources/restriction_fragments/genome.digest.bed.gz", - stats="capcruncher_output/interim/statistics/digest_genome/genome_digestion_statistics.txt", log: "capcruncher_output/resources/restriction_fragments/genome.digest.log", params: @@ -13,7 +12,7 @@ rule digest_genome: mem_mb=2000, shell: """ - capcruncher genome digest {input.fasta} -r {params.enzyme_or_site} -o {output.bed}.tmp --sort -l {output.stats} > {log} 2>&1 && + capcruncher genome digest {input.fasta} -r {params.enzyme_or_site} -o {output.bed}.tmp --sort > {log} 2>&1 && pigz -p {threads} {output.bed}.tmp -c > {output.bed} 2> {log} rm {output.bed}.tmp """ diff --git a/capcruncher/pipeline/workflow/rules/fastq.smk b/capcruncher/pipeline/workflow/rules/fastq.smk index 1af3e5d3..8a7ea46b 100644 --- a/capcruncher/pipeline/workflow/rules/fastq.smk +++ b/capcruncher/pipeline/workflow/rules/fastq.smk @@ -197,169 +197,49 @@ checkpoint split: """ -if not CAPCRUNCHER_TOOLS: - - rule deduplication_parse: - input: - fq1="capcruncher_output/interim/fastq/split/{sample}/{sample}_part{part}_1.fastq.gz", - fq2="capcruncher_output/interim/fastq/split/{sample}/{sample}_part{part}_2.fastq.gz", - output: - temp( - "capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_{part}.pkl" - ), - resources: - mem_mb=2000, - log: - "capcruncher_output/logs/deduplication_fastq/parse/{sample}_part{part}.log", - shell: - """ - capcruncher \ - fastq \ - deduplicate \ - parse \ - {input.fq1} \ - {input.fq2} \ - -o \ - {output} \ - > {log} 2>&1 - """ - - rule deduplication_identify: - input: - hashed_reads=get_pickles, - output: - temp("capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}.pkl"), - log: - "capcruncher_output/logs/deduplication_fastq/identify/{sample}.log", - resources: - mem_mb=5000, - threads: 3 - shell: - """ - capcruncher \ - fastq \ - deduplicate \ - identify \ - {input.hashed_reads} \ - -o \ - {output} - > {log} 2>&1 - """ - - rule deduplication_remove: - input: - fq1="capcruncher_output/interim/fastq/split/{sample}/{sample}_part{part}_1.fastq.gz", - fq2="capcruncher_output/interim/fastq/split/{sample}/{sample}_part{part}_2.fastq.gz", - ids_duplicated="capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}.pkl", - output: - fq1=temp( - "capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_part{part}_1.fastq.gz" - ), - fq2=temp( - "capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_part{part}_2.fastq.gz" - ), - stats=temp( - "capcruncher_output/interim/statistics/deduplication/data/{sample}_part{part}.deduplication.csv" - ), - params: - prefix_fastq="capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_part{part}", - prefix_stats="capcruncher_output/interim/statistics/deduplication/data/{sample}_part{part}", - resources: - mem_mb=2000, - log: - "capcruncher_output/logs/deduplication_fastq/remove/{sample}_part{part}.log", - threads: 4 - shell: - """ - capcruncher \ - fastq \ - deduplicate \ - remove \ - {input.fq1} \ - {input.fq2} \ - -d \ - {input.ids_duplicated} \ - --sample-name \ - {wildcards.sample} \ - --stats-prefix \ - {params.prefix_stats} \ - -o \ - {params.prefix_fastq} \ - -p \ - {threads} \ - --hash-read-name \ - --gzip \ - > {log} 2>&1 - """ - - rule trim: - input: - fq1="capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_part{part}_1.fastq.gz", - fq2="capcruncher_output/interim/fastq/deduplicated/{sample}/{sample}_part{part}_2.fastq.gz", - output: - trimmed1="capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_1.fastq.gz", - trimmed2="capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_2.fastq.gz", - params: - outdir="capcruncher_output/interim/fastq/trimmed/{sample}/", - threads: 4 - resources: - mem_mb=2000, - log: - "capcruncher_output/logs/trimming/{sample}_{part}.log", - shell: - """ - trim_galore --cores {threads} --trim-n --paired --output_dir {params.outdir} {input.fq1} {input.fq2} >> {log} 2>&1 && - mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_1_val_1.fq.gz {output.trimmed1} && - mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_2_val_2.fq.gz {output.trimmed2} - """ - -else: - - checkpoint deduplication: - input: - unpack(get_fastq_split_1), - output: - fastq_dir=directory( - "capcruncher_output/interim/fastq/deduplicated/{sample}/" - ), - stats="capcruncher_output/interim/statistics/deduplication/data/{sample}.deduplication.csv", - params: - prefix_fastq="capcruncher_output/interim/fastq/deduplicated/{sample}/", - prefix_stats="capcruncher_output/interim/statistics/deduplication/data/{sample}/", - log: - "capcruncher_output/logs/deduplication_fastq/{sample}.log", - threads: workflow.cores * 0.5 - resources: - mem_mb=lambda wildcards, attempt: 2000 * 2**attempt, - shell: - """ - mkdir -p {params.prefix_stats} && - capcruncher-tools fastq-deduplicate -1 {input.fq1} -2 {input.fq2} -o {params.prefix_fastq} --statistics {output.stats} --sample-name {wildcards.sample} > {log} 2>&1 - """ - - rule trim: - input: - unpack(get_deduplicated_fastq_pair), - output: - trimmed1=temp( - "capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_1.fastq.gz" - ), - trimmed2=temp( - "capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_2.fastq.gz" - ), - params: - outdir="capcruncher_output/interim/fastq/trimmed/{sample}/", - threads: 4 - resources: - mem_mb=2000, - log: - "capcruncher_output/logs/trimming/{sample}_{part}.log", - shell: - """ - trim_galore --cores {threads} --trim-n --paired --output_dir {params.outdir} {input.fq1} {input.fq2} >> {log} 2>&1 && - mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_1_val_1.fq.gz {output.trimmed1} && - mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_2_val_2.fq.gz {output.trimmed2} - """ +checkpoint deduplication: + input: + unpack(get_fastq_split_1), + output: + fastq_dir=directory("capcruncher_output/interim/fastq/deduplicated/{sample}/"), + stats="capcruncher_output/interim/statistics/deduplication/data/{sample}.deduplication.csv", + params: + prefix_fastq="capcruncher_output/interim/fastq/deduplicated/{sample}/", + log: + "capcruncher_output/logs/deduplication_fastq/{sample}.log", + threads: workflow.cores * 0.5 + resources: + mem_mb=lambda wildcards, attempt: 2000 * 2**attempt, + shell: + """ + mkdir -p {params.prefix_fastq} && + capcruncher fastq deduplicate -1 {input.fq1} -2 {input.fq2} -o {params.prefix_fastq} --statistics {output.stats} --sample-name {wildcards.sample} > {log} 2>&1 + """ + + +rule trim: + input: + unpack(get_deduplicated_fastq_pair), + output: + trimmed1=temp( + "capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_1.fastq.gz" + ), + trimmed2=temp( + "capcruncher_output/interim/fastq/trimmed/{sample}/{sample}_part{part}_2.fastq.gz" + ), + params: + outdir="capcruncher_output/interim/fastq/trimmed/{sample}/", + threads: 4 + resources: + mem_mb=2000, + log: + "capcruncher_output/logs/trimming/{sample}_{part}.log", + shell: + """ + trim_galore --cores {threads} --trim-n --paired --output_dir {params.outdir} {input.fq1} {input.fq2} >> {log} 2>&1 && + mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_1_val_1.fq.gz {output.trimmed1} && + mv {params.outdir}/{wildcards.sample}_part{wildcards.part}_2_val_2.fq.gz {output.trimmed2} + """ rule flash: @@ -476,14 +356,13 @@ checkpoint rebalance_partitions_pe: rule digest_flashed_combined: input: flashed="capcruncher_output/interim/fastq/rebalanced/{sample}/flashed/{sample}_part{part}_flashed_1.fastq.gz", - sentinel="capcruncher_output/interim/fastq/rebalanced/{sample}/flashed/.complete.sentinel", output: digested=temp( "capcruncher_output/interim/fastq/digested/{sample}/{sample}_part{part}_flashed.fastq.gz" ), stats_read="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed.digestion.read.summary.csv", - stats_unfiltered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed.digestion.unfiltered.histogram.csv", - stats_filtered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed.digestion.filtered.histogram.csv", + stats_unfiltered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed.digestion.unfilt.histogram.csv", + stats_filtered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed.digestion.filt.histogram.csv", params: prefix_stats="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_flashed", restriction_site=config["analysis"]["restriction_enzyme"], @@ -500,8 +379,6 @@ rule digest_flashed_combined: {input.flashed} \ -o \ {output.digested} \ - -p \ - {threads} \ -m \ flashed \ -r \ @@ -520,14 +397,13 @@ rule digest_flashed_pe: input: pe1="capcruncher_output/interim/fastq/rebalanced/{sample}/pe/{sample}_part{part}_pe_1.fastq.gz", pe2="capcruncher_output/interim/fastq/rebalanced/{sample}/pe/{sample}_part{part}_pe_2.fastq.gz", - sentinel="capcruncher_output/interim/fastq/rebalanced/{sample}/pe/.complete.sentinel", output: digested=temp( "capcruncher_output/interim/fastq/digested/{sample}/{sample}_part{part}_pe.fastq.gz" ), stats_read="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe.digestion.read.summary.csv", - stats_unfiltered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe.digestion.unfiltered.histogram.csv", - stats_filtered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe.digestion.filtered.histogram.csv", + stats_unfiltered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe.digestion.unfilt.histogram.csv", + stats_filtered="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe.digestion.filt.histogram.csv", params: prefix_stats="capcruncher_output/interim/statistics/digestion/data/{sample}_part{part}_pe", restriction_site=config["analysis"]["restriction_enzyme"], @@ -545,8 +421,6 @@ rule digest_flashed_pe: {input.pe2} \ -o \ {output.digested} \ - -p \ - {threads} \ -m \ pe \ -r \ diff --git a/capcruncher/pipeline/workflow/rules/pileup.smk b/capcruncher/pipeline/workflow/rules/pileup.smk index 6a05d9aa..f99a2109 100644 --- a/capcruncher/pipeline/workflow/rules/pileup.smk +++ b/capcruncher/pipeline/workflow/rules/pileup.smk @@ -9,73 +9,37 @@ def get_outdir(wildcards, output): return str(pathlib.Path(output[0]).parent) -if CAPCRUNCHER_TOOLS: - - rule count: - input: - slices=rules.combine_flashed_and_pe_post_deduplication.output.slices, - restriction_fragment_map=rules.digest_genome.output.bed, - viewpoints=VIEWPOINTS, - output: - temp( - "capcruncher_output/interim/pileups/counts_by_restriction_fragment/{sample}.hdf5" - ), - log: - "capcruncher_output/logs/counts/{sample}.log", - threads: 8 - resources: - mem_mb=get_mem_mb, - params: - outdir=get_outdir, - assay=config["analysis"]["method"], - shell: - """ - mkdir -p {params.outdir} && \ - capcruncher-tools \ - count \ - {input.slices} \ - -o {output} \ - -f {input.restriction_fragment_map} \ - -v {input.viewpoints} \ - -p {threads} \ - --assay {params.assay} - > {log} 2>&1 - """ - -else: - - rule count: - input: - slices=rules.combine_flashed_and_pe_post_deduplication.output.slices, - restriction_fragment_map=rules.digest_genome.output.bed, - viewpoints=VIEWPOINTS, - output: - temp( - "capcruncher_output/interim/pileups/counts_by_restriction_fragment/{sample}.hdf5" - ), - log: - "capcruncher_output/logs/counts/{sample}.log", - threads: 8 - resources: - mem_mb=lambda wc, attempt: 3000 * 2**attempt, - params: - outdir=get_outdir, - assay=config["analysis"]["method"], - shell: - """ - mkdir -p {params.outdir} && \ - capcruncher \ - interactions \ - count \ - {input.slices} \ - -o {output} \ - -f {input.restriction_fragment_map} \ - -v {input.viewpoints} \ - --cooler-output \ - -p {threads} \ - --assay {params.assay} - > {log} 2>&1 - """ +rule count: + input: + slices=rules.combine_flashed_and_pe_post_deduplication.output.slices, + restriction_fragment_map=rules.digest_genome.output.bed, + viewpoints=VIEWPOINTS, + output: + temp( + "capcruncher_output/interim/pileups/counts_by_restriction_fragment/{sample}.hdf5" + ), + log: + "capcruncher_output/logs/counts/{sample}.log", + threads: 8 + resources: + mem_mb=get_mem_mb, + params: + outdir=get_outdir, + assay=config["analysis"]["method"], + shell: + """ + mkdir -p {params.outdir} && \ + capcruncher \ + interactions \ + count \ + {input.slices} \ + -o {output} \ + -f {input.restriction_fragment_map} \ + -v {input.viewpoints} \ + -p {threads} \ + --assay {params.assay} + > {log} 2>&1 + """ rule bin_counts: diff --git a/capcruncher/pipeline/workflow/rules/statistics.smk b/capcruncher/pipeline/workflow/rules/statistics.smk index d750fc5a..da59ed4c 100644 --- a/capcruncher/pipeline/workflow/rules/statistics.smk +++ b/capcruncher/pipeline/workflow/rules/statistics.smk @@ -6,8 +6,8 @@ from typing import List def get_digestion_statistics(wc, sample_names: List[str]): stat_types = { "read_level_stats": "digestion.read.summary.csv", - "histogram_unfiltered": "digestion.unfiltered.histogram.csv", - "histogram_filtered": "digestion.filtered.histogram.csv", + "histogram_unfiltered": "digestion.unfilt.histogram.csv", + "histogram_filtered": "digestion.filt.histogram.csv", } stat_prefixes = [] @@ -58,28 +58,16 @@ def get_stat_parts(wc, sample_names: List[str]): return files -if not CAPCRUNCHER_TOOLS: - - rule combine_stats_fastq_deduplication: - input: - fastq_deduplication=get_stat_parts, - output: - "capcruncher_output/interim/statistics/deduplication/fastq_deduplication.csv", - script: - "../scripts/combine_deduplication_stats.py" - -else: - - rule combine_stats_fastq_deduplication: - input: - fastq_deduplication=expand( - "capcruncher_output/interim/statistics/deduplication/data/{sample}.deduplication.csv", - sample=SAMPLE_NAMES, - ), - output: - "capcruncher_output/interim/statistics/deduplication/fastq_deduplication.csv", - script: - "../scripts/combine_deduplication_stats.py" +rule combine_stats_fastq_deduplication: + input: + fastq_deduplication=expand( + "capcruncher_output/interim/statistics/deduplication/data/{sample}.deduplication.csv", + sample=SAMPLE_NAMES, + ), + output: + "capcruncher_output/interim/statistics/deduplication/fastq_deduplication.csv", + script: + "../scripts/combine_deduplication_stats.py" rule combine_stats_digestion: diff --git a/capcruncher/utils.py b/capcruncher/utils.py index dae0fb4f..798571d9 100644 --- a/capcruncher/utils.py +++ b/capcruncher/utils.py @@ -33,11 +33,6 @@ def read_dataframes(filenames: Iterable, **kwargs): ) -def invert_dict(d: dict) -> Generator[Tuple[str, str], None, None]: - """Inverts key: value pairs into value: key pairs""" - yield from ((v, k) for k, v in d.items()) - - def is_on(param: str) -> bool: """ Returns True if parameter in "on" values @@ -101,7 +96,7 @@ def is_valid_bed(bed: Union[str, BedTool], verbose=True) -> bool: elif isinstance(e, IndexError): logger.warning( - "Wrong number of fields detected check separator/ number of columns" + "Wrong number of fields detected check separator or number of columns" ) else: @@ -567,25 +562,33 @@ def get_cooler_uri(store: os.PathLike, viewpoint: str, resolution: Union[str, in return uri -class MockFastqRecord: - """Testing class used to supply a pysam FastqProxy like object""" +def get_restriction_site(restriction_enzyme: str): + """ + Gets the restriction site for a given restriction enzyme. + + Can be either the name of the restriction enzyme or the restriction site itself. + The restriction site will just be returned if it is a valid DNA sequence. + + Args: + restriction_enzyme: Name of restriction enzyme or restriction site. - def __init__(self, name, sequence, quality): - self.name = name - self.sequence = sequence - self.quality = quality - self.comment = "" + Returns: + Restriction site. - def __repr__(self) -> str: - return "|".join([self.name, self.sequence, "+", self.quality]) + Raises: + ValueError: If restriction enzyme is not found. + """ -class MockFastaRecord: - """Testing class used to supply a pysam FastqProxy like object""" + if re.match(r"^[ACGTacgt]+$", restriction_enzyme): + return restriction_enzyme - def __init__(self, name, sequence): - self.name = name - self.sequence = sequence + import Bio.Restriction - def __repr__(self) -> str: - return f">{self.name}\n{self.sequence}\n" + all_enzymes = {e.lower(): e for e in Bio.Restriction.AllEnzymes.as_string()} + if restriction_enzyme.lower() not in all_enzymes: + raise ValueError(f"Restriction enzyme {restriction_enzyme} not found.") + else: + return Bio.Restriction.AllEnzymes.get( + all_enzymes[restriction_enzyme.lower()] + ).site diff --git a/codecov.yml b/codecov.yml index 47f428c7..b8999b06 100644 --- a/codecov.yml +++ b/codecov.yml @@ -24,4 +24,4 @@ ignore: - "capcruncher/utils.py" - "setup.py" - "capcruncher/cli/__init__.py" - - "capcruncher/tools/statistics.py" + - "conftest.py" diff --git a/conftest.py b/conftest.py index a6411e63..0934afa6 100644 --- a/conftest.py +++ b/conftest.py @@ -8,3 +8,27 @@ def pytest_addoption(parser): @pytest.fixture(scope='session', autouse=True) def cores(request): return request.config.getoption("--cores") + + +class MockFastqRecord: + """Testing class used to supply a pysam FastqProxy like object""" + + def __init__(self, name, sequence, quality): + self.name = name + self.sequence = sequence + self.quality = quality + self.comment = "" + + def __repr__(self) -> str: + return "|".join([self.name, self.sequence, "+", self.quality]) + + +class MockFastaRecord: + """Testing class used to supply a pysam FastqProxy like object""" + + def __init__(self, name, sequence): + self.name = name + self.sequence = sequence + + def __repr__(self) -> str: + return f">{self.name}\n{self.sequence}\n" diff --git a/docs/installation.md b/docs/installation.md index 5a8ab4d6..124624d8 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -37,9 +37,13 @@ conda activate cc # Install CapCruncher using pip pip install capcruncher - +s # Optional - highly recommended to install the optional dependencies -pip install capcruncher[stats,plotting,experimental] +# Installs dependencies for: +# * plotting, +# * differential interaction analysis +# * speeding up the pipeline using experimental features (capcruncher-tools) +pip install capcruncher[full] ``` ### Install CapCruncher in a minimal conda environment and use singularity to run the pipeline diff --git a/environment.yml b/environment.yml index 9771d8b7..5ad48102 100644 --- a/environment.yml +++ b/environment.yml @@ -8,27 +8,27 @@ dependencies: - bowtie2>=2.4.4 - click<=8.1.0 - cooler + - cookiecutter - fastqc - flash - ibis-framework>=6.0.0 - iced - joblib - - more-itertools + - loguru - multiqc - - natsort - numpy<=1.23 + - pandas<=2.0.1 - papermill - - pandas<=1.5.2 - pip<=22.2.2 - - plotly>5.0.0,<=5.10.0 - - pyarrow>8.0.0,<=9.0.0 + - plotly>5.0.0,<=5.15.0 + - pyarrow>8.0.0,<12.0.0 - pybedtools - pyranges - pysam>0.15.0,<=0.19.1 - python-duckdb - python-xxhash - python>=3.10,<3.11 - - ray-core>=2.6.0 + - quarto - samtools - seaborn - setuptools>50.1.0,<=65.0.1 @@ -40,4 +40,3 @@ dependencies: - ucsc-bedtobigbed - ucsc-fetchchromsizes - ujson - - quarto diff --git a/mkdocs.yml b/mkdocs.yml index 3eb5780c..4aba7151 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -3,6 +3,7 @@ nav: - Home: index.md - Installation: installation.md - Pipeline: pipeline.md + - Cluster Setup: cluster_config.md - Hints and Tips: tips.md - Plotting: plotting.ipynb - CLI Reference: cli.md diff --git a/pyproject.toml b/pyproject.toml index 13cc4872..e597b2b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ dynamic = ["version", "dependencies"] [tool.setuptools.packages.find] include = ["capcruncher", "capcruncher.*"] +[tool.setuptools.dynamic] +dependencies = {file = ["requirements.txt"]} [tool.setuptools_scm] local_scheme = "no-local-version" @@ -27,8 +29,11 @@ write_to = "capcruncher/_version.py" [project.optional-dependencies] stats = ["pydeseq2"] -plotting = ["coolbox"] -experimental = ["capcruncher-tools >= 0.1.8"] +visualisation = ["coolbox"] +full = ["pydeseq2", "coolbox"] + +[project.scripts] +capcruncher = "capcruncher.cli:cli" [project.urls] repo = "https://github.com/sims-lab/CapCruncher.git" diff --git a/requirements.txt b/requirements.txt index cdc399e7..edaa5225 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,23 +1,22 @@ +capcruncher-tools>=0.1.9 click<=8.2.0 +cookiecutter cooler duckdb h5py ibis-framework[duckdb] loguru more-itertools -natsort numpy pandas<=2.0.1 plotly>5.0.0,<=5.10.0 -pyarrow>8.0.0,<=9.0.0 +pyarrow>8.0.0,<12.0.0 pybedtools pyranges pysam>0.15.0,<=0.19.1 -pyyaml>=6.0 quarto ray seaborn -sh snakemake<=7.30.2 tqdm trackhub diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index eeb0c8c6..00000000 --- a/setup.cfg +++ /dev/null @@ -1,45 +0,0 @@ -[metadata] -name = capcruncher -# version = 0.3.0 -author = asmith -author_email = alastair.smith@ndcls.ox.ac.uk -description = An end-to-end solution for processing Capture-C, Tri-C and Tiled-C data -long_description = file: README.md -long_description_content_type = text/markdown -license = GNU GENERAL PUBLIC LICENSE Version 3 -license_file = LICENSE -url = https://github.com/sims-lab/CapCruncher.git -classifiers = - Environment :: Console - Intended Audience :: Science/Research - Operating System :: POSIX :: Linux - License :: OSI Approved :: GNU General Public License v3 (GPLv3) - Topic :: Scientific/Engineering :: Bio-Informatics - Programming Language :: Python :: 3 :: Only - Programming Language :: Python :: Implementation :: CPython - - -[options] -zip_safe = False -include_package_data = True -packages = find_namespace: -install_requires= file: - requirements.txt - -python_requires = >= 3.8 - -[options.entry_points] -console_scripts = capcruncher = capcruncher.cli:cli - -[options.extras_require] -stats = pydeseq2 -plotting = coolbox -experimental = capcruncher-tools >= 0.1.8 - -[tool:pytest] -minversion = 6.0 -addopts = -ra -q -testpaths = tests/ - -[options.package_data] -* = *.yml diff --git a/sm b/sm deleted file mode 100644 index 9c351f83..00000000 --- a/sm +++ /dev/null @@ -1,91 +0,0 @@ - -checkpoint check_viewpoints_exist: - input: - cooler="capcruncher_output/results/{sample}/{sample}.hdf5", - output: - "capcruncher_output/resources/viewpoints/{sample}.json", - log: - "capcruncher_output/logs/check_viewpoints_exist/{sample}.log", - script: - "../scripts/identify_viewpoints_with_interactions.py" - - - -def check_viewpoints_exist(wildcards): - output = checkpoints.check_viewpoints_exist.get(sample=wildcards.sample).output[0] - samples = glob_wildcards(pathlib.Path(output).parent / "{sample}.json").sample - files = [pathlib.Path(output).parent / f"{sample}.json" for sample in samples] - return files - -rule viewpoint_checks: - input: - check_viewpoints_exist, - output: - touch("capcruncher_output/resources/viewpoints/viewpoints_present.txt"), - - - - - -def get_existing_viewpoints(sample): - identified_viewpoints = checkpoints.check_viewpoints_exist.get( - sample=sample - ).output[0] - - with open(identified_viewpoints) as f: - viewpoints = json.load(f) - - return [vp for vp, count in viewpoints.items() if count > 0] - - -def define_viewpoint_bigwigs(): - bigwigs = [] - for sample in SAMPLE_NAMES: - viewpoints = get_existing_viewpoints(sample) - bw = expand( - "capcruncher_output/results/{sample}/bigwigs/{norm}/{sample}_{viewpoint}.bigWig", - sample=[sample], - norm=["raw", "norm"], - viewpoint=viewpoints, - ) - bigwigs.extend(bw) - - return bigwigs - - -def define_comparisons(): - # Define which viewpoints are present in the samples - viewpoints_per_sample = {} - for sample in SAMPLE_NAMES: - viewpoints_per_sample[sample] = get_existing_viewpoints(sample) - - # Define which viewpoints are present in all samples - viewpoints_all_samples = set.intersection( - *[set(v) for v in viewpoints_per_sample.values()] - ) - - # Define comparisons - comparisons = [] - if COMPARE_SAMPLES: - bigwigs_summary = ( - expand( - "capcruncher_output/results/comparisons/bigwigs/{group}.{method}-summary.{viewpoint}.bigWig", - group=DESIGN["condition"].unique(), - viewpoint=viewpoints_all_samples, - method=SUMMARY_METHODS, - ), - ) - bigwigs_compare = ( - expand( - "capcruncher_output/results/comparisons/bigwigs/{comparison}.{method}-subtraction.{viewpoint}.bigWig", - comparison=[ - f"{a}-{b}" - for a, b in itertools.permutations(DESIGN["condition"].unique(), 2) - ], - method=SUMMARY_METHODS, - viewpoint=viewpoints_all_samples, - ), - ) - comparisons = bigwigs_summary + bigwigs_compare - - return comparisons diff --git a/tests/data/fastq_digestion/chrom_to_digest.fa b/tests/data/fastq_digestion/chrom_to_digest.fa deleted file mode 100644 index a6edbf52..00000000 --- a/tests/data/fastq_digestion/chrom_to_digest.fa +++ /dev/null @@ -1,2 +0,0 @@ ->chrUNKNOWN -AAANNNGGGTTTGATCGGGGGGGGGGGGG diff --git a/tests/data/fastq_digestion/chrom_to_digest.fa.gz b/tests/data/fastq_digestion/chrom_to_digest.fa.gz new file mode 100644 index 0000000000000000000000000000000000000000..5590d6fe5969bbac47cd1b55a0dd50872f7bd66a GIT binary patch literal 79 zcmV-V0I>fbiwFoS(*vrEn>3ib2$^AGpqa&&a` l^Ye3ecMk~(ad!-Hc1H$Wb_kV*Af*%nE&z$