Skip to content

Commit

Permalink
Merge pull request #39 from Herculens/35-create-interface-for-a-noise…
Browse files Browse the repository at this point in the history
…-model

Interface for a noise model
  • Loading branch information
aymgal authored Nov 15, 2024
2 parents a15896a + d7b8455 commit 4110803
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 74 deletions.
15 changes: 11 additions & 4 deletions herculens/Analysis/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def model_summary(self, lens_image, kwargs_result,
lock_colorbars=False, masked_residuals=True,
vmin_pot=None, vmax_pot=None, # TEMP
k_source=None, k_lens=None,
show_plot=True):
kwargs_noise=None, show_plot=True):
n_cols = 3
n_rows = sum([show_image, show_source, show_lens_light,
show_lens_potential, show_lens_others])
Expand All @@ -117,11 +117,14 @@ def model_summary(self, lens_image, kwargs_result,
extent = lens_image.Grid.plt_extent

##### PREPARE IMAGES #####

if kwargs_noise is None:
kwargs_noise = {}

if show_image:
# create the resulting model image
model = lens_image.model(**kwargs_result, k_lens=k_lens)
noise_var = lens_image.Noise.C_D_model(model)
noise_var = lens_image.Noise.C_D_model(model, **kwargs_noise)
if likelihood_mask is None:
mask_bool = False
likelihood_mask = np.ones_like(model)
Expand Down Expand Up @@ -325,12 +328,16 @@ def model_summary(self, lens_image, kwargs_result,
colorbar_kwargs={'orientation': 'horizontal'})

ax = axes[i_row, 2]
model_residuals, residuals = lens_image.normalized_residuals(data, model, mask=likelihood_mask)
model_residuals, residuals = lens_image.normalized_residuals(
data, model, kwargs_noise=kwargs_noise, mask=likelihood_mask,
)
if masked_residuals is True:
residuals_plot = model_residuals
else:
residuals_plot = residuals
red_chi2 = lens_image.reduced_chi2(data, model, mask=likelihood_mask)
red_chi2 = lens_image.reduced_chi2(
data, model, kwargs_noise=kwargs_noise, mask=likelihood_mask,
)
im = ax.imshow(residuals_plot, cmap=self.cmap_res, extent=extent, norm=self.norm_res)
im.set_rasterized(True)
if mask_bool is True and masked_residuals is False:
Expand Down
215 changes: 151 additions & 64 deletions herculens/Instrument/noise.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,81 @@


import numpy as np
from functools import partial
import jax.numpy as jnp
from jax import random
from jax import random, jit
from herculens.Util import image_util


__all__ = ['Noise']


class Noise(object):
"""
class that deals with noise properties of imaging data
"""Class that builds a noise model, to be used in LensImage.
Three possibilities exist, depending on what the user provides to the constructor:
- a fixed noise map, which does not depend on any model-predicted flux.
- an exposure time (or map) and a fixed background noise estimate,
which will be used to estimate the noise based on an image model (see LensImage.model()).
- an exposure time (or map) but no background noise estimate. In this case,
the user should provide a (possibly varying) background_rms value when
calling the Noise.C_D_model().
Parameters
----------
nx : int
number of data pixels along the x direction.
ny : int
number of data pixels along the y direction.
exposure_time : np.array or float, optional
exposure time, either common for all pixels or for each individual
data pixel. By default None
background_rms : float, optional
Root-mean-square value (standard deviation) of Gaussian background noise.
By default None.
noise_map : np.array or int, optional
noise standard of each individual pixel.
If provide, overwrites any value of background_rms and exposure_time.
By default None.
variance_boost_map : np.array, optional
fixed (not model-dependent) variance boost map. By default None.
verbose : bool, optional
If True, outputs warning message at construction. By default True.
Raises
------
ValueError
If neither a noise map or exposure time is provided.
"""

def __init__(self, nx, ny, exposure_time=None, background_rms=None,
noise_map=None, variance_boost_map=None, verbose=True):
"""
:param image_data: numpy array, pixel data values
:param exposure_time: int or array of size the data; exposure time
(common for all pixels or individually for each individual pixel)
:param background_rms: root-mean-square value of Gaussian background noise
:param noise_map: int or array of size the data; joint noise sqrt(variance) of each individual pixel.
Overwrites meaning of background_rms and exposure_time.
:param variance_boost_map: fixed (not model-dependent) variance boost map.
"""
self._background_rms = float(background_rms)
self._data = None # TODO: is that really useful?
self._nx, self._ny = nx, ny # TODO: is that really useful?
if noise_map is not None:
assert np.shape(noise_map) == (nx, ny)
assert np.all(noise_map > 0.)
if exposure_time is None and noise_map is None:
raise ValueError("Either a fixed noise map or an exposure time "
"(or exposure map) should be provided.")
self._noise_map = noise_map
if exposure_time is not None:
# make sure no negative exposure values are present no dividing by zero
if isinstance(exposure_time, int) or isinstance(exposure_time, float):
if exposure_time <= 1e-10:
exposure_time = 1e-10
else:
exposure_time[exposure_time <= 1e-10] = 1e-10
elif noise_map is None:
noise_map = self._background_rms * np.ones((nx, ny))
self._exp_map = exposure_time
self._noise_map = noise_map
if noise_map is not None:
assert np.shape(noise_map) == (nx, ny)
else:
if background_rms is not None and exposure_time is not None:
if background_rms * np.max(exposure_time) < 1 and verbose is True:
UserWarning("sigma_b*f %s < 1 count may introduce unstable error estimates with a Gaussian "
"error function for a Poisson distribution with mean < 1." % (
background_rms * np.max(exposure_time)))
self._nx, self._ny = nx, ny
self._data = None
if variance_boost_map is not None:
# NOTE: we use a setter for backward compatibility reasons
self.variance_boost_map = variance_boost_map
self._background_rms = background_rms
if (self._noise_map is None and self._background_rms is None
and verbose is True):
print("Warning: both `noise_map` and `background_rms` are None; "
"`background_rms` should then be given as a model parameter "
"to estimate the noise variance via C_D_model().")
if variance_boost_map is None:
variance_boost_map = np.ones((self._nx, self._ny))
self.global_boost_map = variance_boost_map # NOTE: we use a setter for backward compatibility reasons

def set_data(self, data):
assert np.shape(data) == (self._nx, self._ny)
Expand All @@ -76,10 +101,14 @@ def compute_noise_map_from_model(self, model, as_jax_array=True):
def realisation(self, model, prng_key, add_gaussian=True, add_poisson=True):
noise_real = 0.
key1, key2 = random.split(prng_key)
if add_poisson and self._exp_map is not None:
noise_real += image_util.add_poisson(model, self._exp_map, key1)
if add_poisson:
if self.exposure_map is None:
raise ValueError("An exposure time (or map) is needed to add Poisson noise")
noise_real += image_util.add_poisson(model, self.exposure_map, key1)
if add_gaussian:
noise_real += image_util.add_background(model, self._background_rms, key2)
if self.background_rms is None:
raise ValueError("An background RMS value is needed to add Poisson noise")
noise_real += image_util.add_background(model, self.background_rms, key2)
return noise_real

@property
Expand All @@ -95,12 +124,26 @@ def variance_boost_map(self, boost_map):
@property
def background_rms(self):
"""
The standard deviation ("RMS") of the background noise.
NOTE: "RMS" is a misleading term; it will be changed to sigma
or standard deviation in the future.
Returns
-------
float
Standard deviation of the background noise
:return: rms value of background noise
Raises
------
ValueError
If neither a background_rms value nor a noise map is available.
"""
if self._background_rms is None:
if self._noise_map is None:
raise ValueError("rms background value as 'background_rms' not specified!")
else:
print("Warning: Estimating the background RMS by the median of the noise map.")
self._background_rms = np.median(self._noise_map)
return self._background_rms

Expand All @@ -110,7 +153,15 @@ def exposure_map(self):
Units of data and exposure map should result in:
number of flux counts = data * exposure_map
:return: exposure map for each pixel
Returns
-------
_type_
exposure map for each pixel
Raises
------
ValueError
If neither an exposure time (or map) nor a noise map is available.
"""
if self._exp_map is None:
if self._noise_map is None:
Expand All @@ -119,64 +170,100 @@ def exposure_map(self):

@property
def C_D(self):
"""
Covariance matrix of all pixel values in 2d numpy array (only diagonal component)
The covariance matrix is estimated from the data.
"""Covariance matrix of all pixel values in 2d numpy array (only diagonal component)
The covariance matrix is estimated from the data (not from any model).
WARNING: For low count statistics, the noise in the data may lead to biased estimates of the covariance matrix.
:return: covariance matrix of all pixel values in 2d numpy array (only diagonal component).
Returns
-------
jax.numpy.array
Noise variance per pixel.
Raises
------
ValueError
If no data image nor a noise map is available.
"""
if not hasattr(self, '_C_D'):
if self._noise_map is not None:
self._C_D = self._noise_map ** 2
else:
if self._data is None:
raise ValueError("No imaging data array has been set, impossible to estimate the diagonal covariance matrix")
self._C_D = self.covariance_matrix(self._data, self.background_rms, self.exposure_map)
self._C_D = self.total_variance(self._data, self.background_rms, self.exposure_map)
return self._C_D

def C_D_model(self, model, boost_map=None, force_recompute=False):
"""
@partial(jit, static_argnums=(0, 4))
def C_D_model(self, model, background_rms=None, boost_map=1.,
force_recompute=False):
"""Returns the estimate of the variance per pixel (i.e. the diagonal
of the data covariance matrix) with contributions from background noise
and shot noise from the model-predicted flux.
:param model: model (same as data but without noise)
:return: estimate of the noise per pixel based on the model flux
Parameters
----------
model : jax.numpy.array
Image model for shot noise (Poisson noise) estimation.
background_rms : float, optional
Standard deviation of the background noise. If not given, uses the
(fixed) value provided to the constructor, or is ignored if a
(fixed) noise map has been provided.
boost_map : jax.numpy.array, optional
2D map (same dimensions as the model image), that contains
multiplicative factors for the noise variance. By default 1.
force_recompute : bool, optional
If True, forces the use of the fixed noise map. By default False.
Returns
-------
jax.numpy.array
Noise variance per pixel corresponding to the input image model.
"""
if boost_map is None:
boost_map = self.variance_boost_map
if not force_recompute and self._noise_map is not None:
# variance fixed by the noise map
c_d = self._noise_map**2
elif self._background_rms is not None:
# variance computed based on model and fixed background RMS and exposure map
c_d = self.total_variance(model, self._background_rms, self.exposure_map)
else:
c_d = self.covariance_matrix(model, self._background_rms, self._exp_map)
return boost_map * c_d

# variance computed based on model, given background RMS and fixed exposure map
c_d = self.total_variance(model, background_rms, self.exposure_map)
return self.global_boost_map * boost_map * c_d

def _reset_cache(self):
if hasattr(self, '_C_D'):
delattr(self, '_C_D')


@staticmethod
def covariance_matrix(data, background_rms, exposure_map):
"""
returns a diagonal matrix for the covariance estimation which describes the error
def total_variance(flux, background_rms, exposure_map):
"""Computes the diagonal of the data covariance matrix, i.e. the noise variance.
Notes:
- the exposure map must be positive definite. Values that deviate too much from the mean exposure time will be
given a lower limit to not under-predict the Poisson component of the noise.
- the data must be positive semi-definite for the Poisson noise estimate.
- the flux must be positive semi-definite for the Poisson noise estimate.
Values < 0 (Possible after mean subtraction) will not have a Poisson component in their noise estimate.
Parameters
----------
flux : jax.numpy.array
Pixels containing the flux from which the shot (Poisson) noise can be estimated.
background_rms : float
Standard deviation of the background noise.
exposure_map : float or jax.numpy.array
Global exposure time or exposure time per pixel.
:param data: data array, eg in units of photons/second
:param background_rms: background noise rms, eg. in units (photons/second)^2
:param exposure_map: exposure time per pixel, e.g. in units of seconds
:return: len(d) x len(d) matrix that give the error of background and Poisson components; (photons/second)^2
Returns
-------
jax.numpy.array
Noise variance (background + flux-dependent shot noise) per pixel.
"""
sigma2 = background_rms**2
sigma2_bkg = background_rms**2
if exposure_map is not None:
d_pos = jnp.maximum(0, data)
sigma2 += d_pos / exposure_map
flux_pos = jnp.maximum(0, flux)
sigma2_poisson = flux_pos / exposure_map
sigma2_tot = sigma2_bkg + sigma2_poisson
else:
sigma2 = sigma2 * jnp.ones_like(data)
return sigma2
sigma2_tot = sigma2_bkg * jnp.ones_like(flux)
return sigma2_tot
14 changes: 8 additions & 6 deletions herculens/LensImage/lens_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,19 @@ def simulation(self, add_poisson=True, add_gaussian=True,
self.Noise.compute_noise_map_from_model(model)
return simu

def C_D_model(self, model, boost_map=None):
return self.Noise.C_D_model(model, boost_map=boost_map)
def C_D_model(self, model, **kwargs_noise):
return self.Noise.C_D_model(model, **kwargs_noise)

def normalized_residuals(self, data, model, mask=None):
def normalized_residuals(self, data, model, kwargs_noise=None, mask=None):
"""
compute the map of normalized residuals,
given the data and the model image
"""
if kwargs_noise is None:
kwargs_noise = {}
if mask is None:
mask = np.ones(self.Grid.num_pixel_axes)
noise_var = self.C_D_model(model)
noise_var = self.C_D_model(model, **kwargs_noise)
noise = np.sqrt(noise_var)
norm_res_model = (data - model) / noise * mask
norm_res_tot = norm_res_model
Expand All @@ -299,13 +301,13 @@ def normalized_residuals(self, data, model, mask=None):
norm_res_tot = np.where(np.isfinite(norm_res_tot), norm_res_tot, 0.)
return norm_res_model, norm_res_tot

def reduced_chi2(self, data, model, mask=None):
def reduced_chi2(self, data, model, kwargs_noise=None, mask=None):
"""
compute the reduced chi2 of the data given the model
"""
if mask is None:
mask = np.ones(self.Grid.num_pixel_axes)
norm_res, _ = self.normalized_residuals(data, model, mask=mask)
norm_res, _ = self.normalized_residuals(data, model, kwargs_noise=kwargs_noise, mask=mask)
num_data_points = np.sum(mask)
return np.sum(norm_res**2) / num_data_points

Expand Down

0 comments on commit 4110803

Please sign in to comment.