Use shared memory to write partial DataFrames of features
GianlucaFicarelli committed Apr 9, 2024
1 parent 370ba6d commit ceec390
Showing 15 changed files with 285 additions and 127 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.rst
@@ -12,8 +12,12 @@ Bug Fixes
Improvements
~~~~~~~~~~~~

- Improve performance by using shared memory (or a temp directory, if shared memory is not available) to write partial DataFrames of features.
Both memory usage and execution time should be lower than before when processing large DataFrames.
- Use zstd compression instead of snappy when writing parquet files.
- When ``repo`` is pickled, extract the DataFrames only if they aren't already stored in the cache.
- Filter features only when ``apply_filter`` is called to save some time.
- Improve logging in ``utils.timed()``.
- Improve logging in ``blueetl.utils.timed()``.
- Improve test coverage.

Version 0.8.2
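
A minimal sketch of the shared-memory-or-temp-directory choice described in the changelog entry above. The get_shmdir() helper is imported from blueetl.utils later in this commit; the body below is only an assumption about its behaviour (reading a SHMDIR-like environment variable and falling back to None), not the actual implementation:

    # Hypothetical sketch, not the real blueetl.utils.get_shmdir: return a directory
    # backed by shared memory if one is available, otherwise None so that
    # tempfile.TemporaryDirectory falls back to the default temp directory.
    import os
    import tempfile
    from pathlib import Path
    from typing import Optional

    def get_shmdir() -> Optional[Path]:
        shmdir = os.getenv("SHMDIR", "/dev/shm")  # assumed environment variable
        path = Path(shmdir)
        return path if path.is_dir() else None

    with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as temp_folder:
        print("partial feature DataFrames would be written under", temp_folder)
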
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -24,7 +24,7 @@ classifiers = [
"Topic :: Scientific/Engineering :: Bio-Informatics",
]
dependencies = [
"blueetl-core>=0.1.0",
"blueetl-core>=0.2.2",
"bluepysnap>=1.0.7",
"click>=8",
"jsonschema>=4.0",
15 changes: 10 additions & 5 deletions src/blueetl/cache.py
@@ -394,6 +394,15 @@ def _initialize_cache(self) -> None:
self._dump_simulations_config()
self._dump_cached_checksums()

@_raise_if(locked=False)
def is_repo_cached(self, name: str) -> bool:
"""Return whether a specific repo dataframe is present in the cache."""
# the checksums have been checked in _initialize_cache/_delete_cached_repo_files,
# so they are not calculated again here
return bool(
self._cached_checksums["repo"].get(name) and self._repo_store.path(name).is_file()
)

@_raise_if(locked=False)
def load_repo(self, name: str) -> Optional[pd.DataFrame]:
"""Load a specific repo dataframe from the cache.
@@ -404,10 +413,7 @@ def load_repo(self, name: str) -> Optional[pd.DataFrame]:
Returns:
The loaded dataframe, or None if it's not cached.
"""
is_cached = bool(self._cached_checksums["repo"].get(name))
L.debug("The repository %s is cached: %s", name, is_cached)
# the checksums have been checked in _initialize_cache/_delete_cached_repo_files,
# so they are not calculate again here
is_cached = self.is_repo_cached(name)
return self._repo_store.load(name) if is_cached else None

@_raise_if(readonly=True)
@@ -431,7 +437,6 @@ def get_cached_features_checksums(
"""Return the cached features checksums, or an empty dict if the cache doesn't exist."""
config_checksum = features_config.checksum()
cached = self._cached_checksums["features"].get(config_checksum, {})
L.debug("The features %s are cached: %s", config_checksum[:8], bool(cached))
return cached

@_raise_if(locked=False)
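
A hedged sketch of how the new CacheManager.is_repo_cached() method can support the changelog entry "extract the DataFrames only if they aren't already stored in the cache". Only is_repo_cached() and load_repo() come from the diff; get_or_extract(), cache_manager, and extract_dataframe are illustrative placeholders:

    import pandas as pd

    def get_or_extract(cache_manager, name: str, extract_dataframe) -> pd.DataFrame:
        """Load the DataFrame from the cache if present, otherwise extract it."""
        if cache_manager.is_repo_cached(name):
            # cheap check: the stored checksum and the parquet file both exist
            return cache_manager.load_repo(name)
        return extract_dataframe(name)  # hypothetical extraction callback
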
2 changes: 1 addition & 1 deletion src/blueetl/external/bnac/calculate_features.py
@@ -168,7 +168,7 @@ def calculate_features_multi(repo, key, df, params):
"smoothed_3ms_spike_times_max_normalised_hist_1ms_bin"
],
}
).rename_axis(BIN)
).rename_axis(index=BIN)

return {
"by_gid": by_gid,
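
The one-line change above only makes the rename_axis() call explicit about which axis is renamed. A minimal pandas example of the same idiom, where BIN is an assumed label constant:

    import pandas as pd

    BIN = "bin"  # assumed value of the BIN constant
    df = pd.DataFrame({"hist": [0.1, 0.9]}).rename_axis(index=BIN)
    assert df.index.name == BIN
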
6 changes: 4 additions & 2 deletions src/blueetl/extract/report.py
@@ -97,7 +97,10 @@ def from_simulations(
New instance.
"""

def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]:
def _func(
task_index: int, key: NamedTuple, df_list: list[pd.DataFrame]
) -> tuple[NamedTuple, pd.DataFrame]:
# pylint: disable=unused-argument
# executed in a subprocess
simulations_df, neurons_df, windows_df = df_list
simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]]
@@ -122,7 +125,6 @@ def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.
df_list=[simulations.df, neurons.df, windows.df],
groupby=[SIMULATION_ID, CIRCUIT_ID],
func=_func,
parallel=True,
)
df = smart_concat(all_df, ignore_index=True)
return cls(df, cached=False, filtered=False)
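
In this file the extraction callback gains a leading task_index argument, and the explicit parallel=True flag is dropped (parallel execution appears to be the default now). A self-contained sketch of the new callback shape; the body is a toy example, not the real extraction logic:

    from typing import NamedTuple

    import pandas as pd

    def example_func(
        task_index: int, key: NamedTuple, df_list: list[pd.DataFrame]
    ) -> tuple[NamedTuple, pd.DataFrame]:
        """Toy callback: concatenate the filtered DataFrames of one group."""
        # task_index is an incremental counter; in features.py it is used to build
        # deterministic file names like f"{task_index:08d}" for the partial parquet files
        return key, pd.concat(df_list, ignore_index=True)
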
167 changes: 106 additions & 61 deletions src/blueetl/features.py
@@ -1,14 +1,17 @@
"""Features collection."""

import logging
import tempfile
from collections import Counter, defaultdict
from collections.abc import Iterator
from copy import deepcopy
from dataclasses import dataclass
from functools import cached_property
from functools import cached_property, partial
from pathlib import Path
from typing import Any, NamedTuple, Optional, Union

import pandas as pd
from blueetl_core.parallel import isolated
from blueetl_core.utils import smart_concat

from blueetl.cache import CacheManager
@@ -17,7 +20,15 @@
from blueetl.extract.feature import Feature
from blueetl.parallel import merge_filter
from blueetl.repository import Repository
from blueetl.utils import all_equal, ensure_dtypes, extract_items, import_by_string, timed
from blueetl.store.parquet import ParquetStore
from blueetl.utils import (
all_equal,
ensure_dtypes,
extract_items,
get_shmdir,
import_by_string,
timed,
)

L = logging.getLogger(__name__)

@@ -373,27 +384,30 @@ def _calculate_new(


def _func_wrapper(
task_index: int,
key: NamedTuple,
neurons_df: pd.DataFrame,
windows_df: pd.DataFrame,
report_df: pd.DataFrame,
repo: Repository,
features_config: FeaturesConfig,
) -> dict[str, pd.DataFrame]:
"""Call the user function for the specified key.
temp_folder: Path,
) -> None:
"""Call the user function for the specified key, and save the resulting DataFrames.
It should be executed in a subprocess.
Args:
task_index: incremental index.
key: namedtuple specifying the filter.
neurons_df: filtered neurons DataFrame.
windows_df: filtered windows DataFrame.
report_df: filtered report DataFrame.
repo: Repository instance.
features_config: features configuration.
Returns:
dict of features DataFrames.
temp_folder: path to the shared memory (recommended) or temp directory.
"""
L.debug("Calculating features for %s", key)
L.info("Calculating features for %s", key)
merged_df = neurons_df.merge(windows_df, how="left").merge(report_df, how="left")
# The params dict is deepcopied because it could be modified in the user function.
# It could happen even with multiprocessing, because joblib may process tasks in batch.
@@ -405,16 +419,20 @@ def _func_wrapper(
# verify and process the result
if not isinstance(features_dict, dict):
raise ValueError("The user function must return a dict of dataframes")
features_records = {}
for feature_group, result_df in features_dict.items():
if not isinstance(result_df, pd.DataFrame):
raise ValueError(f"Expected a DataFrame, not {type(result_df).__name__}")
# ignore the index if it's unnamed and with one level; this can be useful
# for example when the returned DataFrame has a RangeIndex to be dropped
drop = result_df.index.names == [None]
result_df = result_df.etl.add_conditions(conditions=key._fields, values=key, drop=drop)
features_records[feature_group + features_config.suffix] = result_df
return features_records
# the conversion to the desired dtype here is important to reduce memory usage and cpu time
result_df = ensure_dtypes(result_df)
output_dir = temp_folder / f"{feature_group}{features_config.suffix}"
if not output_dir.is_dir():
# the directory should be created in the first process
output_dir.mkdir(parents=True, exist_ok=True)
ParquetStore(output_dir).dump(result_df, name=f"{task_index:08d}")


def calculate_features(
@@ -430,78 +448,105 @@ def calculate_features(
features_configs_list: list of features configurations.
Returns:
list of dicts of features DataFrames, one item for each features config.
list of dicts of features DataFrames, one item for each features_config.
"""

def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> list[dict[str, pd.DataFrame]]:
def _func(
task_index: int, key: NamedTuple, df_list: list[pd.DataFrame], temp_folder: Path
) -> None:
"""Should be called in a subprocess to execute the wrapper function."""
neurons_df, windows_df, report_df = df_list
return [
for features_config_index, features_config in enumerate(features_configs_list):
_func_wrapper(
task_index=task_index,
key=key,
neurons_df=neurons_df,
windows_df=windows_df,
report_df=report_df,
repo=repo,
features_config=features_config,
temp_folder=temp_folder / str(features_config_index),
)
for features_config in features_configs_list
]

def _filter_by_value(df: pd.DataFrame, key: str, value: Any) -> pd.DataFrame:
"""Filter the DataFrame only if the specified value is not None or empty."""
return df.etl.q({key: value}) if value else df

def _concatenate_all(
it: Iterator[list[dict[str, pd.DataFrame]]]
) -> list[dict[str, pd.DataFrame]]:
"""Concatenate all the dataframes having the same feature_group label.
@isolated
def _merge_filter_wrapper(temp_folder: Path) -> None:
"""Execute merge_filter in an isolated subprocess.
Args:
it: iterator yielding lists of dict of DataFrames, where the number of lists is equal
to the number of groups determined by features_configs_key, and the number of dicts
in each list is equal to the number of FeaturesConfig in features_configs_list.
It's faster than running in the main process: for example, the subprocess
can take 167 seconds instead of 266 seconds (this needs further investigation).
Returns:
list of DataFrames obtained by the concatenation of the partial DataFrames.
With the current approach:
- repo is pickled and passed to the subprocess
- the needed dataframes are loaded from the cache in the subprocess
- this is faster than passing big dataframes as parameters, which would need to be pickled
in the main process and unpickled in the subprocess.
"""
tmp_result: list[dict[str, list[pd.DataFrame]]] = [
defaultdict(list) for _ in range(len(features_configs_list))
]
for n_group, lst in enumerate(it):
# lst is the list of dicts returned by _func, and it contains one dict for each config
assert len(lst) == len(tmp_result)
for n_config, df_dict in enumerate(lst):
# to concatenate across the groups the DataFrames contained in each dict,
# append tmp_df to the list holding all the other tmp_df of the same type
partial_result = tmp_result[n_config]
for feature_group, tmp_df in df_dict.items():
L.debug(
"Iterating over group=%s, config=%s, feature_group=%s",
n_group,
n_config,
feature_group,
)
partial_result[feature_group].append(tmp_df)
# finally, build the dicts of DataFrames in a single concat operation
return [
{
feature_group: ensure_dtypes(smart_concat(df_list))
for feature_group, df_list in dct.items()
}
for dct in tmp_result
]

key = features_configs_key
return _concatenate_all(
merge_filter(
df_list=[
_filter_by_value(repo.neurons.df, "neuron_class", value=key.neuron_classes),
_filter_by_value(repo.windows.df, "window", value=key.windows),
_filter_by_value(
repo.neurons.df,
key="neuron_class",
value=features_configs_key.neuron_classes,
),
_filter_by_value(
repo.windows.df,
key="window",
value=features_configs_key.windows,
),
repo.report.df,
],
groupby=key.groupby,
func=_func,
parallel=True,
groupby=features_configs_key.groupby,
func=partial(_func, temp_folder=temp_folder),
)
)

def _concatenate_all(temp_folder: Path) -> list[dict[str, pd.DataFrame]]:
"""Concatenate all the dataframes having the same feature_group label.
The DataFrames are loaded from parquet files contained in a folder structure like:
/temp_folder
├── 0
│ ├── by_gid
│ │ └── *.parquet
│ ├── by_gid_and_trial
│ │ └── *.parquet
│ ├── by_neuron_class
│ │ └── *.parquet
│ └── by_neuron_class_and_trial
│ └── *.parquet
├── 1
│ └── other_features
│ └── *.parquet
...
└── n
└── other_features
└── *.parquet
where each numerical folder corresponds to one entry in features_configs_list,
and contains one subfolder for each DataFrame to be created.
Returns:
list of DataFrames obtained by the concatenation of the partial DataFrames.
"""
result = []
# the numerical subdirectories must be processed in ascending order
for index_path in sorted(temp_folder.iterdir(), key=lambda p: int(p.name)):
d = {}
# the dataframes subdirectories can be processed in any order
for features_path in sorted(index_path.iterdir()):
df = ParquetStore(features_path).load()
df = ensure_dtypes(df)
d[features_path.name] = df
result.append(d)
return result

with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as _temp_folder:
with timed(L.info, "Executing _merge_filter_wrapper"):
_merge_filter_wrapper(temp_folder=Path(_temp_folder))
with timed(L.info, "Executing _concatenate_all"):
return _concatenate_all(temp_folder=Path(_temp_folder))
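
Taken together, the new flow in features.py writes each partial result to parquet under a shared-memory temp folder and concatenates per feature group at the end. A minimal, self-contained sketch of that write-then-concatenate pattern, assuming ParquetStore is roughly a thin wrapper around to_parquet/read_parquet (the zstd compression matches the changelog entry, the data is made up):

    import tempfile
    from pathlib import Path

    import pandas as pd

    partial_frames = [pd.DataFrame({"value": [1, 2]}), pd.DataFrame({"value": [3]})]

    with tempfile.TemporaryDirectory(prefix="blueetl_") as tmp:
        output_dir = Path(tmp) / "by_neuron_class"
        output_dir.mkdir(parents=True, exist_ok=True)
        for task_index, partial_df in enumerate(partial_frames):
            # zero-padded names keep the files in task order
            partial_df.to_parquet(
                output_dir / f"{task_index:08d}.parquet", compression="zstd"
            )
        # single concatenation instead of accumulating DataFrames in memory
        df = pd.concat(
            (pd.read_parquet(p) for p in sorted(output_dir.glob("*.parquet"))),
            ignore_index=True,
        )
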