WIP refactor: implement MomentumPool from expertsystem
redeboer committed Mar 10, 2021
1 parent 1f16c5f commit ff54436
Showing 12 changed files with 439 additions and 360 deletions.
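
The refactoring replaces raw `np.ndarray` event stacks with expertsystem's `MomentumPool` container. The diff below only relies on a small surface of that class: a mapping from final-state ID to an array of four-momenta per event, plus `n_events`, `append()`, and `select_events()`. A minimal stand-in with that interface could look as follows — an illustrative sketch, not expertsystem's actual implementation:

```python
from typing import Dict

import numpy as np


class MomentumPoolSketch(Dict[int, np.ndarray]):
    """Mapping of final-state ID to an (n_events, 4) array of E, px, py, pz."""

    @property
    def n_events(self) -> int:
        if not self:
            return 0
        return len(next(iter(self.values())))

    def append(self, other: "MomentumPoolSketch") -> None:
        # stack the other pool's events below this pool's events
        for i, four_momenta in other.items():
            self[i] = (
                np.vstack((self[i], four_momenta)) if i in self else four_momenta
            )

    def select_events(self, selection: slice) -> "MomentumPoolSketch":
        # apply the same event selection to every final-state particle
        return MomentumPoolSketch(
            {i: four_momenta[selection] for i, four_momenta in self.items()}
        )
```

With this stand-in in mind, the `n_events` checks, `append()` calls, and final `select_events()` slices in the diff read naturally.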
1 change: 1 addition & 0 deletions cspell.json
@@ -178,6 +178,7 @@
   "seealso",
   "sharex",
   "subsys",
+  "tolist",
   "tqdm",
   "unflatten",
   "unflattened",
61 changes: 33 additions & 28 deletions src/tensorwaves/data/generate.py
@@ -1,9 +1,10 @@
 """Tools to facilitate data sample generation."""
 
 import logging
-from typing import Callable, Optional
+from typing import Callable, Optional, Tuple
 
 import numpy as np
+from expertsystem.amplitude.data import MomentumPool
 from expertsystem.amplitude.kinematics import HelicityKinematics, ReactionInfo
 from tqdm import tqdm

@@ -24,19 +25,23 @@ def _generate_data_bunch(
     random_generator: UniformRealNumberGenerator,
     intensity: Function,
     kinematics: HelicityKinematics,
-) -> np.ndarray:
+) -> Tuple[MomentumPool, float]:
     phsp_sample, weights = phsp_generator.generate(
         bunch_size, random_generator
     )
     dataset = kinematics.convert(phsp_sample)
     intensities = intensity(dataset)
-    maxvalue = np.max(intensities)
+    maxvalue: float = np.max(intensities)
 
     uniform_randoms = random_generator(bunch_size, max_value=maxvalue)
 
-    np_phsp_sample = np.array(phsp_sample.values())
-    np_phsp_sample = np_phsp_sample.transpose(1, 0, 2)
-    return (np_phsp_sample[weights * intensities > uniform_randoms], maxvalue)
+    hit_and_miss_sample = MomentumPool(
+        {
+            i: four_momenta[weights * intensities > uniform_randoms]
+            for i, four_momenta in phsp_sample.items()
+        }
+    )
+    return hit_and_miss_sample, maxvalue
 
 
 def generate_data(
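
`_generate_data_bunch` performs hit-and-miss (rejection) sampling: an event is kept when its weight times intensity exceeds a uniform random number drawn below the bunch maximum, so the accepted events follow the intensity distribution. A standalone illustration of the idea with a toy intensity, in plain NumPy and independent of the tensorwaves API:

```python
import numpy as np

rng = np.random.default_rng(seed=0)
candidates = rng.uniform(-1, 1, size=10_000)   # uniform "phase space" sample
intensities = 1 - candidates**2                # toy intensity function
uniform_randoms = rng.uniform(0, intensities.max(), size=candidates.size)
accepted = candidates[intensities > uniform_randoms]
# `accepted` is now distributed according to 1 - x**2
```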
@@ -48,7 +53,7 @@ def generate_data(
     ] = TFPhaseSpaceGenerator,
     random_generator: Optional[UniformRealNumberGenerator] = None,
     bunch_size: int = 50000,
-) -> np.ndarray:
+) -> MomentumPool:
     """Facade function for creating data samples based on an intensity.
 
     Args:
@@ -72,36 +77,35 @@ def generate_data(
         desc="Generating intensity-based sample",
         disable=logging.getLogger().level > logging.WARNING,
     )
-    events = np.array([])
+    momentum_pool = MomentumPool({})
     current_max = 0.0
-    while np.size(events, 0) < size:
+    while momentum_pool.n_events < size:
         bunch, maxvalue = _generate_data_bunch(
             bunch_size,
             phsp_gen_instance,
             random_generator,
             intensity,
             kinematics,
         )
 
         if maxvalue > current_max:
             current_max = 1.05 * maxvalue
-            if np.size(events, 0) > 0:
+            if momentum_pool.n_events > 0:
                 logging.info(
                     "processed bunch maximum of %s is over current"
                     " maximum %s. Restarting generation!",
                     maxvalue,
                     current_max,
                 )
-                events = np.array([])
+                momentum_pool = MomentumPool({})
                 progress_bar.update()
                 continue
-        if np.size(events, 0) > 0:
-            events = np.vstack((events, bunch))
+        if momentum_pool.n_events > 0:
+            momentum_pool.append(bunch)
         else:
-            events = bunch
+            momentum_pool = bunch
         progress_bar.update()
     progress_bar.close()
-    return events[0:size].transpose(1, 0, 2)
+    return momentum_pool.select_events(slice(0, size))
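
The restart logic in `generate_data` exists because hit-and-miss sampling is only unbiased against a single global maximum: events tested against an underestimated maximum in earlier bunches were accepted too eagerly, so the sample collected so far must be discarded. A small demonstration of that bias with toy numbers (hypothetical values, not tensorwaves code):

```python
import numpy as np

rng = np.random.default_rng(seed=1)
intensities = rng.uniform(0, 1, size=100_000)
underestimated_max = 0.5          # as if later bunches revealed a higher max
true_max = intensities.max()

# events above the claimed maximum are accepted with certainty ...
p_biased = np.minimum(intensities / underestimated_max, 1.0)
# ... instead of proportionally to their intensity:
p_correct = intensities / true_max
high = intensities > underestimated_max
print(p_biased[high].mean(), p_correct[high].mean())  # 1.0 vs ~0.75
```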


@@ -112,7 +116,7 @@ def generate_phsp(
     ] = TFPhaseSpaceGenerator,
     random_generator: Optional[UniformRealNumberGenerator] = None,
     bunch_size: int = 50000,
-) -> np.ndarray:
+) -> MomentumPool:
     """Facade function for creating (unweighted) phase space samples.
 
     Args:
@@ -139,22 +143,23 @@ def generate_phsp(
         desc="Generating phase space sample",
         disable=logging.getLogger().level > logging.WARNING,
     )
-    events = np.array([])
-    while np.size(events, 0) < size:
+    momentum_pool = MomentumPool({})
+    while momentum_pool.n_events < size:
         phsp_sample, weights = phsp_gen_instance.generate(
             bunch_size, random_generator
         )
-        np_phsp_sample = np.array(phsp_sample.values())
-        np_phsp_sample = np_phsp_sample.transpose(1, 0, 2)
-
         hit_and_miss_randoms = random_generator(bunch_size)
-
-        bunch = np_phsp_sample[weights > hit_and_miss_randoms]
+        bunch = MomentumPool(
+            {
+                i: four_momenta[weights > hit_and_miss_randoms]
+                for i, four_momenta in phsp_sample.items()
+            }
+        )
 
-        if np.size(events, 0) > 0:
-            events = np.vstack((events, bunch))
+        if momentum_pool.n_events > 0:
+            momentum_pool.append(bunch)
         else:
-            events = bunch
+            momentum_pool = bunch
         progress_bar.update()
     progress_bar.close()
-    return events[0:size].transpose(1, 0, 2)
+    return momentum_pool.select_events(slice(0, size))
34 changes: 19 additions & 15 deletions src/tensorwaves/data/tf_phasespace.py
@@ -1,11 +1,11 @@
 """Phase space generation using tensorflow."""
 
-from typing import Dict, Optional, Tuple
+from typing import Optional, Tuple
 
 import expertsystem.amplitude.kinematics as es
-import numpy as np
 import phasespace
 import tensorflow as tf
+from expertsystem.amplitude.data import MomentumPool, ScalarSequence
 from phasespace.random import get_rng
 
 from tensorwaves.interfaces import (
@@ -30,7 +30,7 @@ def __init__(self, reaction_info: es.ReactionInfo) -> None:
 
     def generate(
         self, size: int, rng: UniformRealNumberGenerator
-    ) -> Tuple[Dict[int, np.ndarray], np.ndarray]:
+    ) -> Tuple[MomentumPool, ScalarSequence]:
         if not isinstance(rng, TFUniformRealNumberGenerator):
             raise TypeError(
                 f"{TFPhaseSpaceGenerator.__name__} requires a "
@@ -40,11 +40,13 @@ def generate(
         weights, particles = self.phsp_gen.generate(
             n_events=size, seed=rng.generator
         )
-        momentum_pool = {
-            int(label): momenta.numpy().T
-            for label, momenta in particles.items()
-        }
-        return momentum_pool, weights.numpy()
+        momentum_pool = MomentumPool(
+            {
+                int(label): momenta.numpy()[:, [3, 0, 1, 2]]
+                for label, momenta in particles.items()
+            }
+        )
+        return momentum_pool, ScalarSequence(weights.numpy())
 
 
 class TFUniformRealNumberGenerator(UniformRealNumberGenerator):
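
The index trick `momenta.numpy()[:, [3, 0, 1, 2]]` converts between conventions: the `phasespace` package returns four-momenta ordered as (px, py, pz, E), while expertsystem's `MomentumPool` is assumed here to expect (E, px, py, pz) with events along the first axis — the old `.T` produced component-major arrays instead. The reordering in isolation, with made-up numbers:

```python
import numpy as np

# one particle's momenta as returned by phasespace: shape (n_events, 4),
# ordered (px, py, pz, E)
momenta = np.array([
    [0.1, 0.2, 0.3, 1.5],
    [0.0, -0.4, 0.2, 1.2],
])
reordered = momenta[:, [3, 0, 1, 2]]  # -> (E, px, py, pz), still event-major
print(reordered[0])  # [1.5  0.1  0.2  0.3]
```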
@@ -56,13 +58,15 @@ def __init__(self, seed: Optional[float] = None):
 
     def __call__(
         self, size: int, min_value: float = 0.0, max_value: float = 1.0
-    ) -> np.ndarray:
-        return self.generator.uniform(
-            shape=[size],
-            minval=min_value,
-            maxval=max_value,
-            dtype=self.dtype,
-        ).numpy()
+    ) -> ScalarSequence:
+        return ScalarSequence(
+            self.generator.uniform(
+                shape=[size],
+                minval=min_value,
+                maxval=max_value,
+                dtype=self.dtype,
+            ).numpy()
+        )
 
     @property
     def seed(self) -> Optional[float]:
70 changes: 18 additions & 52 deletions src/tensorwaves/estimator.py
@@ -2,21 +2,23 @@
 
 All estimators have to implement the `~.interfaces.Estimator` interface.
 """
-from typing import Callable, Dict, List, Union
+from typing import Callable, Dict, Mapping, Union
 
+from expertsystem.amplitude.data import DataSet
+
 from tensorwaves.interfaces import Estimator, Model
 from tensorwaves.physics.amplitude import get_backend_modules
 
 
 def gradient_creator(
-    function: Callable[[Dict[str, Union[float, complex]]], float],
+    function: Callable[[Mapping[str, Union[float, complex]]], float],
     backend: Union[str, tuple, dict],
 ) -> Callable[
-    [Dict[str, Union[float, complex]]], Dict[str, Union[float, complex]]
+    [Mapping[str, Union[float, complex]]], Dict[str, Union[float, complex]]
 ]:
     # pylint: disable=import-outside-toplevel
     def not_implemented(
-        parameters: Dict[str, Union[float, complex]]
+        parameters: Mapping[str, Union[float, complex]]
     ) -> Dict[str, Union[float, complex]]:
         raise NotImplementedError("Gradient not implemented.")
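
Switching the signatures from `Dict` to `Mapping` loosens the contract for callers — any read-only mapping is accepted — and signals that these functions do not mutate the parameters they receive. A minimal illustration of the difference for a type checker (hypothetical function name, not part of the commit):

```python
from types import MappingProxyType
from typing import Mapping, Union


def takes_mapping(parameters: Mapping[str, Union[float, complex]]) -> float:
    return sum(abs(value) for value in parameters.values())


# a read-only view satisfies the Mapping annotation, whereas a strict type
# checker would reject it for a parameter annotated as Dict
frozen = MappingProxyType({"mass": 0.77, "width": 0.15})
print(takes_mapping(frozen))
```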

@@ -50,18 +52,15 @@ class SympyUnbinnedNLL(  # pylint: disable=too-many-instance-attributes
     def __init__(
         self,
         model: Model,
-        dataset: dict,
-        phsp_dataset: dict,
+        dataset: DataSet,
+        phsp_dataset: DataSet,
         phsp_volume: float = 1.0,
         backend: Union[str, tuple, dict] = "numpy",
     ) -> None:
+        self.__function = model.lambdify(backend)
         self.__gradient = gradient_creator(self.__call__, backend)
         backend_modules = get_backend_modules(backend)
 
-        self.__bare_model = model.lambdify(
-            backend=backend,
-        )
-
         def find_function_in_backend(name: str) -> Callable:
             if isinstance(backend_modules, dict) and name in backend_modules:
                 return backend_modules[name]
@@ -75,56 +74,23 @@ def find_function_in_backend(name: str) -> Callable:
         self.__sum_function = find_function_in_backend("sum")
         self.__log_function = find_function_in_backend("log")
 
+        self.__dataset = dataset
+        self.__phsp_dataset = phsp_dataset
         self.__phsp_volume = phsp_volume
 
-        self.__data_args = []
-        self.__phsp_args = []
-        self.__parameter_index_mapping: Dict[str, int] = {}
-
-        for i, var_name in enumerate(model.variables):
-            if var_name in dataset and var_name in phsp_dataset:
-                self.__data_args.append(dataset[var_name])
-                self.__phsp_args.append(phsp_dataset[var_name])
-            elif var_name in dataset:
-                raise ValueError(
-                    f"Datasets do not match! {var_name} exists in dataset but "
-                    "not in phase space dataset."
-                )
-            elif var_name in phsp_dataset:
-                raise ValueError(
-                    f"Datasets do not match! {var_name} exists in phase space "
-                    "dataset but not in dataset."
-                )
-            else:
-                self.__data_args.append(model.parameters[var_name])
-                self.__phsp_args.append(model.parameters[var_name])
-                self.__parameter_index_mapping[var_name] = i
-
-    def __call__(self, parameters: Dict[str, Union[float, complex]]) -> float:
-        self.__update_parameters(parameters)
-
-        bare_intensities = self.__bare_model(*self.__data_args)
+    def __call__(
+        self, parameters: Mapping[str, Union[float, complex]]
+    ) -> float:
+        self.__function.update_parameters(parameters)
+        bare_intensities = self.__function(self.__dataset)
         normalization_factor = 1.0 / (
             self.__phsp_volume
-            * self.__mean_function(self.__bare_model(*self.__phsp_args))
+            * self.__mean_function(self.__function(self.__phsp_dataset))
         )
         likelihoods = normalization_factor * bare_intensities
         return -self.__sum_function(self.__log_function(likelihoods))
 
-    def __update_parameters(
-        self, parameters: Dict[str, Union[float, complex]]
-    ) -> None:
-        for par_name, value in parameters.items():
-            if par_name in self.__parameter_index_mapping:
-                index = self.__parameter_index_mapping[par_name]
-                self.__data_args[index] = value
-                self.__phsp_args[index] = value
-
-    @property
-    def parameters(self) -> List[str]:
-        return list(self.__parameter_index_mapping)
-
     def gradient(
-        self, parameters: Dict[str, Union[float, complex]]
+        self, parameters: Mapping[str, Union[float, complex]]
     ) -> Dict[str, Union[float, complex]]:
         return self.__gradient(parameters)
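
The rewritten `__call__` evaluates the same quantity as before — the unbinned negative log-likelihood, with the normalization integral estimated as a phase-space Monte Carlo mean — but now feeds whole `DataSet` objects to the lambdified model instead of positional argument lists. The underlying formula, sketched with plain NumPy on toy intensity arrays (`unbinned_nll` is a hypothetical helper, not the estimator class itself):

```python
import numpy as np


def unbinned_nll(
    data_intensities: np.ndarray,
    phsp_intensities: np.ndarray,
    phsp_volume: float = 1.0,
) -> float:
    # normalization integral estimated as volume * <intensity> over phase space
    normalization_factor = 1.0 / (phsp_volume * np.mean(phsp_intensities))
    likelihoods = normalization_factor * data_intensities
    return -np.sum(np.log(likelihoods))


print(unbinned_nll(np.array([0.2, 0.9, 0.5]), np.array([0.4, 0.6])))
```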