Skip to content

Commit

Permalink
add canonical link function to binomial distribution and pearson resi…
Browse files Browse the repository at this point in the history
…dual function
  • Loading branch information
zhengp0 committed Dec 26, 2024
1 parent d728c69 commit f1b1d9f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 90 deletions.
137 changes: 49 additions & 88 deletions src/regmod/models/binomial.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
Binomial Model
"""

from functools import cached_property

import numpy as np
from scipy.stats import binom

from regmod._typing import Callable, DataFrame, NDArray
from regmod.data import Data
from regmod.optimizer import msca_optimize
from regmod._typing import Callable, NDArray, DataFrame

from .model import Model
from .utils import model_post_init
Expand All @@ -30,116 +32,75 @@ def attach_df(self, df: DataFrame):
self.mat[0], self.uvec, self.linear_umat, self.linear_uvec
)

def objective(self, coefs: NDArray) -> float:
"""Objective function.
Parameters
----------
coefs : NDArray
Given coefficients.
def get_lin_param(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
lin_param = mat.dot(coefs)
if self.params[0].offset is not None:
lin_param += self.data.get_cols(self.params[0].offset)
return lin_param

@cached_property
def hessian_from_gprior(self) -> NDArray:
"""Hessian matrix from the Gaussian prior.
Returns
-------
float
Objective value.
Matrix
Hessian matrix.
"""
inv_link = self.params[0].inv_link
lin_param = self.params[0].get_lin_param(coefs, self.data, mat=self.mat[0])
param = inv_link.fun(lin_param)
hess = np.diag(1.0 / self.gvec[1] ** 2)
if self.linear_gvec.size > 0:
hess += (self.linear_gmat.T.scale_cols(1.0 / self.linear_gvec[1] ** 2)).dot(
self.linear_gmat
)
return hess

def objective(self, coefs: NDArray) -> float:
weights = self.data.weights * self.data.trim_weights
obj_param = -weights * (
self.data.obs * np.log(param) + (1 - self.data.obs) * np.log(1 - param)
)
return obj_param.sum() + self.objective_from_gprior(coefs)
y = self.get_lin_param(coefs)

def gradient(self, coefs: NDArray) -> NDArray:
"""Gradient function.
prior_obj = self.objective_from_gprior(coefs)
likli_obj = weights.dot(np.log(1 + np.exp(-y)) + (1 - self.data.obs) * y)
return prior_obj + likli_obj

Parameters
----------
coefs : NDArray
Given coefficients.
Returns
-------
NDArray
Gradient vector.
"""
def gradient(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
inv_link = self.params[0].inv_link
lin_param = self.params[0].get_lin_param(coefs, self.data, mat=self.mat[0])
param = inv_link.fun(lin_param)
dparam = inv_link.dfun(lin_param)

weights = self.data.weights * self.data.trim_weights
grad_param = weights * (
(param - self.data.obs) / (param * (1 - param)) * dparam
)
z = np.exp(self.get_lin_param(coefs))

return mat.T.dot(grad_param) + self.gradient_from_gprior(coefs)
prior_grad = self.gradient_from_gprior(coefs)
likli_grad = mat.T.dot(weights * (z / (1 + z) - self.data.obs))
return prior_grad + likli_grad

def hessian(self, coefs: NDArray) -> NDArray:
"""Hessian function.
mat = self.mat[0]
weights = self.data.weights * self.data.trim_weights
z = np.exp(self.get_lin_param(coefs))
likli_hess_scale = weights * (z / ((1 + z) ** 2))

Parameters
----------
coefs : NDArray
Given coefficients.
likli_hess_right = mat.scale_rows(likli_hess_scale)
likli_hess = mat.T.dot(likli_hess_right)

Returns
-------
NDArray
Hessian matrix.
return self.hessian_from_gprior + likli_hess

"""
def jacobian2(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
inv_link = self.params[0].inv_link
lin_param = self.params[0].get_lin_param(coefs, self.data, mat=self.mat[0])
param = inv_link.fun(lin_param)
dparam = inv_link.dfun(lin_param)
d2param = inv_link.d2fun(lin_param)

weights = self.data.weights * self.data.trim_weights
hess_param = weights * (
(self.data.obs / param**2 + (1 - self.data.obs) / (1 - param) ** 2)
* dparam**2
+ (param - self.data.obs) / (param * (1 - param)) * d2param
)
z = np.exp(self.get_lin_param(coefs))
likli_jac_scale = weights * (z / (1 + z) - self.data.obs)

scaled_mat = mat.scale_rows(hess_param)
hess_mat = mat.T.dot(scaled_mat)
hess_mat_gprior = type(hess_mat)(self.hessian_from_gprior())
return hess_mat + hess_mat_gprior
likli_jac = mat.T.scale_cols(likli_jac_scale)
likli_jac2 = likli_jac.dot(likli_jac.T)
return self.hessian_from_gprior + likli_jac2

def jacobian2(self, coefs: NDArray) -> NDArray:
"""Jacobian function.
def get_pearson_residuals(self, coefs: NDArray) -> NDArray:
z = np.exp(self.get_lin_param(coefs))

Parameters
----------
coefs : NDArray
Given coefficients.
Returns
-------
NDArray
Jacobian matrix.
pred = z / (1 + z)
pred_sd = np.sqrt(pred * (1 - pred) / self.data.weights)

"""
mat = self.mat[0]
inv_link = self.params[0].inv_link
lin_param = self.params[0].get_lin_param(coefs, self.data, mat=self.mat[0])
param = inv_link.fun(lin_param)
dparam = inv_link.dfun(lin_param)
weights = self.data.weights * self.data.trim_weights
grad_param = weights * (
(param - self.data.obs) / (param * (1 - param)) * dparam
)
jacobian = mat.T.scale_cols(grad_param)
hess_mat_gprior = type(jacobian)(self.hessian_from_gprior())
jacobian2 = jacobian.dot(jacobian.T) + hess_mat_gprior
return jacobian2
return (self.data.obs - pred) / pred_sd

def fit(self, optimizer: Callable = msca_optimize, **optimizer_options):
"""Fit function.
Expand Down
6 changes: 4 additions & 2 deletions src/regmod/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
"""

import numpy as np

from scipy.linalg import block_diag
from scipy.sparse import csc_matrix

from regmod._typing import Callable, DataFrame, Matrix, NDArray
from regmod.data import Data
from regmod.optimizer import scipy_optimize
from regmod.parameter import Parameter
from regmod.utils import sizes_to_slices
from regmod._typing import Callable, NDArray, DataFrame, Matrix


class Model:
Expand Down Expand Up @@ -430,6 +429,9 @@ def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray:
"""
raise NotImplementedError()

def get_pearson_residuals(self, coefs: NDArray) -> NDArray:
raise NotImplementedError()

def detect_outliers(self, coefs: NDArray, bounds: tuple[float, float]) -> NDArray:
"""Detect outliers.
Expand Down

0 comments on commit f1b1d9f

Please sign in to comment.