diff --git a/src/regmod/models/__init__.py b/src/regmod/models/__init__.py index 052e4b8..4ec1c81 100644 --- a/src/regmod/models/__init__.py +++ b/src/regmod/models/__init__.py @@ -3,7 +3,7 @@ """ from .binomial import BinomialModel, CanonicalBinomialModel, create_binomial_model -from .gaussian import GaussianModel +from .gaussian import CanonicalGaussianModel, GaussianModel, create_gaussian_model from .model import Model from .negativebinomial import NegativeBinomialModel from .pogit import PogitModel diff --git a/src/regmod/models/gaussian.py b/src/regmod/models/gaussian.py index 9067a8e..f1f270e 100644 --- a/src/regmod/models/gaussian.py +++ b/src/regmod/models/gaussian.py @@ -6,10 +6,11 @@ from scipy.stats import norm from regmod._typing import Callable, DataFrame, Matrix, NDArray +from regmod.data import Data from regmod.optimizer import msca_optimize from .model import Model -from .utils import model_post_init +from .utils import get_params, model_post_init class GaussianModel(Model): @@ -31,6 +32,13 @@ def attach_df(self, df: DataFrame): def hessian_from_gprior(self) -> Matrix: return self.hmat + def get_lin_param(self, coefs: NDArray) -> NDArray: + mat = self.mat[0] + lin_param = mat.dot(coefs) + if self.params[0].offset is not None: + lin_param += self.data.get_cols(self.params[0].offset) + return lin_param + def objective(self, coefs: NDArray) -> float: """Objective function. Parameters @@ -158,3 +166,62 @@ def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray: norm.ppf(bounds[0], loc=mean, scale=sd), norm.ppf(bounds[1], loc=mean, scale=sd), ] + + +class CanonicalGaussianModel(GaussianModel): + def __init__(self, data: Data, **kwargs): + super().__init__(data, **kwargs) + if self.params[0].inv_link.name != "identity": + raise ValueError( + "Canonical Gaussian model requires inverse link to be identity." + ) + + def objective(self, coefs: NDArray) -> float: + weights = self.data.weights * self.data.trim_weights + y = self.get_lin_param(coefs) + + prior_obj = self.objective_from_gprior(coefs) + likli_obj = 0.5 * weights.dot((y - self.data.obs) ** 2) + return prior_obj + likli_obj + + def gradient(self, coefs: NDArray) -> NDArray: + mat = self.mat[0] + weights = self.data.weights * self.data.trim_weights + y = self.get_lin_param(coefs) + + prior_grad = self.gradient_from_gprior(coefs) + likli_grad = mat.T.dot(weights * (y - self.data.obs)) + return prior_grad + likli_grad + + def hessian(self, coefs: NDArray) -> Matrix: + mat = self.mat[0] + weights = self.data.weights * self.data.trim_weights + likli_hess_scale = weights + + prior_hess = self.hessian_from_gprior() + likli_hess_right = mat.scale_rows(likli_hess_scale) + likli_hess = mat.T.dot(likli_hess_right) + + return prior_hess + likli_hess + + def jacobian2(self, coefs: NDArray) -> NDArray: + mat = self.mat[0] + weights = self.data.weights * self.data.trim_weights + y = self.get_lin_param(coefs) + likli_jac_scale = weights * (y - self.data.obs) + + likli_jac = mat.T.scale_cols(likli_jac_scale) + likli_jac2 = likli_jac.dot(likli_jac.T) + return self.hessian_from_gprior() + likli_jac2 + + +def create_gaussian_model(data: Data, **kwargs) -> GaussianModel: + params = get_params( + params=kwargs.get("params"), + param_specs=kwargs.get("param_specs"), + default_param_specs=GaussianModel.default_param_specs, + ) + + if params[0].inv_link.name == "identity": + return CanonicalGaussianModel(data, params=params) + return GaussianModel(data, params=params) diff --git a/tests/test_gaussianmodel.py b/tests/test_gaussianmodel.py index 4e85392..476ab18 100644 --- a/tests/test_gaussianmodel.py +++ b/tests/test_gaussianmodel.py @@ -1,15 +1,19 @@ """ Test Gaussian Model """ + import numpy as np import pandas as pd import pytest - from regmod.data import Data from regmod.function import fun_dict -from regmod.models import GaussianModel -from regmod.prior import (GaussianPrior, SplineGaussianPrior, - SplineUniformPrior, UniformPrior) +from regmod.models import create_gaussian_model +from regmod.prior import ( + GaussianPrior, + SplineGaussianPrior, + SplineUniformPrior, + UniformPrior, +) from regmod.utils import SplineSpecs from regmod.variable import SplineVariable, Variable @@ -19,14 +23,14 @@ @pytest.fixture def data(): num_obs = 5 - df = pd.DataFrame({ - "obs": np.random.randn(num_obs), - "cov0": np.random.randn(num_obs), - "cov1": np.random.randn(num_obs) - }) - return Data(col_obs="obs", - col_covs=["cov0", "cov1"], - df=df) + df = pd.DataFrame( + { + "obs": np.random.randn(num_obs), + "cov0": np.random.randn(num_obs), + "cov1": np.random.randn(num_obs), + } + ) + return Data(col_obs="obs", col_covs=["cov0", "cov1"], df=df) @pytest.fixture @@ -41,9 +45,9 @@ def uprior(): @pytest.fixture def spline_specs(): - return SplineSpecs(knots=np.linspace(0.0, 1.0, 5), - degree=3, - knots_type="rel_domain") + return SplineSpecs( + knots=np.linspace(0.0, 1.0, 5), degree=3, knots_type="rel_domain" + ) @pytest.fixture @@ -58,20 +62,21 @@ def spline_uprior(): @pytest.fixture def var_cov0(gprior, uprior): - return Variable(name="cov0", - priors=[gprior, uprior]) + return Variable(name="cov0", priors=[gprior, uprior]) @pytest.fixture def var_cov1(spline_gprior, spline_uprior, spline_specs): - return SplineVariable(name="cov1", - spline_specs=spline_specs, - priors=[spline_gprior, spline_uprior]) + return SplineVariable( + name="cov1", spline_specs=spline_specs, priors=[spline_gprior, spline_uprior] + ) @pytest.fixture def model(data, var_cov0, var_cov1): - return GaussianModel(data, param_specs={"mu": {"variables": [var_cov0, var_cov1]}}) + return create_gaussian_model( + data, param_specs={"mu": {"variables": [var_cov0, var_cov1]}} + ) def test_model_result(model): @@ -117,7 +122,7 @@ def test_model_gradient(model, inv_link): tr_grad = np.zeros(model.size) for i in range(model.size): coefs_c[i] += 1e-16j - tr_grad[i] = model.objective(coefs_c).imag/1e-16 + tr_grad[i] = model.objective(coefs_c).imag / 1e-16 coefs_c[i] -= 1e-16j assert np.allclose(my_grad, tr_grad) @@ -132,7 +137,7 @@ def test_model_hessian(model, inv_link): for i in range(model.size): for j in range(model.size): coefs_c[j] += 1e-16j - tr_hess[i][j] = model.gradient(coefs_c).imag[i]/1e-16 + tr_hess[i][j] = model.gradient(coefs_c).imag[i] / 1e-16 coefs_c[j] -= 1e-16j assert np.allclose(my_hess, tr_hess) @@ -152,8 +157,8 @@ def test_model_jacobian2(model): mat = model.mat[0].to_numpy() param = model.get_params(beta)[0] - residual = (model.data.obs - param)*np.sqrt(model.data.weights) - jacobian = mat.T*residual + residual = (model.data.obs - param) * np.sqrt(model.data.weights) + jacobian = mat.T * residual true_jacobian2 = jacobian.dot(jacobian.T) + model.hessian_from_gprior() assert np.allclose(jacobian2, true_jacobian2) @@ -161,16 +166,18 @@ def test_model_jacobian2(model): def test_model_no_variables(): num_obs = 5 - df = pd.DataFrame({ - "obs": np.random.randn(num_obs), - "offset": np.ones(num_obs), - }) + df = pd.DataFrame( + { + "obs": np.random.randn(num_obs), + "offset": np.ones(num_obs), + } + ) data = Data( col_obs="obs", col_offset="offset", df=df, ) - model = GaussianModel(data, param_specs={"mu": {"offset": "offset"}}) + model = create_gaussian_model(data, param_specs={"mu": {"offset": "offset"}}) coefs = np.array([]) grad = model.gradient(coefs) hessian = model.hessian(coefs)