From 1e06fc5d2bf52c7ade2789c5f4c3c3f3448095d4 Mon Sep 17 00:00:00 2001 From: Martin Vonk Date: Thu, 7 Mar 2024 23:57:54 +0100 Subject: [PATCH] make seperate fit_dist function if pars are not return (e.g. with the norm distribution) --- src/spei/si.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/spei/si.py b/src/spei/si.py index c749182..6a18ae3 100644 --- a/src/spei/si.py +++ b/src/spei/si.py @@ -1,4 +1,4 @@ -from typing import Optional, Union +from typing import Optional, Tuple, Union from numpy import linspace, std from pandas import DatetimeIndex, Grouper, Series @@ -51,35 +51,47 @@ def compute_si_ppf( for _, grval in dfval.groupby(Grouper(freq=inf_freq)): data = get_data_series(grval) if prob_zero: - cdf = compute_cdf_probzero(data=data.values, dist=dist) + p0 = (data == 0.0).sum() / len(data) + pars, loc, scale = fit_dist(data=data[data != 0.0], dist=dist) + cdf_sub = compute_cdf(data=data, dist=dist, loc=loc, scale=scale, pars=pars) + cdf = p0 + (1 - p0) * cdf_sub + cdf[data == 0.0] = p0 else: - cdf = compute_cdf(data=data.values, dist=dist) + pars, loc, scale = fit_dist(data=data, dist=dist) + cdf = compute_cdf(data=data, dist=dist, loc=loc, scale=scale, pars=pars) ppf = norm.ppf(cdf) si.loc[data.index] = ppf return si -def compute_cdf( - data: Union[Series, NDArrayFloat], dist: ContinuousDist -) -> NDArrayFloat: - *pars, loc, scale = dist.fit(data, scale=std(data)) - cdf = dist.cdf(data, pars, loc=loc, scale=scale) - return cdf +def fit_dist(data: Union[Series, NDArrayFloat], dist: ContinuousDist) -> Tuple: + """Fit a Scipy Continuous Distribution""" + fit_tuple = dist.fit(data, scale=std(data)) + if len(fit_tuple) == 2: + loc, scale = fit_tuple + pars = None + else: + *pars, loc, scale = fit_tuple + return pars, loc, scale -def compute_cdf_probzero( - data: Union[Series, NDArrayFloat], dist: ContinuousDist +def compute_cdf( + data: Union[Series, NDArrayFloat], + dist: ContinuousDist, + loc: float, + scale: float, + pars: Optional[Tuple[float]] = None, ) -> NDArrayFloat: - p0 = (data == 0.0).sum() / len(data) - *pars, loc, scale = dist.fit(data[data != 0.0], scale=std(data)) - cdf_sub = dist.cdf(data, pars, loc=loc, scale=scale) - cdf = p0 + (1 - p0) * cdf_sub - cdf[data == 0.0] = p0 + """Compute cumulative density function of a Scipy Continuous Distribution""" + if pars is not None: + cdf = dist.cdf(data, pars, loc=loc, scale=scale) + else: + cdf = dist.cdf(data, loc=loc, scale=scale) return cdf def compute_cdf_nsf(data: Union[Series, NDArrayFloat]) -> NDArrayFloat: - """Normal Scores Transform""" + """Compute cumulative density function using the Normal Scores Transform""" n = data.size cdf = linspace(1 / (2 * n), 1 - 1 / (2 * n), n) return cdf