diff --git a/data/genotype.py b/data/genotype.py index 4d4627d..5b4c5e8 100644 --- a/data/genotype.py +++ b/data/genotype.py @@ -55,27 +55,33 @@ def __init__( if (p != chromosomes.shape[0]) or (p != positions.shape[0]) or (p != alleles.shape[0]): raise IncompatibleParameters loci: np.ndarray = np.full((pk1,), fill_value='', dtype=' str: """ - Print the contents of the class + Preview the genotype data """ info: str = ( '{\n\tentries: ' @@ -95,7 +101,7 @@ def __str__(self: Self) -> str: def __hash__(self: Self) -> int: """ - Hash all the fields in the class + Hash genotype data """ hash_int: int = hash(str(self.entries)) hash_int += hash(str(self.loci)) @@ -109,47 +115,141 @@ def __eq__(self: Self, other) -> bool: """ return hash(self) == hash(other) + def check_dimensions(self: Self) -> bool: + """ + Check the compatibilities of the dimensions of the genotype data + """ + n: int = len(self.entries) + if n != self.genotypes.shape[0]: + return False + pk1: int = len(self.loci) + if pk1 != self.genotypes.shape[1]: + return False + if self.genotypes.shape != self.mask.shape: + return False + return True + + def update( + self: Self, entries: np.ndarray, loci: np.ndarray, genotypes: np.ndarray, mask: np.ndarray + ) -> None: + """ + Update the contents of the genotype data with automatic sorting of the rows (entries) and columns (loci) + """ + if not self.check_dimensions(): + raise IncompatibleParameters + n: int = len(entries) + if n != genotypes.shape[0]: + raise IncompatibleParameters + pk1: int = len(loci) + if pk1 != genotypes.shape[1]: + raise IncompatibleParameters + if genotypes.shape != mask.shape: + raise IncompatibleParameters + # Sorting indices + idx_rows: np.ndarray = np.argsort(entries) + idx_cols: np.ndarray = np.argsort(loci) + # Update self with the sorted data + self.entries = entries[idx_rows] + self.loci = loci[idx_cols] + self.genotypes = genotypes[np.ix_(idx_rows, idx_cols)] + self.mask = mask[np.ix_(idx_rows, idx_cols)] + return None + def slice_inplace(self: Self) -> None: """ Slice across all rows and columns with at least on True along both axes in self.mask mutating self """ + if not self.check_dimensions(): + raise IncompatibleParameters idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist() idx_loci: list[bool] = (self.mask.sum(axis=0) > 0).tolist() - self.entries = self.entries[idx_entries] - self.loci = self.loci[idx_loci] idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j] idx_cols: list[int] = [i for i, j in enumerate(idx_loci) if j] - self.genotypes = self.genotypes[idx_rows, :][:, idx_cols] - self.mask = self.mask[idx_rows, :][:, idx_cols] + self.update( + entries=self.entries[idx_entries], + loci=self.loci[idx_loci], + genotypes=self.genotypes[np.ix_(idx_rows, idx_cols)], + mask=self.mask[np.ix_(idx_rows, idx_cols)], + ) return None def slice(self: Self) -> 'Genomes': """ Slice across all rows and columns with at least on True along both axes in self.mask returning a new a Genomes object """ + if not self.check_dimensions(): + raise IncompatibleParameters idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist() idx_loci: list[bool] = (self.mask.sum(axis=0) > 0).tolist() - out: Genomes = Genomes() - out.entries = self.entries[idx_entries] - out.loci = self.loci[idx_loci] idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j] idx_cols: list[int] = [i for i, j in enumerate(idx_loci) if j] - out.genotypes = self.genotypes[idx_rows, :][:, idx_cols] - out.mask = self.mask[idx_rows, :][:, idx_cols] + out: Genomes = Genomes() + out.update( + entries=self.entries[idx_entries], + loci=self.loci[idx_loci], + genotypes=self.genotypes[np.ix_(idx_rows, idx_cols)], + mask=self.mask[np.ix_(idx_rows, idx_cols)], + ) return out - def merge_genotype(self: Self, other: Self, conflict_resolution: tuple[float, float]) -> Self: - entries_intersection: np.ndarray = np.intersect1d(self.entries, other.entries) - loci_intersection: np.ndarray = np.intersect1d(self.loci, other.loci) - print(entries_intersection) - print(loci_intersection) - entries: np.ndarray = np.unique(np.concatenate(self.entries, other.entries)) - loci: np.ndarray = np.unique(np.concatenate(self.loci, other.loci)) - print(entries) - print(loci) - print(conflict_resolution) - - return self + def merge_genotype( + self: Self, other: Self, conflict_resolution: tuple[float, float] + ) -> 'Genomes': + """ + Merge two genotype datasets + """ + if (not self.check_dimensions()) or (not other.check_dimensions()): + raise IncompatibleParameters + # Initialise the merged genotype dataset + entries: np.ndarray = np.unique(np.concatenate((self.entries, other.entries))) + loci: np.ndarray = np.unique(np.concatenate((self.loci, other.loci))) + n: int = len(entries) + pk1: int = len(loci) + genotypes: np.ndarray = np.full( + shape=(n, pk1), fill_value=np.nan, dtype=np.float64, order='C' + ) + mask: np.ndarray = np.full(shape=(n, pk1), fill_value=True, dtype=bool, order='C') + merged: Genomes = Genomes() + # Insert the data from self + _, idx_rows, idx_rows_self = np.intersect1d(entries, self.entries, return_indices=True) + _, idx_cols, idx_cols_self = np.intersect1d(loci, self.loci, return_indices=True) + genotypes[np.ix_(idx_rows, idx_cols)] = self.genotypes[np.ix_(idx_rows_self, idx_cols_self)] + mask[np.ix_(idx_rows, idx_cols)] = self.mask[np.ix_(idx_rows_self, idx_cols_self)] + # Insert the data from other + _, idx_rows, idx_rows_other = np.intersect1d(entries, other.entries, return_indices=True) + _, idx_cols, idx_cols_other = np.intersect1d(loci, other.loci, return_indices=True) + genotypes[np.ix_(idx_rows, idx_cols)] = other.genotypes[ + np.ix_(idx_rows_other, idx_cols_other) + ] + mask[np.ix_(idx_rows, idx_cols)] = other.mask[np.ix_(idx_rows_other, idx_cols_other)] + # Resolve intersection using a weighted mean + if np.sum(conflict_resolution) != 1.00: + sum: float = np.sum(conflict_resolution) + conflict_resolution = (conflict_resolution[0] / sum, conflict_resolution[1] / sum) + entries_intersection, idx_rows_self, _ = np.intersect1d( + other.entries, self.entries, return_indices=True + ) + loci_intersection, idx_cols_self, _ = np.intersect1d( + other.loci, self.loci, return_indices=True + ) + _, idx_rows, _ = np.intersect1d(entries, entries_intersection, return_indices=True) + _, idx_cols, _ = np.intersect1d(loci, loci_intersection, return_indices=True) + genotypes[np.ix_(idx_rows, idx_cols)] *= conflict_resolution[1] + genotypes[np.ix_(idx_rows, idx_cols)] += ( + conflict_resolution[0] * self.genotypes[np.ix_(idx_rows_self, idx_cols_self)] + ) + mask[np.ix_(idx_rows, idx_cols)] *= bool(conflict_resolution[1]) + mask[np.ix_(idx_rows, idx_cols)] += ( + bool(conflict_resolution[0]) * self.mask[np.ix_(idx_rows_self, idx_cols_self)] + ) + # Update the merged genotype dataset + merged.update( + entries=entries, + loci=loci, + genotypes=genotypes, + mask=mask, + ) + return merged def test_genotype(): @@ -157,16 +257,20 @@ def test_genotype(): from data.simulation import simulate import copy - genomes, _ = simulate() - assert isinstance(genomes, Genomes) - genomes.mask[:, :] = False - genomes.mask[1:6, 10:20] = True - sliced_genomes = copy.deepcopy(genomes) - sliced_genomes.slice_inplace() - sliced_clone = genomes.slice() - assert sliced_genomes == sliced_clone - print(sliced_genomes) - print(sliced_clone) - - self = genomes - other_genomes = sliced_genomes + self, _ = simulate() + self.mask[:, :] = False + self.mask[1:6, 10:20] = True + other = copy.deepcopy(self) + other.slice_inplace() + conflict_resolution = 0.5, 0.5 + + assert isinstance(self, Genomes) + sliced_copy = self.slice() + assert other == sliced_copy + print(other) + print(sliced_copy) + + merged = self.merge_genotype(other=other, conflict_resolution=conflict_resolution) + print(self) + print(merged) + assert merged == self diff --git a/data/phenotype.py b/data/phenotype.py index 5e04d9b..e7f8fc7 100644 --- a/data/phenotype.py +++ b/data/phenotype.py @@ -1,37 +1,175 @@ import numpy as np +from typing import Self from data.error import IncompatibleParameters class Phenomes: """ Phenotype data - n: number of entries (an entry maybe an individual diploid genotype, or a tetraploid genotype or a pool of 50 diploid genotypes) + n: number of entries (an entry maybe an individual diploid genotype, or a tetraploid genotype or a pool of 50 diploid phenotypes) ntraits: number of traits entries: vector (nx0; str) of entry names traits: vector (ntraitsx0; str) of trait names - genotypes: matrix (nxntraits); np.float64) of phenotype data + phenotypes: matrix (nxntraits); np.float64) of phenotype data mask: matrix (nxntraits); np.bool) of boolean mask """ - n: int entries: np.ndarray traits: np.ndarray phenotypes: np.ndarray mask: np.ndarray - def __init__(self, entries: np.ndarray, traits: np.ndarray, phenotypes: np.ndarray) -> None: - n: int = entries.shape[0] + def __init__( + self, + entries: np.ndarray | None = None, + traits: np.ndarray | None = None, + phenotypes: np.ndarray | None = None, + ) -> None: + """ + Initialise Phenomes with or without data + """ + if ( + isinstance(entries, np.ndarray) + and isinstance(traits, np.ndarray) + and isinstance(phenotypes, np.ndarray) + ): + n: int = entries.shape[0] + if n != phenotypes.shape[0]: + raise IncompatibleParameters + ntraits: int = traits.shape[0] + if ntraits != phenotypes.shape[1]: + raise IncompatibleParameters + # Sorting indices + idx_rows: np.ndarray = np.argsort(entries) + idx_cols: np.ndarray = np.argsort(traits) + # Insert the sorted data + self.entries = entries[idx_rows] + self.traits = traits[idx_cols] + self.phenotypes = phenotypes[np.ix_(idx_rows, idx_cols)] + self.mask = np.ones((n, ntraits)).astype(np.bool) + else: + self.entries = np.array(['']) + self.traits = np.array(['']) + self.phenotypes = np.array([[np.nan]]) + self.mask = np.array([[False]]) + return None + + def __str__(self: Self) -> str: + """ + Preview the phenotype data + """ + info: str = ( + '{\n\tentries: ' + + str(self.entries) + + '\n\t' + + 'traits: ' + + str(self.traits) + + '\n\t' + + 'phenotypes: ' + + str(self.phenotypes) + + '\n\t' + + 'mask: ' + + str(self.mask) + + '\n}' + ) + return info + + def __hash__(self: Self) -> int: + """ + Hash phenotype data + """ + hash_int: int = hash(str(self.entries)) + hash_int += hash(str(self.traits)) + hash_int += hash(str(self.phenotypes)) + hash_int += hash(str(self.mask)) + return hash_int + + def __eq__(self: Self, other) -> bool: + """ + Equality test using hashes + """ + return hash(self) == hash(other) + + def check_dimensions(self: Self) -> bool: + """ + Check the compatibilities of the dimensions of the phenotype data + """ + n: int = len(self.entries) + if n != self.phenotypes.shape[0]: + return False + pk1: int = len(self.traits) + if pk1 != self.phenotypes.shape[1]: + return False + if self.phenotypes.shape != self.mask.shape: + return False + return True + + def update( + self: Self, + entries: np.ndarray, + traits: np.ndarray, + phenotypes: np.ndarray, + mask: np.ndarray, + ) -> None: + """ + Update the contents of the phenotype data with automatic sorting of the rows (entries) and columns (traits) + """ + if not self.check_dimensions(): + raise IncompatibleParameters + n: int = len(entries) if n != phenotypes.shape[0]: raise IncompatibleParameters - ntraits: int = traits.shape[0] - if ntraits != phenotypes.shape[1]: + pk1: int = len(traits) + if pk1 != phenotypes.shape[1]: + raise IncompatibleParameters + if phenotypes.shape != mask.shape: + raise IncompatibleParameters + # Sorting indices + idx_rows: np.ndarray = np.argsort(entries) + idx_cols: np.ndarray = np.argsort(traits) + # Update self with the sorted data + self.entries = entries[idx_rows] + self.traits = traits[idx_cols] + self.phenotypes = phenotypes[np.ix_(idx_rows, idx_cols)] + self.mask = mask[np.ix_(idx_rows, idx_cols)] + return None + + def slice_inplace(self: Self) -> None: + """ + Slice across all rows and columns with at least on True along both axes in self.mask mutating self + """ + if not self.check_dimensions(): + raise IncompatibleParameters + idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist() + idx_traits: list[bool] = (self.mask.sum(axis=0) > 0).tolist() + idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j] + idx_cols: list[int] = [i for i, j in enumerate(idx_traits) if j] + self.update( + entries=self.entries[idx_entries], + traits=self.traits[idx_traits], + phenotypes=self.phenotypes[np.ix_(idx_rows, idx_cols)], + mask=self.mask[np.ix_(idx_rows, idx_cols)], + ) + return None + + def slice(self: Self) -> 'Phenomes': + """ + Slice across all rows and columns with at least on True along both axes in self.mask returning a new a Phenomes object + """ + if not self.check_dimensions(): raise IncompatibleParameters - self.n = n - self.ntraits = ntraits - self.entries = entries - self.traits = traits - self.phenotypes = phenotypes - self.mask = np.ones((n, ntraits)).astype(np.bool) + idx_entries: list[bool] = (self.mask.sum(axis=1) > 0).tolist() + idx_traits: list[bool] = (self.mask.sum(axis=0) > 0).tolist() + idx_rows: list[int] = [i for i, j in enumerate(idx_entries) if j] + idx_cols: list[int] = [i for i, j in enumerate(idx_traits) if j] + out: Phenomes = Phenomes() + out.update( + entries=self.entries[idx_entries], + traits=self.traits[idx_traits], + phenotypes=self.phenotypes[np.ix_(idx_rows, idx_cols)], + mask=self.mask[np.ix_(idx_rows, idx_cols)], + ) + return out def test_phenomes():