feat: Implement cache for predictions #1334

Draft · wants to merge 28 commits into base: master
Changes from 6 commits
28 commits
4abcae2
refactor: rename pipeline cache to operation cache
DRMPN Aug 31, 2024
e2f76d8
chore: Add test-cache.py for benchmarking and debugging purposes
DRMPN Sep 10, 2024
fcab118
chore: add TODOs to insert the data_cache functionality
DRMPN Sep 10, 2024
5248af3
feat: Add DataCache class for storing and loading predictions
DRMPN Sep 10, 2024
ef90a68
feat: Add DataCacheDB class for caching predicted output using a rela…
DRMPN Sep 10, 2024
77e47ca
chore: add TODO to save the predictions
DRMPN Sep 10, 2024
cfaef4f
feat: change the logic to save the entire OutputData instead
DRMPN Nov 19, 2024
612b40e
feat: get/put pickled OutputData into SQL table
DRMPN Nov 19, 2024
8b68240
chore: modify test script to use generated dataset
DRMPN Nov 21, 2024
e86ec6a
chore: modify error message
DRMPN Nov 21, 2024
064ab27
feat: pass data_cache parameter down to store a prediction in DB
DRMPN Nov 21, 2024
6edef1f
feat: test access to the stored data
DRMPN Nov 21, 2024
5328fe9
chore: remove old .pyc files
DRMPN Nov 22, 2024
a9624d1
Merge remote-tracking branch 'origin/master' into DRMPN-better-caching
DRMPN Nov 22, 2024
20a401d
chore: add comment to remove redundant param
DRMPN Nov 22, 2024
0ed1309
fix: take blob column instead of str
DRMPN Nov 22, 2024
64734b3
fix: generate better dataset
DRMPN Nov 22, 2024
662c248
feat: load predicted data from cache to calculate loss function
DRMPN Nov 22, 2024
8a5b424
chore: decrease timeout for test script
DRMPN Nov 27, 2024
9fe700e
feat: add cache for pipeline metrics
DRMPN Nov 27, 2024
12b96ad
feat: add intermediate metrics' cache
DRMPN Nov 28, 2024
d75b8d9
feat: add fit/predict cache for a single node
DRMPN Nov 28, 2024
1f4980b
feat: add cache effectiveness metric
DRMPN Dec 12, 2024
a4718a4
feat: save cache effectiveness to csv file
DRMPN Dec 12, 2024
33e3234
feat: extract metrics cache to dictionary
DRMPN Dec 14, 2024
fa08705
fix: turn on prediction cache
DRMPN Dec 17, 2024
bab641f
chore: turn off fit cache
DRMPN Dec 17, 2024
54f2232
fix: check and grab metric's cache before fit
DRMPN Dec 17, 2024
5 changes: 3 additions & 2 deletions fedot/api/api_utils/api_composer.py
@@ -10,7 +10,7 @@
from fedot.api.api_utils.assumptions.assumptions_handler import AssumptionsHandler
from fedot.api.api_utils.params import ApiParams
from fedot.api.time import ApiTime
from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.composer.composer_builder import ComposerBuilder
from fedot.core.composer.gp_composer.gp_composer import GPComposer
@@ -50,6 +50,7 @@ def init_cache(self):
self.preprocessing_cache = PreprocessingCache(cache_dir)
# in case of previously generated singleton cache
self.preprocessing_cache.reset()
# TODO: data_cache

def obtain_model(self, train_data: InputData) -> Tuple[Pipeline, Sequence[Pipeline], OptHistory]:
""" Function for composing FEDOT pipeline model """
@@ -125,7 +126,7 @@ def compose_pipeline(self, train_data: InputData, initial_assumption: Sequence[P
.with_optimizer(self.params.get('optimizer'))
.with_optimizer_params(parameters=self.params.optimizer_params)
.with_metrics(self.metrics)
.with_cache(self.pipelines_cache, self.preprocessing_cache)
.with_cache(self.pipelines_cache, self.preprocessing_cache) # TODO: data_cache
.with_graph_generation_param(self.params.graph_generation_params)
.build())

3 changes: 2 additions & 1 deletion fedot/api/api_utils/assumptions/assumptions_handler.py
@@ -6,7 +6,7 @@
from fedot.api.api_utils.assumptions.assumptions_builder import AssumptionsBuilder
from fedot.api.api_utils.presets import change_preset_based_on_initial_fit
from fedot.api.time import ApiTime
from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.data.data import InputData
from fedot.core.data.data_split import train_test_data_setup
@@ -74,6 +74,7 @@ def fit_assumption_and_check_correctness(self,
pipelines_cache.save_pipeline(pipeline)
if preprocessing_cache is not None:
preprocessing_cache.add_preprocessor(pipeline)
# TODO: data_cache

pipeline.predict(data_test)
self.log.info('Initial pipeline was fitted successfully')
2 changes: 1 addition & 1 deletion fedot/core/caching/base_cache.py
@@ -3,7 +3,7 @@
from golem.core.log import default_log
from golem.utilities.singleton_meta import SingletonMeta

from fedot.core.caching.pipelines_cache_db import OperationsCacheDB
from fedot.core.caching.operations_cache_db import OperationsCacheDB
from fedot.core.caching.preprocessing_cache_db import PreprocessingCacheDB


105 changes: 105 additions & 0 deletions fedot/core/caching/data_cache.py
@@ -0,0 +1,105 @@
import sqlite3
from typing import TYPE_CHECKING, List, Optional, Union

import numpy as np

from fedot.core.caching.base_cache import BaseCache
from fedot.core.caching.data_cache_db import DataCacheDB
from fedot.core.data.data import OutputData

if TYPE_CHECKING:
from fedot.core.pipelines.pipeline import Pipeline


class DataCache(BaseCache):
"""
Stores/loads predictions to increase performance of calculations.

:param cache_dir: path to the place where cache files should be stored.
"""

def __init__(self, cache_dir: Optional[str] = None, custom_pid=None):
super().__init__(DataCacheDB(cache_dir, custom_pid))

def save_prediction(self, prediction: np.ndarray, uid: str):
"""
Save the prediction for a given UID.

:param prediction (np.ndarray): The prediction to be saved.
:param uid (str): The unique identifier for the prediction.
"""
try:
self._db.add_prediction([(uid, prediction)])
except Exception as ex:
unexpected_exc = not (
isinstance(ex, sqlite3.DatabaseError) and "disk is full" in str(ex)
)
self.log.warning(
    f"Predictions cannot be saved: {ex}. Continuing",
    exc=ex,
    raise_if_test=unexpected_exc,
)

def load_prediction(self, uid: str) -> Optional[np.ndarray]:
    """
    Load the prediction data for the given unique identifier.

    :param uid (str): The unique identifier of the prediction data.
    :return np.ndarray: The loaded prediction data, or None if it is not cached.
    """
    predictions = self._db.get_prediction([uid])
    # TODO: restore OutputData from predict
    return predictions[0] if predictions else None

def save_data(
self,
pipeline: "Pipeline",
outputData: OutputData,
fold_id: Optional[int] = None,
):
"""
Save the pipeline data to the cache.

:param pipeline: The pipeline data to be cached.
:type pipeline: Pipeline
:param outputData: The output data to be saved.
:type outputData: OutputData
:param fold_id: Optional part of the cache item UID (can be used to specify the number of CV fold).
:type fold_id: Optional[int]
"""
uid = self._create_uid(pipeline, fold_id)
# TODO: save OutputData as a whole to the cache
self.save_prediction(outputData.predict, uid)

def try_load_data(
    self, pipeline: "Pipeline", fold_id: Optional[int] = None
) -> OutputData:
    """
    Try to load cached data for the given pipeline and fold ID.

    :param pipeline (Pipeline): The pipeline for which to load the data.
    :param fold_id (Optional[int]): The fold ID for which to load the data. Defaults to None.
    :return OutputData: The loaded data, or None if nothing is cached.
    """
    # TODO: restore the full OutputData from the cached prediction
    uid = self._create_uid(pipeline, fold_id)
    return self.load_prediction(uid)

def _create_uid(
self,
pipeline: "Pipeline",
fold_id: Optional[int] = None,
) -> str:
"""
Generate a unique identifier for a pipeline.

:param pipeline (Pipeline): The pipeline for which the unique identifier is generated.
:param fold_id (Optional[int]): The fold ID (default: None).
:return str: The unique identifier generated for the pipeline.
"""
base_uid = ""
for node in pipeline.nodes:
base_uid += f"{node.descriptive_id}_"
if fold_id is not None:
base_uid += f"{fold_id}"
return base_uid
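
A minimal usage sketch of the `DataCache` class above, assuming a fitted `Pipeline` object `pipeline` and an `OutputData` result `train_output` already exist in the calling code; the cache directory and fold id are illustrative.

```python
# Hypothetical usage of DataCache; `pipeline` and `train_output` are assumed
# to come from the surrounding code (e.g. from pipeline.fit(train_data)).
from fedot.core.caching.data_cache import DataCache

data_cache = DataCache(cache_dir="/tmp/fedot_cache")

# The cache key is built from the descriptive ids of all pipeline nodes
# plus the optional CV fold number (see _create_uid above).
data_cache.save_data(pipeline, train_output, fold_id=0)

# Later, the stored prediction can be fetched for the same pipeline/fold
# instead of being recomputed.
cached_predict = data_cache.try_load_data(pipeline, fold_id=0)
if cached_predict is not None:
    print("prediction served from cache")
```
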
90 changes: 90 additions & 0 deletions fedot/core/caching/data_cache_db.py
@@ -0,0 +1,90 @@
import pickle
import sqlite3
from contextlib import closing
from os import getpid
from typing import List, Optional, Tuple, TypeVar

import numpy as np

from fedot.core.caching.base_cache_db import BaseCacheDB


class DataCacheDB(BaseCacheDB):
"""
Database for `DataCache` class.
Implements the low-level logic of caching predicted output in a relational database.

:param cache_dir: path to the place where cache files should be stored.
"""

def __init__(self, cache_dir: Optional[str] = None, custom_pid=None):
super().__init__("prediction", cache_dir)
self._init_db()

def add_prediction(self, uid_val_lst: List[Tuple[str, np.ndarray]]):
"""
Adds predictions to the DB table, keyed by their uids

:param uid_val_lst: list of pairs (uid -> prediction) to be saved
"""
try:
with closing(sqlite3.connect(self.db_path)) as conn:
with conn:
Review comment on lines +30 to +31 (@kasyanovse, Collaborator, Oct 13, 2024):

Why not implement DataCacheDB as a singleton, connecting to the DB once at initialization (and re-initializing it in new Python instances when running in parallel)?

cur = conn.cursor()
pickled = [
(
uid,
sqlite3.Binary(pickle.dumps(val, pickle.HIGHEST_PROTOCOL)),
)
for uid, val in uid_val_lst
]
cur.executemany(
f"INSERT OR IGNORE INTO {self._main_table} VALUES (?, ?);",
pickled,
)
except sqlite3.Error as e:
print(f"SQLite error: {e}")

def get_prediction(self, uids: List[str]) -> List[Optional[np.ndarray]]:
"""
Maps the given uids to cached predictions from the DB, putting None where a uid is not present.

:param uids: list of prediction uids to look up

:return retrieved: list of predictions taken from the DB table, with None where a uid wasn't present
"""
try:
with closing(sqlite3.connect(self.db_path)) as conn:
with conn:
cur = conn.cursor()
placeholders = ",".join("?" for _ in uids)
query = (
f"SELECT id, prediction FROM {self._main_table} "
f"WHERE id IN ({placeholders})"
)
cur.execute(query, uids)
results = {row[0]: pickle.loads(row[1]) for row in cur.fetchall()}
retrieved = [results.get(uid) for uid in uids]
return retrieved
except sqlite3.Error as e:
print(f"SQLite error: {e}")
return [None] * len(uids)

def _init_db(self):
"""
Initializes DB working table.
"""
try:
with closing(sqlite3.connect(self.db_path)) as conn:
with conn:
cur = conn.cursor()
cur.execute(
(
f"CREATE TABLE IF NOT EXISTS {self._main_table} ("
"id TEXT PRIMARY KEY,"
"prediction BLOB"
");"
)
)
except sqlite3.Error as e:
print(f"SQLite error: {e}")
@@ -4,7 +4,7 @@
from golem.utilities.data_structures import ensure_wrapped_in_sequence

from fedot.core.caching.base_cache import BaseCache
from fedot.core.caching.pipelines_cache_db import OperationsCacheDB
from fedot.core.caching.operations_cache_db import OperationsCacheDB
from fedot.core.pipelines.node import PipelineNode

if TYPE_CHECKING:
4 changes: 3 additions & 1 deletion fedot/core/composer/composer_builder.py
@@ -10,7 +10,7 @@
from golem.core.optimisers.optimizer import AlgorithmParameters, GraphGenerationParams, GraphOptimizer
from golem.utilities.data_structures import ensure_wrapped_in_sequence

from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.composer.composer import Composer
from fedot.core.composer.gp_composer.gp_composer import GPComposer
@@ -55,6 +55,7 @@ def __init__(self, task: Task):

self.pipelines_cache: Optional[OperationsCache] = None
self.preprocessing_cache: Optional[PreprocessingCache] = None
# TODO: self.data_cache: Optional[DataCache] = None

def with_composer(self, composer_cls: Optional[Type[Composer]]):
if composer_cls is not None:
@@ -100,6 +101,7 @@ def with_cache(self, pipelines_cache: Optional[OperationsCache] = None,
preprocessing_cache: Optional[PreprocessingCache] = None):
self.pipelines_cache = pipelines_cache
self.preprocessing_cache = preprocessing_cache
# TODO: self.data_cache = data_cache
return self

@staticmethod
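
A hedged sketch of how the two TODOs in this file might be resolved: `with_cache` gains an optional `data_cache` argument that is stored next to the other caches. The stand-in class and signature below are assumptions, not the PR's final API.

```python
# Possible shape of the extended builder method (illustrative only).
from typing import Optional

from fedot.core.caching.data_cache import DataCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache


class ComposerBuilderSketch:
    """Stand-in for ComposerBuilder, showing only the cache-related part."""

    def __init__(self):
        self.pipelines_cache: Optional[OperationsCache] = None
        self.preprocessing_cache: Optional[PreprocessingCache] = None
        self.data_cache: Optional[DataCache] = None

    def with_cache(self,
                   pipelines_cache: Optional[OperationsCache] = None,
                   preprocessing_cache: Optional[PreprocessingCache] = None,
                   data_cache: Optional[DataCache] = None):
        self.pipelines_cache = pipelines_cache
        self.preprocessing_cache = preprocessing_cache
        self.data_cache = data_cache  # would be forwarded to the objective evaluator
        return self
```
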
2 changes: 1 addition & 1 deletion fedot/core/composer/gp_composer/gp_composer.py
@@ -3,7 +3,7 @@
from golem.core.optimisers.graph import OptGraph
from golem.core.optimisers.optimizer import GraphOptimizer

from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.composer.composer import Composer
from fedot.core.data.data import InputData
8 changes: 6 additions & 2 deletions fedot/core/optimisers/objective/data_objective_eval.py
@@ -8,7 +8,7 @@
from golem.core.optimisers.objective.objective import Objective, to_fitness
from golem.core.optimisers.objective.objective_eval import ObjectiveEvaluate

from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.data.data import InputData
from fedot.core.operations.model import Model
@@ -49,6 +49,7 @@ def __init__(self,
self._validation_blocks = validation_blocks
self._pipelines_cache = pipelines_cache
self._preprocessing_cache = preprocessing_cache
# TODO: self._data_cache = data_cache
self._log = default_log(self)
self._do_unfit = do_unfit

@@ -111,7 +112,7 @@ def prepare_graph(self, graph: Pipeline, train_data: InputData,

# load preprocessing
graph.try_load_from_cache(self._pipelines_cache, self._preprocessing_cache, fold_id)
graph.fit(
predicted_train = graph.fit(
train_data,
n_jobs=n_jobs,
time_constraint=self._time_constraint
@@ -121,6 +122,9 @@
self._pipelines_cache.save_pipeline(graph, fold_id)
if self._preprocessing_cache is not None:
self._preprocessing_cache.add_preprocessor(graph, fold_id)
# TODO:
# if self._data_cache is not None:
# self._data_cache.save_data(graph, predicted_train, fold_id)

return graph

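
A hedged sketch of the commented-out TODO above, written as a standalone helper; the function name is hypothetical, and it assumes `Pipeline.fit` returns the training `OutputData`, as the diff above suggests.

```python
# Illustrative helper mirroring the TODO in prepare_graph (not the PR's final code).
from typing import Optional

from fedot.core.caching.data_cache import DataCache
from fedot.core.data.data import InputData, OutputData
from fedot.core.pipelines.pipeline import Pipeline


def fit_and_cache_prediction(pipeline: Pipeline,
                             train_data: InputData,
                             data_cache: Optional[DataCache],
                             fold_id: Optional[int] = None) -> OutputData:
    predicted_train = pipeline.fit(train_data)
    if data_cache is not None:
        # Store the training prediction so the loss can later be recomputed
        # from cache (see commit 662c248) without refitting the pipeline.
        data_cache.save_data(pipeline, predicted_train, fold_id)
    return predicted_train
```
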
5 changes: 5 additions & 0 deletions fedot/core/pipelines/node.py
@@ -195,6 +195,8 @@ def fit(self, input_data: InputData) -> OutputData:

input_data = self._get_input_data(input_data=input_data, parent_operation='fit')

# TODO: try load from cache

if self.fitted_operation is None:
with Timer() as t:
self.fitted_operation, operation_predict = self.operation.fit(params=self._parameters,
@@ -233,6 +235,9 @@ def predict(self, input_data: InputData, output_mode: str = 'default') -> Output
data=input_data,
output_mode=output_mode)
self.inference_time_in_seconds = round(t.seconds_from_start, 3)

# TODO: save predict to cache

return operation_predict

def get_data_from_node(self) -> dict:
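
A hedged sketch of the per-node hooks the two TODOs above point at ("try load from cache" before fit, "save predict to cache" after predict). The helper name and the node-level key scheme are assumptions; `DataCacheDB` pickles arbitrary objects, so an `OutputData` value round-trips even though its annotations say `np.ndarray`.

```python
# Illustrative per-node cache hook; not part of this diff.
from typing import Optional

from fedot.core.caching.data_cache_db import DataCacheDB
from fedot.core.data.data import InputData, OutputData
from fedot.core.pipelines.node import PipelineNode


def node_predict_with_cache(node: PipelineNode,
                            input_data: InputData,
                            cache_db: Optional[DataCacheDB],
                            fold_id: Optional[int] = None) -> OutputData:
    # Hypothetical key: the node's descriptive_id plus the optional fold number.
    uid = node.descriptive_id if fold_id is None else f"{node.descriptive_id}_{fold_id}"
    if cache_db is not None:
        cached = cache_db.get_prediction([uid])[0]
        if cached is not None:
            return cached
    prediction = node.predict(input_data)
    if cache_db is not None:
        cache_db.add_prediction([(uid, prediction)])
    return prediction
```
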
5 changes: 4 additions & 1 deletion fedot/core/pipelines/pipeline.py
@@ -15,7 +15,7 @@
from golem.utilities.serializable import Serializable
from golem.visualisation.graph_viz import NodeColorType

from fedot.core.caching.pipelines_cache import OperationsCache
from fedot.core.caching.operations_cache import OperationsCache
from fedot.core.caching.preprocessing_cache import PreprocessingCache
from fedot.core.data.data import InputData, OutputData
from fedot.core.data.multi_modal import MultiModalData
@@ -252,6 +252,7 @@ def try_load_from_cache(self, cache: Optional[OperationsCache], preprocessing_ca
cache.try_load_into_pipeline(self, fold_id)
if preprocessing_cache is not None:
preprocessing_cache.try_load_preprocessor(self, fold_id)
# TODO: data_cache

def predict(self, input_data: Union[InputData, MultiModalData], output_mode: str = 'default') -> OutputData:
"""Runs the predict process in all of the pipeline nodes starting with root
@@ -270,6 +271,8 @@ def predict(self, input_data: Union[InputData, MultiModalData], output_mode: str
OutputData: values predicted on the provided ``input_data``
"""

# TODO: data_cache

if not self.is_fitted:
ex = 'Pipeline is not fitted yet'
self.log.error(ex)