
Commit c6f76f0: Sync pop
  commit 76077e374ee0449801b8987df0f7308f06a1936c
  Author: Dominik Jain <[email protected]>
  Date:   Thu Aug 10 16:15:01 2023 +0200

      Improve tracking
        * Proper support for cross-validation
        * Adapt context validity to capture learning times

  src/sensai/evaluation/crossval.py
  src/sensai/evaluation/eval_util.py
  src/sensai/evaluation/evaluator.py
  src/sensai/tracking/tracking_base.py

  commit ee92d5a48a8d6529419dcccdd326303e0019b84f
  Author: Dominik Jain <[email protected]>
  Date:   Thu Aug 10 16:13:19 2023 +0200

      mlflow tracking: Improve naming of metrics/tags

  src/sensai/tracking/mlflow_tracking.py

  commit c9eaa1b095ce3e85db33201874dccc915dadcab6
  Author: Dominik Jain <[email protected]>
  Date:   Thu Aug 10 16:12:42 2023 +0200

      TagBuilder:
        * Add with_component
        * Support multiple initial components

  src/sensai/util/string.py

  commit bdc002759191eb5d3415e7e376a7ab932708be02
  Author: Dominik Jain <[email protected]>
  Date:   Tue Aug 8 14:29:23 2023 +0200

      PickleLoadSaveMixin: Support Path in addition to str

  src/sensai/util/cache.py

  commit b76ee10af87de4cce5f6dce9f1970b1b30497f53
  Author: Dominik Jain <[email protected]>
  Date:   Tue Aug 8 14:10:54 2023 +0200

      Support Path objects in load_pickle

  src/sensai/util/pickle.py

  commit be3e90c59fbd4a6cce139cdcf06b2e55a896fed8
  Author: Dominik Jain <[email protected]>
  Date:   Tue Aug 8 12:35:14 2023 +0200

      Tracking: Track pretty-printed version of model string representation

      mlflow tracking: Track model class name as tag "model_class"

  src/sensai/tracking/mlflow_tracking.py
  src/sensai/tracking/tracking_base.py
  src/sensai/vector_model.py

  commit be6ce86f83f035ece748b04c273a96f68603766d
  Author: Dominik Jain <[email protected]>
  Date:   Mon Aug 7 21:31:36 2023 +0200

      MLFlowExperiment: Support tracking of log files via in-memory mechanism

  src/sensai/tracking/mlflow_tracking.py
  src/sensai/tracking/tracking_base.py

  commit 0f0253e7daf1a3e1a4e5157ca0bb4b1254f835a2
  Author: Dominik Jain <[email protected]>
  Date:   Mon Aug 7 21:19:50 2023 +0200

      Improve docstring of add_memory_logger

  src/sensai/util/logging.py
opcode81 committed Aug 11, 2023
1 parent 03b202f commit c6f76f0
Showing 10 changed files with 124 additions and 43 deletions.
52 changes: 35 additions & 17 deletions src/sensai/evaluation/crossval.py
@@ -14,6 +14,7 @@
MetricsDictProvider, VectorModelEvaluator, VectorClassificationModelEvaluatorParams, \
VectorRegressionModelEvaluatorParams, MetricsDictProviderFromFunction
from ..data import InputOutputData, DataSplitterFractional
from ..tracking.tracking_base import TrackingContext
from ..util.typing import PandasNamedTuple
from ..vector_model import VectorClassificationModel, VectorRegressionModel, VectorModel

@@ -57,6 +58,13 @@ def iter_input_output_ground_truth_tuples(self, predicted_var_name=None) -> Gene
for i, namedTuple in enumerate(evalData.input_data.itertuples()):
yield namedTuple, eval_stats.y_predicted[i], eval_stats.y_true[i]

def track_metrics(self, tracking_context: TrackingContext):
is_multivar = len(self.predicted_var_names) > 1
for predicted_var_name in self.predicted_var_names:
eval_stats_collection = self.get_eval_stats_collection(predicted_var_name=predicted_var_name)
metrics_dict = eval_stats_collection.agg_metrics_dict()
tracking_context.track_metrics(metrics_dict, predicted_var_name=predicted_var_name if is_multivar else None)


TCrossValData = TypeVar("TCrossValData", bound=VectorModelCrossValidationData)

@@ -148,7 +156,7 @@ def __init__(self, data: InputOutputData, params: Union[VectorModelCrossValidato
:param params: parameters
"""
self.params = params
self.modelEvaluators = []
self.modelEvaluators: List[VectorModelEvaluator] = []
for trainIndices, testIndices in self.params.splitter.create_folds(data, self.params.folds):
self.modelEvaluators.append(self._create_model_evaluator(data.filter_indices(trainIndices), data.filter_indices(testIndices)))

@@ -168,26 +176,36 @@ def _create_model_evaluator(self, training_data: InputOutputData, test_data: Inp
def _create_result_data(self, trained_models, eval_data_list, test_indices_list, predicted_var_names) -> TCrossValData:
pass

def eval_model(self, model: VectorModel):
def eval_model(self, model: VectorModel, track: bool = True):
"""
:param model: the model to evaluate
:param track: whether tracking shall be enabled for the case where a tracked experiment is set on this object
:return: cross-validation results
"""
trained_models = [] if self.params.returnTrainedModels else None
eval_data_list = []
test_indices_list = []
predicted_var_names = None
for i, evaluator in enumerate(self.modelEvaluators, start=1):
log.info(f"Training and evaluating model with fold {i}/{len(self.modelEvaluators)} ...")
model_to_fit: VectorModel = copy.deepcopy(model) if self.params.returnTrainedModels else model
evaluator.fit_model(model_to_fit)
eval_data = evaluator.eval_model(model_to_fit)
if predicted_var_names is None:
predicted_var_names = eval_data.predicted_var_names
if self.params.returnTrainedModels:
trained_models.append(model_to_fit)
for predictedVarName in predicted_var_names:
log.info(f"Evaluation result for {predictedVarName}, fold {i}/{len(self.modelEvaluators)}: "
f"{eval_data.get_eval_stats(predicted_var_name=predictedVarName)}")
eval_data_list.append(eval_data)
test_indices_list.append(evaluator.test_data.outputs.index)
return self._create_result_data(trained_models, eval_data_list, test_indices_list, predicted_var_names)
with self.begin_optional_tracking_context_for_model(model, track=track) as tracking_context:
for i, evaluator in enumerate(self.modelEvaluators, start=1):
evaluator: VectorModelEvaluator
log.info(f"Training and evaluating model with fold {i}/{len(self.modelEvaluators)} ...")
model_to_fit: VectorModel = copy.deepcopy(model) if self.params.returnTrainedModels else model
evaluator.fit_model(model_to_fit)
eval_data = evaluator.eval_model(model_to_fit)
if predicted_var_names is None:
predicted_var_names = eval_data.predicted_var_names
if self.params.returnTrainedModels:
trained_models.append(model_to_fit)
for predictedVarName in predicted_var_names:
log.info(f"Evaluation result for {predictedVarName}, fold {i}/{len(self.modelEvaluators)}: "
f"{eval_data.get_eval_stats(predicted_var_name=predictedVarName)}")
eval_data_list.append(eval_data)
test_indices_list.append(evaluator.test_data.outputs.index)
crossval_data = self._create_result_data(trained_models, eval_data_list, test_indices_list, predicted_var_names)
if tracking_context.is_enabled():
crossval_data.track_metrics(tracking_context)
return crossval_data

def _compute_metrics(self, model: VectorModel, **kwargs):
return self._compute_metrics_for_var_name(model, None)
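Usage sketch (not part of the diff): with these changes, a cross-validator can run all folds inside a single tracking context. The concrete cross_validator and model objects are assumed to exist; MLFlowExperiment, set_tracked_experiment and the track parameter all appear in this commit.

    from sensai.tracking.mlflow_tracking import MLFlowExperiment

    experiment = MLFlowExperiment("my-experiment", tracking_uri="")  # "" tracks to the local file system
    cross_validator.set_tracked_experiment(experiment)  # cross_validator: any VectorModelCrossValidator subclass

    # eval_model now wraps all folds in one tracking context and finally tracks the
    # aggregated fold metrics via VectorModelCrossValidationData.track_metrics
    crossval_data = cross_validator.eval_model(model, track=True)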
6 changes: 2 additions & 4 deletions src/sensai/evaluation/eval_util.py
@@ -272,8 +272,6 @@ def perform_simple_evaluation(self, model: TModel,
if tracked_experiment is not None:
evaluator.set_tracked_experiment(tracked_experiment)
log.info(f"Evaluating {model} via {evaluator}")
if fit_model:
evaluator.fit_model(model)

def gather_results(result_data: VectorModelEvaluationData, res_writer, subtitle_prefix=""):
str_eval_results = ""
@@ -293,10 +291,10 @@ def gather_results(result_data: VectorModelEvaluationData, res_writer, subtitle_
self.create_plots(result_data, show_plots=show_plots, result_writer=res_writer,
subtitle_prefix=subtitle_prefix, tracking_context=trackingContext)

eval_result_data = evaluator.eval_model(model)
eval_result_data = evaluator.eval_model(model, fit=True)
gather_results(eval_result_data, result_writer)
if additional_evaluation_on_training_data:
eval_result_data_train = evaluator.eval_model(model, on_training_data=True)
eval_result_data_train = evaluator.eval_model(model, on_training_data=True, track=False)
additional_result_writer = result_writer.child_with_added_prefix("onTrain-") if result_writer is not None else None
gather_results(eval_result_data_train, additional_result_writer, subtitle_prefix="[onTrain] ")
return eval_result_data
11 changes: 8 additions & 3 deletions src/sensai/evaluation/evaluator.py
@@ -195,18 +195,23 @@ def set_tracked_experiment(self, tracked_experiment: TrackedExperiment):
"""
super().set_tracked_experiment(tracked_experiment)

def eval_model(self, model: VectorModelBase, on_training_data=False, track=True) -> TEvalData:
def eval_model(self, model: Union[VectorModelBase, VectorModelFittableBase], on_training_data=False, track=True,
fit=False) -> TEvalData:
"""
Evaluates the given model
:param model: the model to evaluate
:param on_training_data: if True, evaluate on this evaluator's training data rather than the held-out test data
:param track: whether to track the evaluation metrics for the case where a tracked experiment was set on this object
:param fit: whether to fit the model before evaluating it (via this object's `fit_model` method); if enabled, the model
must support fitting
:return: the evaluation result
"""
data = self.training_data if on_training_data else self.test_data
result: VectorModelEvaluationData = self._eval_model(model, data)
with TrackingContext.from_optional_experiment(self.tracked_experiment if track else None, model=model) as trackingContext:
with self.begin_optional_tracking_context_for_model(model, track=track) as trackingContext:
if fit:
self.fit_model(model)
result: VectorModelEvaluationData = self._eval_model(model, data)
is_multiple_pred_vars = len(result.predicted_var_names) > 1
for pred_var_name in result.predicted_var_names:
metrics = result.get_eval_stats(pred_var_name).metrics_dict()
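Usage sketch (evaluator and model assumed given; call signatures as in the diff): with fit=True, training happens inside eval_model and therefore within the tracking context, which is what allows learning times to be captured; track=False forces a no-op context.

    # fit and evaluate in one call; fitting is delegated to this evaluator's fit_model
    eval_data = evaluator.eval_model(model, fit=True)

    # evaluate on the training data without tracking the (typically optimistic) metrics
    eval_data_train = evaluator.eval_model(model, on_training_data=True, track=False)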
29 changes: 26 additions & 3 deletions src/sensai/tracking/mlflow_tracking.py
@@ -5,6 +5,8 @@
from matplotlib import pyplot as plt

from .tracking_base import TrackedExperiment, TrackingContext
from .. import VectorModelBase
from ..util import logging


class MLFlowTrackingContext(TrackingContext):
@@ -18,45 +20,66 @@ def __init__(self, name: str, experiment: "MLFlowExperiment", run_id=None, descr

@staticmethod
def _metric_name(name: str):
result = re.sub(r"\[(.*?)\]", r"/\1", name) # replace "foo[bar]" with "foo/bar
result = re.sub(r"\[(.*?)\]", r"_\1", name) # replace "foo[bar]" with "foo_bar"
result = re.sub(r"[^a-zA-Z0-9-_. /]+", "_", result) # replace sequences of unsupported chars with underscore
return result

def _track_metrics(self, metrics: Dict[str, float]):
mlflow.log_metrics({self._metric_name(name): value for name, value in metrics.items()})
metrics = {self._metric_name(name): value for name, value in metrics.items()}
mlflow.log_metrics(metrics)

def track_figure(self, name: str, fig: plt.Figure):
mlflow.log_figure(fig, name + ".png")

def track_text(self, name: str, content: str):
mlflow.log_text(content, name + ".txt")

def track_tag(self, tag_name: str, tag_value: str):
mlflow.set_tag(tag_name, tag_value)

def _end(self):
mlflow.end_run()


class MLFlowExperiment(TrackedExperiment[MLFlowTrackingContext]):
def __init__(self, experiment_name: str, tracking_uri: str, additional_logging_values_dict=None,
context_prefix: str = ""):
context_prefix: str = "", add_log_to_all_contexts=False):
"""
:param experiment_name: the name of the experiment, which should be the same for all models of the same kind (i.e. all models evaluated
under the same conditions)
:param tracking_uri: the URI of the server (if any); use "" to track in the local file system
:param additional_logging_values_dict:
:param context_prefix: a prefix to add to all contexts that are created within the experiment. This can be used to add
an identifier of a certain execution/run, such that the actual context name passed to `begin_context` can be concise (e.g. just model name).
:param add_log_to_all_contexts: whether to enable in-memory logging and add the resulting log file to all tracking contexts that
are generated for this experiment upon context exit (or process termination if it is not cleanly closed)
"""
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name=experiment_name)
super().__init__(context_prefix=context_prefix, additional_logging_values_dict=additional_logging_values_dict)
self._run_name_to_id = {}
self.add_log_to_all_contexts = add_log_to_all_contexts
if self.add_log_to_all_contexts:
logging.add_memory_logger()

def _track_values(self, values_dict: Dict[str, Any]):
with mlflow.start_run():
mlflow.log_metrics(values_dict)

def _create_tracking_context(self, name: str, description: str) -> MLFlowTrackingContext:
run_id = self._run_name_to_id.get(name)
print(f"create {name}")
context = MLFlowTrackingContext(name, self, run_id=run_id, description=description)
self._run_name_to_id[name] = context.run.info.run_id
return context

def begin_context_for_model(self, model: VectorModelBase):
context = super().begin_context_for_model(model)
context.track_tag("ModelClass", model.__class__.__name__)
return context

def end_context(self, instance: MLFlowTrackingContext):
print(f"end {instance}")
if self.add_log_to_all_contexts:
instance.track_text("log", logging.get_memory_log())
super().end_context(instance)
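The revised metric-name sanitisation, illustrated in isolation (behaviour as defined by the two regular expressions above):

    import re

    def _metric_name(name: str):
        result = re.sub(r"\[(.*?)\]", r"_\1", name)  # "RMSE[price]" -> "RMSE_price"
        result = re.sub(r"[^a-zA-Z0-9-_. /]+", "_", result)  # collapse unsupported chars
        return result

    assert _metric_name("RMSE[price]") == "RMSE_price"
    assert _metric_name("R²[price]") == "R__price"  # "²" is not a supported character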
39 changes: 35 additions & 4 deletions src/sensai/tracking/tracking_base.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Generic, TypeVar
from typing import Dict, Any, Optional, Generic, TypeVar, List

from matplotlib import pyplot as plt

@@ -28,6 +28,12 @@ def from_optional_experiment(experiment: Optional["TrackedExperiment"], model: O
else:
return experiment.begin_context(name, description)

def is_enabled(self):
"""
:return: True if tracking is enabled, i.e. whether results can be saved via this context
"""
return True

@abstractmethod
def _track_metrics(self, metrics: Dict[str, float]):
pass
@@ -70,11 +76,13 @@ def _end(self):
pass

def end(self):
self._end()
# first end the context in the experiment (which may add final stuff)
if self._isRunning:
if self._experiment is not None:
self._experiment.end_context(self)
self._isRunning = False
# then end the context for good
self._end()


class DummyTrackingContext(TrackingContext):
@@ -85,6 +93,9 @@ class DummyTrackingContext(TrackingContext):
def __init__(self, name):
super().__init__(name, None)

def is_enabled(self):
return False

def _track_metrics(self, metrics: Dict[str, float]):
pass

@@ -110,7 +121,7 @@ def __init__(self, context_prefix: str = "", additional_logging_values_dict=None
# TODO additional_logging_values_dict probably needs to be removed
self.instancePrefix = context_prefix
self.additionalLoggingValuesDict = additional_logging_values_dict
self._contexts = []
self._contexts: List[TContext] = []

@deprecated("Use a tracking context instead")
def track_values(self, values_dict: Dict[str, Any], add_values_dict: Dict[str, Any] = None):
Expand Down Expand Up @@ -151,14 +162,19 @@ def begin_context_for_model(self, model: VectorModelBase):
:param model: the model
:return: the context, which can subsequently be used to track information
"""
return self.begin_context(model.get_name(), str(model))
return self.begin_context(model.get_name(), model.pprints())

def end_context(self, instance: TContext):
running_instance = self._contexts[-1]
if instance != running_instance:
raise ValueError(f"Passed instance ({instance}) is not the currently running instance ({running_instance})")
self._contexts.pop()

def __del__(self):
# make sure all contexts that are still running are eventually closed
for c in reversed(self._contexts):
c.end()


class TrackingMixin(ABC):
_objectId2trackedExperiment = {}
Expand All @@ -172,3 +188,18 @@ def unset_tracked_experiment(self):
@property
def tracked_experiment(self) -> Optional[TrackedExperiment]:
return self._objectId2trackedExperiment.get(id(self))

def begin_optional_tracking_context_for_model(self, model: VectorModelBase, track: bool = True) -> TrackingContext:
"""
Begins a tracking context for the given model; the returned object is a context manager, and therefore this method should
preferably be used in a `with` statement.
This method can be called regardless of whether there actually is a tracked experiment (hence the term 'optional').
If there is no tracked experiment, calling methods on the returned object has no effect.
Furthermore, tracking can be disabled by passing `track=False` even if a tracked experiment is present.
:param model: the model for which to begin tracking
:param track: whether tracking shall be enabled; if False, force use of a dummy context which performs no actual tracking even
if a tracked experiment is present
:return: a context manager that can be used to track results for the given model
"""
return TrackingContext.from_optional_experiment(self.tracked_experiment if track else None, model=model)
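A minimal sketch of the new mixin method (MyEvaluator and the metric values are hypothetical); when no tracked experiment is set or track=False, the DummyTrackingContext that is returned makes every call a no-op:

    from sensai.tracking.tracking_base import TrackingMixin
    from sensai.vector_model import VectorModelBase

    class MyEvaluator(TrackingMixin):
        def evaluate(self, model: VectorModelBase, track: bool = True) -> dict:
            with self.begin_optional_tracking_context_for_model(model, track=track) as ctx:
                metrics = {"RMSE": 0.42}  # placeholder for an actual computation
                if ctx.is_enabled():
                    ctx.track_metrics(metrics)
                return metrics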
7 changes: 4 additions & 3 deletions src/sensai/util/cache.py
@@ -9,7 +9,8 @@
import threading
import time
from abc import abstractmethod, ABC
from typing import Any, Callable, Iterator, List, Optional, TypeVar, Generic
from pathlib import Path
from typing import Any, Callable, Iterator, List, Optional, TypeVar, Generic, Union

from .hash import pickle_hash
from .pickle import load_pickle, dump_pickle, setstate
@@ -723,7 +724,7 @@ def load(cls: T, path: str) -> T:


class PickleLoadSaveMixin(LoadSaveInterface):
def save(self, path: str, backend="pickle"):
def save(self, path: Union[str, Path], backend="pickle"):
"""
Saves the instance as pickle
@@ -733,7 +734,7 @@ def save(self, path: str, backend="pickle"):
dump_pickle(self, path, backend=backend)

@classmethod
def load(cls, path, backend="pickle"):
def load(cls, path: Union[str, Path], backend="pickle"):
"""
Loads a class instance from pickle
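Together with the pickle.py change below, this makes pathlib usage symmetric (sketch; MyModel stands for any PickleLoadSaveMixin subclass and model for an instance of it):

    from pathlib import Path

    model.save(Path("results") / "model.pickle")  # previously only str was accepted
    restored = MyModel.load(Path("results") / "model.pickle")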
3 changes: 2 additions & 1 deletion src/sensai/util/logging.py
@@ -100,7 +100,8 @@ def add_file_logger(path):

def add_memory_logger() -> None:
"""
Enables in-memory logging, i.e. all log statements are written to a memory buffer and can later be read via function `get_memory_log()`
Enables in-memory logging (if it is not already enabled), i.e. all log statements are written to a memory buffer and can later be
read via function `get_memory_log()`
"""
global _memoryLogStream
if _memoryLogStream is not None:
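The idempotence matters because MLFlowExperiment(add_log_to_all_contexts=True) now calls add_memory_logger itself. A sketch of direct use (get_memory_log is referenced by the docstring above; the log statement is illustrative):

    import logging as python_logging
    from sensai.util import logging

    logging.add_memory_logger()  # safe to call repeatedly after this change
    python_logging.getLogger(__name__).info("something worth keeping")
    print(logging.get_memory_log())  # everything logged since the logger was added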
5 changes: 4 additions & 1 deletion src/sensai/util/pickle.py
@@ -11,7 +11,10 @@
log = logging.getLogger(__name__)


def load_pickle(path, backend="pickle"):
def load_pickle(path: Union[str, Path], backend="pickle"):
if isinstance(path, Path):
path = str(path)

def read_file(f):
if backend == "pickle":
try:
11 changes: 6 additions & 5 deletions src/sensai/util/string.py
@@ -473,15 +473,16 @@ class TagBuilder:
"""
Assists in building strings made up of components that are joined via a glue string
"""
def __init__(self, prefix="", glue="_"):
def __init__(self, *initial_components: str, glue="_"):
"""
:param prefix: an initial component to always include at the beginning
:param initial_components: initial components to always include at the beginning
:param glue: the glue string which joins components
"""
self.glue = glue
self.components = []
if prefix != "":
self.components.append(prefix)
self.components = list(initial_components)

def with_component(self, component: str):
self.components.append(component)
return self

def with_conditional(self, cond: bool, component: str):
"""
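Sketch of the extended builder API; build() as the rendering method and with_conditional returning the builder are assumptions from context, not shown in the diff:

    tag = TagBuilder("lstm", "2023-08-10", glue="_") \
        .with_component("fold1") \
        .with_conditional(True, "tracked") \
        .build()
    # yields "lstm_2023-08-10_fold1_tracked"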
4 changes: 2 additions & 2 deletions src/sensai/vector_model.py
@@ -31,7 +31,7 @@
TVectorRegressionModel = typing.TypeVar("TVectorRegressionModel", bound="VectorRegressionModel")


class VectorModelBase(ABC):
class VectorModelBase(ABC, ToStringMixin):
"""
Base class for vector models, which defines the fundamental prediction interface.
A vector model takes data frames as input, where each row represents a vector of information.
@@ -93,7 +93,7 @@ def __init__(self, original_input: pd.DataFrame, original_output: pd.DataFrame):
self.original_output = original_output


class VectorModel(VectorModelFittableBase, PickleLoadSaveMixin, ToStringMixin, ABC):
class VectorModel(VectorModelFittableBase, PickleLoadSaveMixin, ABC):
"""
Represents a model which uses data frames as inputs and outputs whose rows define individual data points.
Every data frame row represents a vector of information (one-dimensional array), hence the name of the model.
