
Commit

using mypy and pylint for type checking + will need to revise allele effect sim to accommodate multiple correlated traits
jeffersonfparil committed Nov 8, 2024
1 parent e4e2543 commit 7eeb936
Showing 3 changed files with 185 additions and 74 deletions.
12 changes: 12 additions & 0 deletions data/error.py
@@ -46,6 +46,12 @@ class LogicError(Error):
"""


class IncorrectParameter(Error):
"""
Incorrect parameter value.
"""


class IncompatibleParameters(Error):
"""
Input parameters are incompatible, e.g.
@@ -67,6 +73,12 @@ class AlleleFreqOverUnderflow(Error):
"""


class FractionOverUnderflow(Error):
"""
    A fraction expected to range from zero to one is below zero or above one.
"""


def test_error():
error1 = Error()
error2 = LogicError()
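
A minimal sketch of how the two new exception classes could be exercised in test_error as well, assuming they remain simple subclasses of Error as defined above:

    error3 = IncorrectParameter()
    error4 = FractionOverUnderflow()
    assert isinstance(error3, Error)
    assert isinstance(error4, Error)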
245 changes: 172 additions & 73 deletions data/simulation.py
@@ -4,7 +4,12 @@
# import scipy as sp
# import random as rand
import progressbar
from data.error import LogicError, IncompatibleParameters, RandomSamplingError, AlleleFreqOverUnderflow
from data.error import (
LogicError,
IncompatibleParameters,
RandomSamplingError,
AlleleFreqOverUnderflow,
)
# from data.genotype import Genomes
# from data.phenotype import Phenomes

@@ -24,7 +29,9 @@ def simulate_chromosome_lengths(m: int, s: int) -> np.ndarray:
- LogicError
"""
base_length_per_chrom: int = round(s / m)
lengths_per_chrom: np.ndarray = np.array([base_length_per_chrom for i in range(m)], dtype=np.uint64, order='C')
lengths_per_chrom: np.ndarray = np.array(
[base_length_per_chrom for i in range(m)], dtype=np.uint64, order='C'
)
if lengths_per_chrom.sum() < s:
lengths_per_chrom[-1] += s - lengths_per_chrom.sum()
elif lengths_per_chrom.sum() > s:
@@ -49,7 +56,9 @@ def simulate_chromosome_coverage(m: int, p: int) -> np.ndarray:
- LogicError
"""
base_n_loci_per_chrom: int = round(p / m)
n_loci_per_chrom: np.ndarray = np.array([base_n_loci_per_chrom for i in range(m)], dtype=np.uint64, order='C')
n_loci_per_chrom: np.ndarray = np.array(
[base_n_loci_per_chrom for i in range(m)], dtype=np.uint64, order='C'
)
if n_loci_per_chrom.sum() < p:
n_loci_per_chrom[-1] += p - n_loci_per_chrom.sum()
elif n_loci_per_chrom.sum() > p:
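
A quick usage sketch of the two helpers above, using the same parameters as test_simulation below (m = 5 chromosomes, s = 5,000,000 bp, p = 500 loci); any rounding remainder is absorbed by the last chromosome:

from data.simulation import simulate_chromosome_lengths, simulate_chromosome_coverage

lengths_per_chrom = simulate_chromosome_lengths(m=5, s=5_000_000)
n_loci_per_chrom = simulate_chromosome_coverage(m=5, p=500)
assert lengths_per_chrom.sum() == 5_000_000 and (lengths_per_chrom == 1_000_000).all()
assert n_loci_per_chrom.sum() == 500 and (n_loci_per_chrom == 100).all()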
@@ -107,7 +116,9 @@ def simulate_loci_identities(
for i in range(m):
tmp_chr = ['chrom_' + str(i + 1) for j in range(n_loci_per_chrom[i])]
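        # Sample unique base-pair positions for this chromosome; with replace=False,
        # np.random.choice raises ValueError when n_loci_per_chrom[i] exceeds
        # lengths_per_chrom[i], and that error is re-raised below as RandomSamplingError.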
try:
tmp_pos = np.random.choice(a=int(lengths_per_chrom[i]), size=n_loci_per_chrom[i], replace=False)
tmp_pos = np.random.choice(
a=int(lengths_per_chrom[i]), size=n_loci_per_chrom[i], replace=False
)
except ValueError as err:
raise RandomSamplingError() from err
tmp_pos.sort()
@@ -162,92 +173,164 @@ def simulate_genotype_matrix(
- LogicError
- AlleleFreqOverUnderflow
"""

if (chromosomes.shape[0] != positions.shape[0]) or (chromosomes.shape[0] != alleles.shape[0]):
raise LogicError()
uniq_chrom, n_loci_per_chrom = np.unique(chromosomes, return_counts=True)
m: int = uniq_chrom.shape[0]
p: int = alleles.shape[0]
k: int = alleles.shape[1]
# Simulate mean frequencies of the first allele per locus
u: np.ndarray = np.random.beta(a=fbeta[0], b=fbeta[1], size=p).astype(dtype=np.float64, order='C')
u: np.ndarray = np.random.beta(a=fbeta[0], b=fbeta[1], size=p).astype(
dtype=np.float64, order='C'
)
    # Simulate linkage by sampling from a multivariate normal distribution whose covariance
    # matrix is a correlation matrix, where linkage disequilibrium is assumed to have decayed
    # to 50% correlation at a distance of d50, with exponential decay: corr = exp(-rate*distance).
rate: np.float64 = -1 * np.log(0.5) / np.float64(d50)
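    # Worked example, assuming d50 = 100_000 bp as in test_simulation below:
    #   rate = -ln(0.5) / 100_000 ≈ 6.93e-06 per bp
    #   corr at       0 bp = exp(-rate * 0)       = 1.00
    #   corr at 100_000 bp = exp(-rate * 100_000) = 0.50
    #   corr at 200_000 bp = exp(-rate * 200_000) = 0.25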
G: np.ndarray = np.empty((k, n, p), dtype=np.float64, order='C')
genotype_3d_array: np.ndarray = np.empty((k, n, p), dtype=np.float64, order='C')
for i in range(m):
idx_ini = int(n_loci_per_chrom[:i].sum())
idx_fin = int(idx_ini + n_loci_per_chrom[i])
C: np.ndarray = np.zeros((n_loci_per_chrom[i], n_loci_per_chrom[i]), dtype=np.float64, order='C')
for iC in range(n_loci_per_chrom[i]):
for jC in range(n_loci_per_chrom[i]):
pos_1 = positions[idx_ini:idx_fin][iC]
pos_2 = positions[idx_ini:idx_fin][jC]
covariance_matrix: np.ndarray = np.zeros(
(n_loci_per_chrom[i], n_loci_per_chrom[i]), dtype=np.float64, order='C'
)
for ic in range(n_loci_per_chrom[i]):
for jc in range(n_loci_per_chrom[i]):
pos_1 = positions[idx_ini:idx_fin][ic]
pos_2 = positions[idx_ini:idx_fin][jc]
if pos_1 > pos_2:
distance_bp = np.float64(pos_1 - pos_2)
else:
distance_bp = np.float64(pos_2 - pos_1)
C[iC][jC] = np.exp(-rate * distance_bp)
G[:, :, idx_ini:idx_fin] = np.random.multivariate_normal(mean=u[idx_ini:idx_fin], cov=C, size=(k, n))
covariance_matrix[ic][jc] = np.exp(-rate * distance_bp)
genotype_3d_array[:, :, idx_ini:idx_fin] = np.random.multivariate_normal(
mean=u[idx_ini:idx_fin], cov=covariance_matrix, size=(k, n)
)
# Restrict the sample allele frequencies between zero and one
idx_zeros: np.ndarray = np.ndarray(
(1, genotype_3d_array.shape[1], genotype_3d_array.shape[2]), dtype=bool
)
idx_ones: np.ndarray = np.ndarray(
(1, genotype_3d_array.shape[1], genotype_3d_array.shape[2]), dtype=bool
)
for i in range(k):
idx_zeros = G[i, :, :] > 1.00
idx_ones = G[i, :, :] < 0.00
G[i, idx_zeros] = 1.00
G[i, idx_ones] = 0.00
idx_zeros = genotype_3d_array[i, :, :] > 1.00
idx_ones = genotype_3d_array[i, :, :] < 0.00
genotype_3d_array[i, idx_zeros] = 1.00
genotype_3d_array[i, idx_ones] = 0.00
# Force allele frequencies to sum up to one per locus
for i in range(1, k):
S = G[0 : (i + 1), :, :].sum(axis=0)
idx = S > 1.00
G[i, idx] -= S[idx] - 1.00
idx_zeros = G[i, :, :] < 0.0
idx_ones = G[i, :, :] > 1.0
G[i, idx_zeros] = 0.0
G[i, idx_ones] = 1.0
allele_sums: np.ndarray = genotype_3d_array[0 : (i + 1), :, :].sum(axis=0)
idx: np.ndarray = allele_sums > 1.00
genotype_3d_array[i, idx] -= allele_sums[idx] - 1.00
idx_zeros = genotype_3d_array[i, :, :] < 0.0
idx_ones = genotype_3d_array[i, :, :] > 1.0
genotype_3d_array[i, idx_zeros] = 0.0
genotype_3d_array[i, idx_ones] = 1.0
if (i + 1) == k:
S = G.sum(axis=0)
idx = S < 1.00
G[i, idx] += 1.00 - S[idx]
if (np.abs(G.sum(axis=0) - 1.00) > 0.00001).sum() > 0:
allele_sums_k: np.ndarray = genotype_3d_array.sum(axis=0)
idx_k: np.ndarray = allele_sums_k < 1.00
genotype_3d_array[i, idx_k] += 1.00 - allele_sums_k[idx_k]
if (np.abs(genotype_3d_array.sum(axis=0) - 1.00) > 0.00001).sum() > 0:
raise AlleleFreqOverUnderflow()
# Reshape the genotype tensor into a matrix with n rows and p*(k-1) alleles
    # where the k-1 alleles per locus are adjacent to each other along the columns
# and convert into the requested ploidy level
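    # e.g. with k = 3 alleles per locus the column order is:
    # locus_1:allele_1, locus_1:allele_2, locus_2:allele_1, locus_2:allele_2, ...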
X: np.ndarray = np.round(G[0 : (k - 1), :, :].transpose(1, 2, 0).reshape(n, p * (k - 1)) * ploidy) / ploidy
return X
genotype_matrix: np.ndarray = (
np.round(
genotype_3d_array[0 : (k - 1), :, :].transpose(1, 2, 0).reshape(n, p * (k - 1)) * ploidy
)
/ ploidy
)
return genotype_matrix
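
Since only the first k-1 alleles per locus are kept, the frequency of the dropped k-th allele can be recovered from the returned matrix; a small sketch using the names from test_simulation below (n, p, k, genotype_matrix), where the clip guards against tiny overshoots introduced by rounding to multiples of 1/ploidy:

import numpy as np

freq_retained = genotype_matrix.reshape(n, p, k - 1)  # (n, p, k-1): alleles of a locus sit in adjacent columns
freq_dropped = np.clip(1.0 - freq_retained.sum(axis=2), 0.0, 1.0)  # (n, p): frequency of the k-th allele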


def simulate_allele_effects(X: np.ndarray, a: int, fnorm: tuple[float, float]) -> np.ndarray:
"""
Simulate allele effects
Parameters
----------
- X: matrix (nx(p*(k-1)); np.float64) of allele frequencies across n genotypes and p*(k-1) alleles,
        where the k-1 alleles per locus are adjacent to each other along the columns
- a: total number of alleles (i.e. columns in X) with non-zero effects
- fnorm: tuple containing the mean and standard deviation of the Normally distributed non-zero allele effects
Returns
-------
- np.ndarray: vector (pk1x0; np.float64) of allele effects corresponding to each column in X
Raises
------
- RandomSamplingError
"""
pk1: int = X.shape[1]
b: np.ndarray = np.zeros(pk1, dtype=np.float64, order='C')
try:
idx: np.ndarray = np.random.choice(a=pk1, size=a, replace=False)
except ValueError as err:
raise RandomSamplingError() from err
try:
        b[idx] = np.random.normal(loc=fnorm[0], scale=fnorm[1], size=a)
except ValueError as err:
raise RandomSamplingError() from err
return b
# TODO: Revise allele effects simulation for multiple correlated or uncorrelated traits and where effects may come from various distributions
# def simulate_allele_effects(
# genotype_matrix: np.ndarray,
# n_effects: list[int],
# fnorm: list[tuple[float, float]],
# foverlap: float | np.ndarray,
# ) -> np.ndarray:
# """
# Simulate allele effects
# Parameters
# ----------
# - genotype_matrix: matrix (nx(p*(k-1)); np.float64) of allele frequencies across n genotypes and p*(k-1) alleles,
    #   where the k-1 alleles per locus are adjacent to each other along the columns
# - n_effects: list of integers, each corresponding to the total number of alleles (i.e. columns in genotype_matrix) with non-zero effects on one trait
    # - fnorm: list of tuples of floats, each containing the mean and standard deviation of the
    #   Normally distributed non-zero allele effects of a single trait
    # - foverlap: a float or a matrix (len(n_effects) x len(n_effects)) of floats from zero to one
    #   corresponding to the fraction of overlap in the non-zero effect alleles between each pair of
    #   traits, relative to the trait with fewer non-zero effect alleles. For instance, if trait_1
    #   has 10 non-zero effect alleles, trait_2 has 5, and the fraction of overlap is 0.5, then the
    #   two traits share three alleles: round(0.5*5).
# Returns
# -------
# - np.ndarray: matrix (pk1 x len(n_effects); np.float64) of allele effects per trait simulation
# Raises
# ------
# - IncorrectParameter
# - IncompatibleParameters
# - FractionOverUnderflow
# - RandomSamplingError
# """
# pk1: int = genotype_matrix.shape[1]
# t: int = len(n_effects)
# if t != len(fnorm):
# raise IncompatibleParameters
# if isinstance(foverlap, np.ndarray):
# foverlap_matrix: np.ndarray = foverlap
# else:
# foverlap_matrix = np.full((t, t), fill_value=foverlap, dtype=np.float64)
# np.fill_diagonal(foverlap_matrix, 1.00)
# for i in range(t):
# for j in range(t):
# if (i==j) and (foverlap_matrix[i, j] != 1.00):
# raise IncorrectParameter()
# if (foverlap_matrix[i, j] < 0.0) or (foverlap_matrix[i, j] > 1.0):
# raise FractionOverUnderflow()
# indexes: list[list[int]] = [[] for j in range(t)]
# allele_effects: np.ndarray = np.zeros((pk1, t), dtype=np.float64, order='C')
# # Iterate per trait with the number of non-zero effect alleles decreasing each step so that we can sample overlapping alleles
# sorted_trait_indexes: np.ndarray = np.argsort(np.max(n_effects) - n_effects)
# for ix in range(t):
# i = sorted_trait_indexes[ix]
# a: int = n_effects[i]
# try:
# idx: np.ndarray = np.random.choice(a=pk1, size=a, replace=False)
# indexes[i].extend(idx)
# except ValueError as err:
# raise RandomSamplingError() from err
# for jx in range(ix):


# try:
# allele_effects[idx,i] = np.random.normal(loc=fnorm[i][0], scale=fnorm[i][1], size=a)
# except ValueError as err:
# raise RandomSamplingError() from err
# return allele_effects
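
A minimal sketch of the overlap idea described in the TODO above, restricted to two traits; the function name, signature, and sampling scheme here are illustrative assumptions, not the final implementation:

import numpy as np

def sketch_two_trait_nonzero_indices(
    pk1: int, a_1: int, a_2: int, foverlap: float
) -> tuple[np.ndarray, np.ndarray]:
    # Number of shared non-zero-effect alleles, relative to the trait with fewer of them
    n_shared: int = round(foverlap * min(a_1, a_2))
    # Trait 1: sample its non-zero-effect allele columns freely
    idx_1 = np.random.choice(a=pk1, size=a_1, replace=False)
    # Trait 2: reuse n_shared of trait 1's columns, then draw the rest from the remaining columns
    shared = np.random.choice(a=idx_1, size=n_shared, replace=False)
    remaining = np.setdiff1d(np.arange(pk1), idx_1)
    private = np.random.choice(a=remaining, size=a_2 - n_shared, replace=False)
    idx_2 = np.concatenate([shared, private])
    return idx_1, idx_2

For example, with pk1 = 1000, a_1 = 10, a_2 = 5 and foverlap = 0.5, idx_2 shares round(0.5 * 5) of its columns with idx_1.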


# def simulate_phenotype_matrix(genotype_matrix: np.ndarray, allele_effects: np.ndarray, heritabilities: list[float]) -> np.ndarray:
# """
# Simulate phenotypes from a genotype matrix and an allele effects vector
# """
# n: int = genotype_matrix.shape[0]
# if genotype_matrix.shape[1] != allele_effects.shape[0]:
# raise IncompatibleParameters()
# r: int = len(heritabilities)
# phenotype_matrix: np.ndarray = np.zeros((n, r), dtype=np.float64, order='C')
# for i in range(r):
# y: np.ndarray = genotype_matrix @ allele_effects
# variance_of_y: float = y.var(axis=0)
# return phenotype_matrix
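
The commented-out sketch above stops after computing the genetic values; the conventional next step would be to add environmental noise scaled by the narrow-sense heritability. A standalone illustration of that step (a commonly used approach, stated here as an assumption):

import numpy as np

def sketch_add_heritability_noise(genetic_values: np.ndarray, h2: float) -> np.ndarray:
    # Choose the error variance so that h2 = var_genetic / (var_genetic + var_error)
    variance_error = genetic_values.var(axis=0) * (1.0 - h2) / h2
    return genetic_values + np.random.normal(
        loc=0.0, scale=np.sqrt(variance_error), size=genetic_values.shape
    )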


def test_simulation() -> None:
"""
For Pytest
data.simulation submodule tests
"""
# Input parameters
p: int = 500 # number of loci
m: int = 5 # number of chromosomes
s: int = 5_000_000 # total size of the genome in base-pairs
@@ -259,27 +342,38 @@ def test_simulation() -> None:
) # shape parameters of the Beta distributed mean allele frequencies
d50 = 100_000 # distance in base-pairs at which correlation between loci is 50%
    ploidy = 100  # number of haploid genomes per genotype (e.g. 100 means there may be 50 diploid individuals per entry)
a: int = int(np.ceil(p * (k - 1) * 0.01)) # number of alleles with non-zero additive effects
fnorm: tuple[float, float] = (
np.pi,
1.00,
) # mean and standard deviation of the Normally distributed non-zero additive allele effects
# n_effects: list[int] = [10, 5, 7] # number of alleles with non-zero additive effects
# foverlap: float = 0.5
# fnorm: list[tuple[float, float]] = [
# (0.00, 1.00),
# (10.00, 2.00),
# (np.pi, 1.00),
# ] # mean and standard deviation of the Normally distributed non-zero additive allele effects
# heritabilities: list[float] = [
# 0.1,
# 0.5,
# 0.9,
# ] # narrow-sense heritabilities for each replication
# Chromosome lengths
lengths_per_chrom = simulate_chromosome_lengths(m=m, s=s)
assert lengths_per_chrom.shape == (m,)
assert lengths_per_chrom[1] == s / m
assert lengths_per_chrom[-1] == s / m
# Chromosome coverage
n_loci_per_chrom = simulate_chromosome_coverage(m=m, p=p)
assert n_loci_per_chrom.shape == (m,)
assert n_loci_per_chrom[1] == p / m
assert n_loci_per_chrom[-1] == p / m
# Loci identities
chromosomes, positions, alleles = simulate_loci_identities(
lengths_per_chrom=lengths_per_chrom, n_loci_per_chrom=n_loci_per_chrom, k=k
)
assert chromosomes.shape[0] == p
assert positions.shape[0] == p
assert alleles.shape[0] == p
assert alleles.shape[1] == k
X = simulate_genotype_matrix(
# Genotype matrix
genotype_matrix = simulate_genotype_matrix(
n=n,
chromosomes=chromosomes,
positions=positions,
Expand All @@ -288,31 +382,36 @@ def test_simulation() -> None:
d50=d50,
ploidy=ploidy,
)
assert X.shape[0] == n
assert X.shape[1] == p * (k - 1)
assert genotype_matrix.shape[0] == n
assert genotype_matrix.shape[1] == p * (k - 1)
assert (
(X < 1.00 / ploidy) * (X != 0.0)
).sum() == 0 # respects the minimum allowed allele frequency above zero as dictated by the ploidy
((genotype_matrix < 1.00 / ploidy) * (genotype_matrix != 0.0)).sum() == 0
) # respects the minimum allowed allele frequency above zero as dictated by the ploidy
assert (
(X > 1.00 - (1.00 / ploidy)) * (X != 1.0)
).sum() == 0 # respects the maximum allowed allele frequency below one as dictated by the ploidy
((genotype_matrix > 1.00 - (1.00 / ploidy)) * (genotype_matrix != 1.0)).sum() == 0
) # respects the maximum allowed allele frequency below one as dictated by the ploidy
# allele_effects: np.ndarray = simulate_allele_effects(
# genotype_matrix=genotype_matrix, a=a, fnorm=fnorm
# )
# assert allele_effects.shape[0] == genotype_matrix.shape[1]
# assert (allele_effects != 0.0).sum() == a


# def visualise_simulation_output():
# import matplotlib.pyplot as plt
# from scipy.stats import gaussian_kde

# density = gaussian_kde(
# X.reshape(
# X.shape[0] * X.shape[1],
# genotype_matrix.reshape(
# genotype_matrix.shape[0] * genotype_matrix.shape[1],
# )
# )
# xs = np.linspace(0, 1, 200)
# plt.figure(figsize=(14, 8))
# plt.plot(xs, density(xs))
# plt.show()

# C = np.corrcoef(X.transpose())
# C = np.corrcoef(genotype_matrix.transpose())
# C.shape
# plt.imshow(C)
# plt.show()
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
]

[tool.ruff]
line-length = 120
line-length = 100
indent-width = 4
fix = true
show-fixes = true
