Skip to content

Commit

Permalink
add canonical gaussian model
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengp0 committed Dec 26, 2024
1 parent ed147c4 commit 3badcc0
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/regmod/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from .binomial import BinomialModel, CanonicalBinomialModel, create_binomial_model
from .gaussian import GaussianModel
from .gaussian import CanonicalGaussianModel, GaussianModel, create_gaussian_model
from .model import Model
from .negativebinomial import NegativeBinomialModel
from .pogit import PogitModel
Expand Down
69 changes: 68 additions & 1 deletion src/regmod/models/gaussian.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
from scipy.stats import norm

from regmod._typing import Callable, DataFrame, Matrix, NDArray
from regmod.data import Data
from regmod.optimizer import msca_optimize

from .model import Model
from .utils import model_post_init
from .utils import get_params, model_post_init


class GaussianModel(Model):
Expand All @@ -31,6 +32,13 @@ def attach_df(self, df: DataFrame):
def hessian_from_gprior(self) -> Matrix:
return self.hmat

def get_lin_param(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
lin_param = mat.dot(coefs)
if self.params[0].offset is not None:
lin_param += self.data.get_cols(self.params[0].offset)
return lin_param

def objective(self, coefs: NDArray) -> float:
"""Objective function.
Parameters
Expand Down Expand Up @@ -158,3 +166,62 @@ def get_ui(self, params: list[NDArray], bounds: tuple[float, float]) -> NDArray:
norm.ppf(bounds[0], loc=mean, scale=sd),
norm.ppf(bounds[1], loc=mean, scale=sd),
]


class CanonicalGaussianModel(GaussianModel):
def __init__(self, data: Data, **kwargs):
super().__init__(data, **kwargs)
if self.params[0].inv_link.name != "identity":
raise ValueError(
"Canonical Gaussian model requires inverse link to be identity."
)

def objective(self, coefs: NDArray) -> float:
weights = self.data.weights * self.data.trim_weights
y = self.get_lin_param(coefs)

prior_obj = self.objective_from_gprior(coefs)
likli_obj = 0.5 * weights.dot((y - self.data.obs) ** 2)
return prior_obj + likli_obj

def gradient(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
weights = self.data.weights * self.data.trim_weights
y = self.get_lin_param(coefs)

prior_grad = self.gradient_from_gprior(coefs)
likli_grad = mat.T.dot(weights * (y - self.data.obs))
return prior_grad + likli_grad

def hessian(self, coefs: NDArray) -> Matrix:
mat = self.mat[0]
weights = self.data.weights * self.data.trim_weights
likli_hess_scale = weights

prior_hess = self.hessian_from_gprior()
likli_hess_right = mat.scale_rows(likli_hess_scale)
likli_hess = mat.T.dot(likli_hess_right)

return prior_hess + likli_hess

def jacobian2(self, coefs: NDArray) -> NDArray:
mat = self.mat[0]
weights = self.data.weights * self.data.trim_weights
y = self.get_lin_param(coefs)
likli_jac_scale = weights * (y - self.data.obs)

likli_jac = mat.T.scale_cols(likli_jac_scale)
likli_jac2 = likli_jac.dot(likli_jac.T)
return self.hessian_from_gprior() + likli_jac2


def create_gaussian_model(data: Data, **kwargs) -> GaussianModel:
params = get_params(
params=kwargs.get("params"),
param_specs=kwargs.get("param_specs"),
default_param_specs=GaussianModel.default_param_specs,
)

if params[0].inv_link.name == "identity":
return CanonicalGaussianModel(data, params=params)
return GaussianModel(data, params=params)
67 changes: 37 additions & 30 deletions tests/test_gaussianmodel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
"""
Test Gaussian Model
"""

import numpy as np
import pandas as pd
import pytest

from regmod.data import Data
from regmod.function import fun_dict
from regmod.models import GaussianModel
from regmod.prior import (GaussianPrior, SplineGaussianPrior,
SplineUniformPrior, UniformPrior)
from regmod.models import create_gaussian_model
from regmod.prior import (
GaussianPrior,
SplineGaussianPrior,
SplineUniformPrior,
UniformPrior,
)
from regmod.utils import SplineSpecs
from regmod.variable import SplineVariable, Variable

Expand All @@ -19,14 +23,14 @@
@pytest.fixture
def data():
num_obs = 5
df = pd.DataFrame({
"obs": np.random.randn(num_obs),
"cov0": np.random.randn(num_obs),
"cov1": np.random.randn(num_obs)
})
return Data(col_obs="obs",
col_covs=["cov0", "cov1"],
df=df)
df = pd.DataFrame(
{
"obs": np.random.randn(num_obs),
"cov0": np.random.randn(num_obs),
"cov1": np.random.randn(num_obs),
}
)
return Data(col_obs="obs", col_covs=["cov0", "cov1"], df=df)


@pytest.fixture
Expand All @@ -41,9 +45,9 @@ def uprior():

@pytest.fixture
def spline_specs():
return SplineSpecs(knots=np.linspace(0.0, 1.0, 5),
degree=3,
knots_type="rel_domain")
return SplineSpecs(
knots=np.linspace(0.0, 1.0, 5), degree=3, knots_type="rel_domain"
)


@pytest.fixture
Expand All @@ -58,20 +62,21 @@ def spline_uprior():

@pytest.fixture
def var_cov0(gprior, uprior):
return Variable(name="cov0",
priors=[gprior, uprior])
return Variable(name="cov0", priors=[gprior, uprior])


@pytest.fixture
def var_cov1(spline_gprior, spline_uprior, spline_specs):
return SplineVariable(name="cov1",
spline_specs=spline_specs,
priors=[spline_gprior, spline_uprior])
return SplineVariable(
name="cov1", spline_specs=spline_specs, priors=[spline_gprior, spline_uprior]
)


@pytest.fixture
def model(data, var_cov0, var_cov1):
return GaussianModel(data, param_specs={"mu": {"variables": [var_cov0, var_cov1]}})
return create_gaussian_model(
data, param_specs={"mu": {"variables": [var_cov0, var_cov1]}}
)


def test_model_result(model):
Expand Down Expand Up @@ -117,7 +122,7 @@ def test_model_gradient(model, inv_link):
tr_grad = np.zeros(model.size)
for i in range(model.size):
coefs_c[i] += 1e-16j
tr_grad[i] = model.objective(coefs_c).imag/1e-16
tr_grad[i] = model.objective(coefs_c).imag / 1e-16
coefs_c[i] -= 1e-16j
assert np.allclose(my_grad, tr_grad)

Expand All @@ -132,7 +137,7 @@ def test_model_hessian(model, inv_link):
for i in range(model.size):
for j in range(model.size):
coefs_c[j] += 1e-16j
tr_hess[i][j] = model.gradient(coefs_c).imag[i]/1e-16
tr_hess[i][j] = model.gradient(coefs_c).imag[i] / 1e-16
coefs_c[j] -= 1e-16j

assert np.allclose(my_hess, tr_hess)
Expand All @@ -152,25 +157,27 @@ def test_model_jacobian2(model):

mat = model.mat[0].to_numpy()
param = model.get_params(beta)[0]
residual = (model.data.obs - param)*np.sqrt(model.data.weights)
jacobian = mat.T*residual
residual = (model.data.obs - param) * np.sqrt(model.data.weights)
jacobian = mat.T * residual
true_jacobian2 = jacobian.dot(jacobian.T) + model.hessian_from_gprior()

assert np.allclose(jacobian2, true_jacobian2)


def test_model_no_variables():
num_obs = 5
df = pd.DataFrame({
"obs": np.random.randn(num_obs),
"offset": np.ones(num_obs),
})
df = pd.DataFrame(
{
"obs": np.random.randn(num_obs),
"offset": np.ones(num_obs),
}
)
data = Data(
col_obs="obs",
col_offset="offset",
df=df,
)
model = GaussianModel(data, param_specs={"mu": {"offset": "offset"}})
model = create_gaussian_model(data, param_specs={"mu": {"offset": "offset"}})
coefs = np.array([])
grad = model.gradient(coefs)
hessian = model.hessian(coefs)
Expand Down

0 comments on commit 3badcc0

Please sign in to comment.