Skip to content

Commit

Permalink
working on phenomes before we start merging genomes and phenomes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffersonfparil committed Nov 14, 2024
1 parent 758efda commit ec2a238
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 53 deletions.
184 changes: 144 additions & 40 deletions data/genotype.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,33 @@ def __init__(
if (p != chromosomes.shape[0]) or (p != positions.shape[0]) or (p != alleles.shape[0]):
raise IncompatibleParameters
loci: np.ndarray = np.full((pk1,), fill_value='', dtype='<U256', order='C')
n_digits: int = len(str(np.max(positions)))
for i in range(p):
chr: str = chromosomes[i]
pos: str = str(positions[i])
pos = ''.join(['0' for i in range(n_digits - len(pos))]) + pos
all_alleles: str = '|'.join(alleles[i, :])
for j in range(k - 1):
ale: str = alleles[i, j]
loci[(i * (k - 1)) + j] = '\t'.join([chr, pos, all_alleles, ale])
self.entries = entries
self.loci = loci
self.genotypes = genotypes
# Sorting indices
idx_rows: np.ndarray = np.argsort(entries)
idx_cols: np.ndarray = np.argsort(loci)
# Insert the sorted data
self.entries = entries[idx_rows]
self.loci = loci[idx_cols]
self.genotypes = genotypes[np.ix_(idx_rows, idx_cols)]
self.mask = np.ones((n, pk1)).astype(np.bool)
else:
self.entries = np.array([''])
self.loci = np.array([''])
self.genotypes = np.array([[0.0]])
self.genotypes = np.array([[np.nan]])
self.mask = np.array([[False]])
return None

def __str__(self: Self) -> str:
"""
Print the contents of the class
Preview the genotype data
"""
info: str = (
'{\n\tentries: '
Expand All @@ -95,7 +101,7 @@ def __str__(self: Self) -> str:

def __hash__(self: Self) -> int:
"""
Hash all the fields in the class
Hash genotype data
"""
hash_int: int = hash(str(self.entries))
hash_int += hash(str(self.loci))
Expand All @@ -109,64 +115,162 @@ def __eq__(self: Self, other) -> bool:
"""
return hash(self) == hash(other)

def check_dimensions(self: Self) -> bool:
"""
Check the compatibilities of the dimensions of the genotype data
"""
n: int = len(self.entries)
if n != self.genotypes.shape[0]:
return False
pk1: int = len(self.loci)
if pk1 != self.genotypes.shape[1]:
return False
if self.genotypes.shape != self.mask.shape:
return False
return True

def update(
self: Self, entries: np.ndarray, loci: np.ndarray, genotypes: np.ndarray, mask: np.ndarray
) -> None:
"""
Update the contents of the genotype data with automatic sorting of the rows (entries) and columns (loci)
"""
if not self.check_dimensions():
raise IncompatibleParameters
n: int = len(entries)
if n != genotypes.shape[0]:
raise IncompatibleParameters
pk1: int = len(loci)
if pk1 != genotypes.shape[1]:
raise IncompatibleParameters
if genotypes.shape != mask.shape:
raise IncompatibleParameters
# Sorting indices
idx_rows: np.ndarray = np.argsort(entries)
idx_cols: np.ndarray = np.argsort(loci)
# Update self with the sorted data
self.entries = entries[idx_rows]
self.loci = loci[idx_cols]
self.genotypes = genotypes[np.ix_(idx_rows, idx_cols)]
self.mask = mask[np.ix_(idx_rows, idx_cols)]
return None

def slice_inplace(self: Self) -> None:
"""
Slice across all rows and columns with at least on True along both axes in self.mask mutating self
"""
if not self.check_dimensions():
raise IncompatibleParameters
idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist()
idx_loci: list[bool] = (self.mask.sum(axis=0) > 0).tolist()
self.entries = self.entries[idx_entries]
self.loci = self.loci[idx_loci]
idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j]
idx_cols: list[int] = [i for i, j in enumerate(idx_loci) if j]
self.genotypes = self.genotypes[idx_rows, :][:, idx_cols]
self.mask = self.mask[idx_rows, :][:, idx_cols]
self.update(
entries=self.entries[idx_entries],
loci=self.loci[idx_loci],
genotypes=self.genotypes[np.ix_(idx_rows, idx_cols)],
mask=self.mask[np.ix_(idx_rows, idx_cols)],
)
return None

def slice(self: Self) -> 'Genomes':
"""
Slice across all rows and columns with at least on True along both axes in self.mask returning a new a Genomes object
"""
if not self.check_dimensions():
raise IncompatibleParameters
idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist()
idx_loci: list[bool] = (self.mask.sum(axis=0) > 0).tolist()
out: Genomes = Genomes()
out.entries = self.entries[idx_entries]
out.loci = self.loci[idx_loci]
idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j]
idx_cols: list[int] = [i for i, j in enumerate(idx_loci) if j]
out.genotypes = self.genotypes[idx_rows, :][:, idx_cols]
out.mask = self.mask[idx_rows, :][:, idx_cols]
out: Genomes = Genomes()
out.update(
entries=self.entries[idx_entries],
loci=self.loci[idx_loci],
genotypes=self.genotypes[np.ix_(idx_rows, idx_cols)],
mask=self.mask[np.ix_(idx_rows, idx_cols)],
)
return out

def merge_genotype(self: Self, other: Self, conflict_resolution: tuple[float, float]) -> Self:
entries_intersection: np.ndarray = np.intersect1d(self.entries, other.entries)
loci_intersection: np.ndarray = np.intersect1d(self.loci, other.loci)
print(entries_intersection)
print(loci_intersection)
entries: np.ndarray = np.unique(np.concatenate(self.entries, other.entries))
loci: np.ndarray = np.unique(np.concatenate(self.loci, other.loci))
print(entries)
print(loci)
print(conflict_resolution)

return self
def merge_genotype(
self: Self, other: Self, conflict_resolution: tuple[float, float]
) -> 'Genomes':
"""
Merge two genotype datasets
"""
if (not self.check_dimensions()) or (not other.check_dimensions()):
raise IncompatibleParameters
# Initialise the merged genotype dataset
entries: np.ndarray = np.unique(np.concatenate((self.entries, other.entries)))
loci: np.ndarray = np.unique(np.concatenate((self.loci, other.loci)))
n: int = len(entries)
pk1: int = len(loci)
genotypes: np.ndarray = np.full(
shape=(n, pk1), fill_value=np.nan, dtype=np.float64, order='C'
)
mask: np.ndarray = np.full(shape=(n, pk1), fill_value=True, dtype=bool, order='C')
merged: Genomes = Genomes()
# Insert the data from self
_, idx_rows, idx_rows_self = np.intersect1d(entries, self.entries, return_indices=True)
_, idx_cols, idx_cols_self = np.intersect1d(loci, self.loci, return_indices=True)
genotypes[np.ix_(idx_rows, idx_cols)] = self.genotypes[np.ix_(idx_rows_self, idx_cols_self)]
mask[np.ix_(idx_rows, idx_cols)] = self.mask[np.ix_(idx_rows_self, idx_cols_self)]
# Insert the data from other
_, idx_rows, idx_rows_other = np.intersect1d(entries, other.entries, return_indices=True)
_, idx_cols, idx_cols_other = np.intersect1d(loci, other.loci, return_indices=True)
genotypes[np.ix_(idx_rows, idx_cols)] = other.genotypes[
np.ix_(idx_rows_other, idx_cols_other)
]
mask[np.ix_(idx_rows, idx_cols)] = other.mask[np.ix_(idx_rows_other, idx_cols_other)]
# Resolve intersection using a weighted mean
if np.sum(conflict_resolution) != 1.00:
sum: float = np.sum(conflict_resolution)
conflict_resolution = (conflict_resolution[0] / sum, conflict_resolution[1] / sum)
entries_intersection, idx_rows_self, _ = np.intersect1d(
other.entries, self.entries, return_indices=True
)
loci_intersection, idx_cols_self, _ = np.intersect1d(
other.loci, self.loci, return_indices=True
)
_, idx_rows, _ = np.intersect1d(entries, entries_intersection, return_indices=True)
_, idx_cols, _ = np.intersect1d(loci, loci_intersection, return_indices=True)
genotypes[np.ix_(idx_rows, idx_cols)] *= conflict_resolution[1]
genotypes[np.ix_(idx_rows, idx_cols)] += (
conflict_resolution[0] * self.genotypes[np.ix_(idx_rows_self, idx_cols_self)]
)
mask[np.ix_(idx_rows, idx_cols)] *= bool(conflict_resolution[1])
mask[np.ix_(idx_rows, idx_cols)] += (
bool(conflict_resolution[0]) * self.mask[np.ix_(idx_rows_self, idx_cols_self)]
)
# Update the merged genotype dataset
merged.update(
entries=entries,
loci=loci,
genotypes=genotypes,
mask=mask,
)
return merged


def test_genotype():
    """
    Exercise slicing and merging of simulated genotype data

    Checks that slicing in place and slicing into a new object give equal results, and
    that merging a dataset with a slice of itself (equal weights) reproduces the dataset.
    """
    import copy

    from data.simulation import simulate

    genomes, _ = simulate()
    assert isinstance(genomes, Genomes)
    # Keep only a rectangular block of entries x loci
    genomes.mask[:, :] = False
    genomes.mask[1:6, 10:20] = True
    # In-place slicing of a deep copy must match the out-of-place slice of the original
    sliced_genomes = copy.deepcopy(genomes)
    sliced_genomes.slice_inplace()
    sliced_clone = genomes.slice()
    assert sliced_genomes == sliced_clone
    print(sliced_genomes)
    print(sliced_clone)

    # The slice holds the same values as the original on the shared cells, so an equally
    # weighted merge must give back the original dataset.
    conflict_resolution = 0.5, 0.5
    merged = genomes.merge_genotype(other=sliced_genomes, conflict_resolution=conflict_resolution)
    print(genomes)
    print(merged)
    assert merged == genomes
Loading

0 comments on commit ec2a238

Please sign in to comment.