Skip to content

Commit

Permalink
fix(annotation): Corrects only null values for "get" intersections (#222
Browse files Browse the repository at this point in the history
)

* fix: corrected issue where annotate only reported NA

* fix(annotate): polars issue with categorical nums

* fix(annotate): typo

* fix(annotation): correct dtype specification
  • Loading branch information
alsmith151 authored Nov 10, 2023
1 parent e18c6fa commit 6bd4489
Showing 1 changed file with 108 additions and 41 deletions.
149 changes: 108 additions & 41 deletions capcruncher/api/annotate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pybedtools
import pyranges as pr
import numpy as np
from pandas.api.types import is_categorical_dtype, is_numeric_dtype

from capcruncher.utils import convert_bed_to_pr

Expand Down Expand Up @@ -104,32 +105,56 @@ def intersection(self) -> pr.PyRanges:


class IntersectionGet(Intersection):
def get_new_dtype(self):
# Determine the dtype of the name column
if is_numeric_dtype(self.b.df["Name"]):
dtype_new = self.b.df["Name"].dtype

elif is_categorical_dtype(self.b.df["Name"]):
if is_numeric_dtype(self.b.df["Name"].cat.categories):
dtype_new = self.b.df["Name"].cat.categories.dtype
numeric_dtype_mapping = {
"int64": "Int64",
"float64": "Float64",
"float32": "Float32",
"int32": "Int32",
}
dtype_new = numeric_dtype_mapping.get(str(dtype_new), "Int64")

else:
dtype_new = self.b.df["Name"].dtype

else:
dtype_new = pd.CategoricalDtype([*self.b.df["Name"].unique().astype(str)])

return dtype_new

@property
def intersection(self) -> pr.PyRanges:

dtype = pd.CategoricalDtype([*self.b.df["Name"].unique().astype(str)])

dtype_new = self.get_new_dtype()

# Hack to get around the fact that pyranges has a bug when joining categorical columns
# See https://github.com/pyranges/pyranges/issues/230

name_col = self.a.df["Name"]

gr_a = pr.PyRanges(self.a.df.drop(columns="Name").assign(Name=lambda df: df.reset_index().index))

df_overlapping = gr_a.join(self.b, nb_cpu=self.n_cores, report_overlap=True).df


df_overlapping = self.a.join(
self.b, nb_cpu=self.n_cores, report_overlap=True
).df

if not df_overlapping.empty:
df_non_overlapping = gr_a.df.loc[lambda df: df.Name.isin(df_overlapping.Name.unique()) == False]
df_non_overlapping = self.a.df.loc[
lambda df: ~df.Name.isin(df_overlapping.Name)
]
else:
raise ValueError("No overlapping regions found")



df_both = pd.concat([df_overlapping, df_non_overlapping]).sort_values("Name")

df_both['Name'] = df_both['Name'].map(name_col)
df_both['frac'] = df_both.eval("Overlap / (End - Start)")
df_both[self.name] = np.where(df_both['frac'] >= self.fraction, df_both['Name_b'], pd.NA)
df_both[self.name] = df_both[self.name].astype(str).astype(dtype)

# Filter out the non-overlapping regions
df_both["frac"] = df_both.eval("Overlap / (End - Start)")
df_both[self.name] = np.where(
df_both["frac"] >= self.fraction, df_both["Name_b"], pd.NA
)
df_both[self.name] = df_both[self.name].astype(dtype_new)

df_both.drop(
columns=[
Expand All @@ -142,7 +167,7 @@ def intersection(self) -> pr.PyRanges:
"Score_b",
],
errors="ignore",
inplace=True
inplace=True,
)

return df_both.pipe(pr.PyRanges)
Expand Down Expand Up @@ -189,7 +214,6 @@ def __init__(
fraction: float = 0,
max_cores: int = 1,
):

self.annotation_columns = None

if isinstance(bed_a, pr.PyRanges):
Expand All @@ -198,49 +222,92 @@ def __init__(
self.a = convert_bed_to_pr(bed_a)
self.a = self.process_bed(self.a)
else:
raise ValueError(f"bed_a must be of type str, pybedtools.BedTool, or pr.PyRanges. Got {type(bed_a)}")

raise ValueError(
f"bed_a must be of type str, pybedtools.BedTool, or pr.PyRanges. Got {type(bed_a)}"
)

self.b = bed_b if isinstance(bed_b, pr.PyRanges) else convert_bed_to_pr(bed_b)
self.name = name
self.fraction = fraction
self.n_cores = max_cores if self.b.df.shape[0] > 50_000 else 1

def get_intersection(self, method: Literal["get", "count"] = "get") -> pr.PyRanges:

try:

if self.b.empty:
_intersection = IntersectionFailed(
_intersection = IntersectionFailed(
self.a, self.b, self.name, self.fraction, self.n_cores
).intersection
elif method == "get":
_intersection = IntersectionGet(
_intersection = IntersectionGet(
self.a, self.b, self.name, self.fraction, self.n_cores
).intersection
elif method == "count":
_intersection = IntersectionCount(
self.a, self.b, self.name, self.fraction, self.n_cores
).intersection
else:
_intersection = IntersectionFailed(
_intersection = IntersectionFailed(
self.a, self.b, self.name, self.fraction, self.n_cores
).intersection

except (OSError, IndexError, FileNotFoundError, StopIteration, AssertionError, ValueError):
_intersection = IntersectionFailed(

except (
OSError,
IndexError,
FileNotFoundError,
StopIteration,
AssertionError,
ValueError,
):
_intersection = IntersectionFailed(
self.a, self.b, self.name, self.fraction, self.n_cores
).intersection

if self.annotation_columns is not None:
_intersection = _intersection.df.join(self.annotation_columns, how="left").pipe(pr.PyRanges)

return _intersection

def process_bed(self, bed):
annotation_col_names = [col for col in bed.df.columns if col not in ["Chromosome", "Start", "End", "Strand", "Score", "Name"]]
if annotation_col_names:
self.annotation_columns = bed.df.loc[:, annotation_col_names]
return bed.df.loc[:, ["Chromosome", "Start", "End", "Name"]].pipe(pr.PyRanges)

# If there are annotation columns, join them to the intersection
if not self.annotation.empty:
_intersection = (
_intersection.df.set_index("Name")
.join(self.annotation, how="left")
.reset_index()
.pipe(pr.PyRanges)
)

# Put the original name back
_intersection = _intersection.df.assign(
Name=lambda df: df.Name.map(self.original_name_mapping)
)

return pr.PyRanges(df=_intersection)

def process_bed(self, bed: pr.PyRanges):
# Convert to dataframe
bed = bed.df

# Create a unique identifier for each slice
self.uid = pd.util.hash_pandas_object(
bed.loc[:, ["Chromosome", "Start", "End", "Name"]]
)
self.original_name_mapping = dict(zip(self.uid, bed.Name))

# Add the unique identifier to the bed
bed = bed.assign(Name=self.uid)

# Identify colunms that have annotation information
self.annotation_col_names = [
col
for col in bed.columns
if col not in ["Chromosome", "Start", "End", "Strand", "Score", "Name"]
]

# If there are annotation columns, store them in a separate dataframe
if self.annotation_col_names:
self.annotation = bed.set_index("Name").loc[:, self.annotation_col_names]
else:
self.annotation = pd.DataFrame()

# Re-generate the pyranges object with the unique identifier
bed = bed.loc[:, ["Chromosome", "Start", "End", "Name"]]

return bed.pipe(pr.PyRanges)


# @ray.remote
Expand Down

0 comments on commit 6bd4489

Please sign in to comment.