diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 57add12..41b3b48 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -1,6 +1,18 @@
 Changelog
 =========
 
+Version 0.9.1
+-------------
+
+Improvements
+~~~~~~~~~~~~
+
+- Improve performance of report extraction and features calculation by writing partial DataFrames to shared memory (or to a temporary directory, if shared memory is not available).
+  Both memory usage and execution time should be lower than before when processing large DataFrames.
+- Use zstd compression instead of snappy when writing parquet files.
+- When ``repo`` is pickled, extract the DataFrames only if they aren't already stored in the cache.
+- Remove the fastparquet extra dependency.
+
 Version 0.9.0
 -------------
 
diff --git a/pyproject.toml b/pyproject.toml
index 4334dea..e0f138d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -41,8 +41,6 @@ dynamic = ["version"]
 [project.optional-dependencies]
 extra = [
     # extra requirements that may be dropped at some point
-    "fastparquet>=0.8.3,!=2023.1.0",  # needed by pandas to read and write parquet files
-    "orjson",  # faster json decoder used by fastparquet
     "tables>=3.6.1",  # needed by pandas to read and write hdf files
 ]
 external = [
diff --git a/src/blueetl/extract/report.py b/src/blueetl/extract/report.py
index c50f706..e858b4c 100644
--- a/src/blueetl/extract/report.py
+++ b/src/blueetl/extract/report.py
@@ -1,9 +1,12 @@
 """Generic Report extractor."""
 
 import logging
+import tempfile
 from abc import ABCMeta, abstractmethod
 from dataclasses import dataclass
-from typing import NamedTuple, Optional, TypeVar
+from functools import partial
+from pathlib import Path
+from typing import Callable, NamedTuple, Optional, TypeVar
 
 import pandas as pd
 from blueetl_core.utils import smart_concat
@@ -16,6 +19,8 @@ from blueetl.extract.simulations import Simulations
 from blueetl.extract.windows import Windows
 from blueetl.parallel import merge_filter
+from blueetl.store.parquet import ParquetStore
+from blueetl.utils import ensure_dtypes, get_shmdir, timed
 
 L = logging.getLogger(__name__)
 
 ReportExtractorT = TypeVar("ReportExtractorT", bound="ReportExtractor")
@@ -96,33 +101,55 @@ def from_simulations(
         Returns:
             New instance.
""" - - def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]: - # executed in a subprocess - simulations_df, neurons_df, windows_df = df_list - simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]] - assert simulation_id == key.simulation_id # type: ignore[attr-defined] - df_list = [] - for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]): - population = neuron_classes.df.etl.one( - circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class - )[POPULATION] - result_df = cls._load_values( - simulation=simulation, - population=population, - gids=df[GID], - windows_df=windows_df, + with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as _temp_folder: + with timed(L.info, "Executing merge_filter "): + func = partial( + _merge_filter_func, + temp_folder=Path(_temp_folder), name=name, + neuron_classes_df=neuron_classes.df, + dataframe_builder=cls._load_values, + ) + merge_filter( + df_list=[simulations.df, neurons.df, windows.df], + groupby=[SIMULATION_ID, CIRCUIT_ID], + func=func, ) - result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key] - df_list.append(result_df) - return smart_concat(df_list, ignore_index=True) - - all_df = merge_filter( - df_list=[simulations.df, neurons.df, windows.df], - groupby=[SIMULATION_ID, CIRCUIT_ID], - func=_func, - parallel=True, + with timed(L.info, "Executing concatenation"): + df = ParquetStore(Path(_temp_folder)).load() + df = ensure_dtypes(df) + return cls(df, cached=False, filtered=False) + + +def _merge_filter_func( + task_index: int, + key: NamedTuple, + df_list: list[pd.DataFrame], + temp_folder: Path, + name: str, + neuron_classes_df: pd.DataFrame, + dataframe_builder: Callable[..., pd.DataFrame], +) -> None: + """Executed in a subprocess, write a partial DataFrame to temp_folder.""" + # pylint: disable=too-many-locals + simulations_df, neurons_df, windows_df = df_list + simulation_id, simulation = simulations_df.etl.one()[[SIMULATION_ID, SIMULATION]] + assert simulation_id == key.simulation_id # type: ignore[attr-defined] + df_list = [] + for inner_key, df in neurons_df.etl.groupby_iter([CIRCUIT_ID, NEURON_CLASS]): + population = neuron_classes_df.etl.one( + circuit_id=inner_key.circuit_id, neuron_class=inner_key.neuron_class + )[POPULATION] + result_df = dataframe_builder( + simulation=simulation, + population=population, + gids=df[GID], + windows_df=windows_df, + name=name, ) - df = smart_concat(all_df, ignore_index=True) - return cls(df, cached=False, filtered=False) + result_df[[SIMULATION_ID, *inner_key._fields]] = [simulation_id, *inner_key] + df_list.append(result_df) + result_df = smart_concat(df_list, ignore_index=True) + # the conversion to the desired dtype here is important to reduce memory usage and cpu time + result_df = ensure_dtypes(result_df) + ParquetStore(temp_folder).dump(result_df, name=f"{task_index:08d}") diff --git a/src/blueetl/features.py b/src/blueetl/features.py index e3d7df6..9fbf0e6 100644 --- a/src/blueetl/features.py +++ b/src/blueetl/features.py @@ -1,14 +1,17 @@ """Features collection.""" import logging +import tempfile from collections import Counter, defaultdict from collections.abc import Iterator from copy import deepcopy from dataclasses import dataclass -from functools import cached_property +from functools import cached_property, partial +from pathlib import Path from typing import Any, NamedTuple, Optional, Union import pandas as pd +from blueetl_core.parallel import 
isolated from blueetl_core.utils import smart_concat from blueetl.cache import CacheManager @@ -17,7 +20,15 @@ from blueetl.extract.feature import Feature from blueetl.parallel import merge_filter from blueetl.repository import Repository -from blueetl.utils import all_equal, ensure_dtypes, extract_items, import_by_string, timed +from blueetl.store.parquet import ParquetStore +from blueetl.utils import ( + all_equal, + ensure_dtypes, + extract_items, + get_shmdir, + import_by_string, + timed, +) L = logging.getLogger(__name__) @@ -236,7 +247,8 @@ def _process_features( if not f._cached or f._filtered: # pylint: disable=protected-access to_be_written[name] = f.to_pandas() if to_be_written: - self.cache_manager.dump_features(to_be_written, features_config=features_config) + with timed(L.info, "Writing cached features"): + self.cache_manager.dump_features(to_be_written, features_config=features_config) def _log_features(features: dict[str, Feature], n: int, tot: int, features_id) -> None: """Log a message about the features being processed.""" @@ -364,7 +376,7 @@ def _calculate_new( features_configs_list: list[FeaturesConfig], ) -> Iterator[tuple[FeaturesConfig, dict[str, Feature]]]: """Calculate new features and yield tuples.""" - results = calculate_features( + results = _calculate_features( repo=repo, features_configs_key=features_configs_key, features_configs_list=features_configs_list, @@ -377,28 +389,31 @@ def _calculate_new( yield features_config, features -def _func_wrapper( +def _user_func_wrapper( + task_index: int, key: NamedTuple, neurons_df: pd.DataFrame, windows_df: pd.DataFrame, report_df: pd.DataFrame, repo: Repository, features_config: FeaturesConfig, -) -> dict[str, pd.DataFrame]: - """Call the user function for the specified key. + temp_folder: Path, +) -> None: + """Call the user function for the specified key, and save the resulting DataFrames. + + It should be executed in a subprocess. Args: + task_index: incremental index. key: namedtuple specifying the filter. neurons_df: filtered neurons DataFrame. windows_df: filtered windows DataFrame. report_df: filtered report DataFrame. repo: Repository instance. features_config: features configuration. - - Returns: - dict of features DataFrames. + temp_folder: path to the shared memory (recommended) or temp directory. """ - L.debug("Calculating features for %s", key) + L.info("Calculating features for %s", key) merged_df = neurons_df.merge(windows_df, how="left").merge(report_df, how="left") # The params dict is deepcopied because it could be modified in the user function. # It could happen even with multiprocessing, because joblib may process tasks in batch. 
@@ -410,7 +425,6 @@ def _func_wrapper(
     # verify and process the result
     if not isinstance(features_dict, dict):
         raise ValueError("The user function must return a dict of dataframes")
-    features_records = {}
     for feature_group, result_df in features_dict.items():
         if not isinstance(result_df, pd.DataFrame):
             raise ValueError(f"Expected a DataFrame, not {type(result_df).__name__}")
@@ -418,11 +432,130 @@ def _func_wrapper(
         # for example when the returned DataFrame has a RangeIndex to be dropped
         drop = result_df.index.names == [None]
         result_df = result_df.etl.add_conditions(conditions=key._fields, values=key, drop=drop)
-        features_records[feature_group + features_config.suffix] = result_df
-    return features_records
+        # the conversion to the desired dtype here is important to reduce memory usage and cpu time
+        result_df = ensure_dtypes(result_df)
+        output_dir = temp_folder / f"{feature_group}{features_config.suffix}"
+        if not output_dir.is_dir():
+            # the directory should be created in the first process
+            output_dir.mkdir(parents=True, exist_ok=True)
+        ParquetStore(output_dir).dump(result_df, name=f"{task_index:08d}")
 
 
-def calculate_features(
+def _merge_filter_func(
+    task_index: int,
+    key: NamedTuple,
+    df_list: list[pd.DataFrame],
+    temp_folder: Path,
+    repo: Repository,
+    features_configs_list: list[FeaturesConfig],
+) -> None:
+    """Executed in a subprocess, call the wrapper function for each features_config."""
+    neurons_df, windows_df, report_df = df_list
+    for features_config_index, features_config in enumerate(features_configs_list):
+        _user_func_wrapper(
+            task_index=task_index,
+            key=key,
+            neurons_df=neurons_df,
+            windows_df=windows_df,
+            report_df=report_df,
+            repo=repo,
+            features_config=features_config,
+            temp_folder=temp_folder / str(features_config_index),
+        )
+
+
+def _filter_by_value(df: pd.DataFrame, key: str, value: Any) -> pd.DataFrame:
+    """Filter the DataFrame only if the specified value is not None or empty."""
+    return df.etl.q({key: value}) if value else df
+
+
+@isolated
+def _merge_filter_wrapper(
+    temp_folder: Path,
+    repo: Repository,
+    features_configs_key: FeaturesConfigKey,
+    features_configs_list: list[FeaturesConfig],
+) -> None:
+    """Execute merge_filter in an isolated subprocess.
+
+    It's faster than running in the main process; for example, the subprocess
+    can take 167 seconds instead of 266 seconds (this needs more investigation).
+
+    With the current approach:
+
+    - repo is pickled and passed to the subprocess
+    - the needed dataframes are loaded from the cache in the subprocess
+    - this is faster than passing big dataframes as parameters, which would need to be pickled
+      in the main process and unpickled in the subprocess.
+    """
+    func = partial(
+        _merge_filter_func,
+        temp_folder=temp_folder,
+        features_configs_list=features_configs_list,
+        repo=repo,
+    )
+    merge_filter(
+        df_list=[
+            _filter_by_value(
+                repo.neurons.df,
+                key="neuron_class",
+                value=features_configs_key.neuron_classes,
+            ),
+            _filter_by_value(
+                repo.windows.df,
+                key="window",
+                value=features_configs_key.windows,
+            ),
+            repo.report.df,
+        ],
+        groupby=features_configs_key.groupby,
+        func=func,
+    )
+
+
+def _concatenate_all(temp_folder: Path) -> list[dict[str, pd.DataFrame]]:
+    """Concatenate all the dataframes having the same feature_group label.
+ + The DataFrames are loaded from parquet files contained in a folder structure like: + + /temp_folder + ├── 0 + │ ├── by_gid + │ │ └── *.parquet + │ ├── by_gid_and_trial + │ │ └── *.parquet + │ ├── by_neuron_class + │ │ └── *.parquet + │ └── by_neuron_class_and_trial + │ └── *.parquet + ├── 1 + │ └── other_features + │ └── *.parquet + ... + └── n + └── other_features + └── *.parquet + + where each numerical folder corresponds to one entry in features_configs_list, + and contains one subfolder for each DataFrame to be created. + + Returns: + list of DataFrames obtained by the concatenation of the partial DataFrames. + """ + result = [] + # the numerical subdirectories must be processed in ascending order + for index_path in sorted(temp_folder.iterdir(), key=lambda p: int(p.name)): + d = {} + # the dataframes subdirectories can be processed in any order + for features_path in sorted(index_path.iterdir()): + df = ParquetStore(features_path).load() + df = ensure_dtypes(df) + d[features_path.name] = df + result.append(d) + return result + + +def _calculate_features( repo: Repository, features_configs_key: FeaturesConfigKey, features_configs_list: list[FeaturesConfig], @@ -435,78 +568,15 @@ def calculate_features( features_configs_list: list of features configurations. Returns: - list of dicts of features DataFrames, one item for each features config. + list of dicts of features DataFrames, one item for each features_config. """ - - def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> list[dict[str, pd.DataFrame]]: - """Should be called in a subprocess to execute the wrapper function.""" - neurons_df, windows_df, report_df = df_list - return [ - _func_wrapper( - key=key, - neurons_df=neurons_df, - windows_df=windows_df, - report_df=report_df, + with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as _temp_folder: + with timed(L.info, "Executing merge_filter"): + _merge_filter_wrapper( + temp_folder=Path(_temp_folder), repo=repo, - features_config=features_config, + features_configs_key=features_configs_key, + features_configs_list=features_configs_list, ) - for features_config in features_configs_list - ] - - def _filter_by_value(df: pd.DataFrame, key: str, value: Any) -> pd.DataFrame: - """Filter the DataFrame only if the specified value is not None or empty.""" - return df.etl.q({key: value}) if value else df - - def _concatenate_all( - it: Iterator[list[dict[str, pd.DataFrame]]] - ) -> list[dict[str, pd.DataFrame]]: - """Concatenate all the dataframes having the same feature_group label. - - Args: - it: iterator yielding lists of dict of DataFrames, where the number of lists is equal - to the number of groups determined by features_configs_key, and the number of dicts - in each list is equal to the number of FeaturesConfig in features_configs_list. - - Returns: - list of DataFrames obtained by the concatenation of the partial DataFrames. 
- """ - tmp_result: list[dict[str, list[pd.DataFrame]]] = [ - defaultdict(list) for _ in range(len(features_configs_list)) - ] - for n_group, lst in enumerate(it): - # lst is the list of dicts returned by _func, and it contains one dict for each config - assert len(lst) == len(tmp_result) - for n_config, df_dict in enumerate(lst): - # to concatenate across the groups the DataFrames contained in each dict, - # append tmp_df to the list holding all the other tmp_df of the same type - partial_result = tmp_result[n_config] - for feature_group, tmp_df in df_dict.items(): - L.debug( - "Iterating over group=%s, config=%s, feature_group=%s", - n_group, - n_config, - feature_group, - ) - partial_result[feature_group].append(tmp_df) - # finally, build the dicts of DataFrames in a single concat operation - return [ - { - feature_group: ensure_dtypes(smart_concat(df_list)) - for feature_group, df_list in dct.items() - } - for dct in tmp_result - ] - - key = features_configs_key - return _concatenate_all( - merge_filter( - df_list=[ - _filter_by_value(repo.neurons.df, "neuron_class", value=key.neuron_classes), - _filter_by_value(repo.windows.df, "window", value=key.windows), - repo.report.df, - ], - groupby=key.groupby, - func=_func, - parallel=True, - ) - ) + with timed(L.info, "Executing concatenation"): + return _concatenate_all(temp_folder=Path(_temp_folder)) diff --git a/src/blueetl/parallel.py b/src/blueetl/parallel.py index 152cecd..c269798 100644 --- a/src/blueetl/parallel.py +++ b/src/blueetl/parallel.py @@ -62,7 +62,7 @@ def _groups(df_list: list[pd.DataFrame], groupby: list[str]) -> pd.DataFrame: def _func_generator( df_list: list[pd.DataFrame], groupby: list[str], func: Callable -) -> Iterator[Callable[[], Any]]: +) -> Iterator[Callable]: """Yield functions to be executed in a subprocess.""" groups = _groups(df_list, groupby=groupby) caches = [CachedDataFrame(df) for df in df_list] @@ -76,9 +76,8 @@ def _func_generator( def merge_filter( df_list: list[pd.DataFrame], groupby: list[str], - func: Callable[[NamedTuple, list[pd.DataFrame]], Any], - parallel: bool = True, -) -> Iterator[Any]: + func: Callable[[int, NamedTuple, list[pd.DataFrame]], Any], +) -> list: """Merge the specified columns of the list of DataFrames, and call func for each combination. The merge operation is similar to a SQL left outer join. @@ -86,40 +85,41 @@ def merge_filter( Args: df_list: list of DataFrames. groupby: list of columns to consider across the DataFrames. - func: callback function accepting ``key: NamedTuple, df_list: list[pd.DataFrames]``, - executed for each calculated combination of columns. - parallel: True to call the callback function in subprocesses, False otherwise. + func: callback executed for each calculated combination of columns, with parameters: - Yields: - values returned by the callback function. + - task_index (int): task index. + - key (NamedTuple): key used to filter the DataFrames passed to each function call. + - df_list (list[pd.DataFrames]): list of DataFrames filtered by key. + + Returns: + list of values returned by the callback function. 
""" func_generator = _func_generator(df_list=df_list, groupby=groupby, func=func) - if parallel: - yield from run_parallel(Task(f) for f in func_generator) - else: - yield from (f() for f in func_generator) + task_generator = (Task(partial(f, task_index=i)) for i, f in enumerate(func_generator)) + return run_parallel(task_generator) def merge_groupby( - df_list: list[pd.DataFrame], groupby: list[str], parallel=True + df_list: list[pd.DataFrame], groupby: list[str] ) -> Iterator[tuple[NamedTuple, pd.DataFrame]]: """Merge a list of DataFrames, group by the given keys, and yield keys and groups. - The merge operation is similar to a SQL left outer join. - - If parallel is True, the dataframes are filtered in the main process and merged in subprocesses. - If parallel is False, the dataframes are merged in the same process. + The merge operation is similar to a SQL left outer join, but + the dataframes are filtered in the main process and merged in subprocesses. """ - def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]: + def _func( + task_index: int, key: NamedTuple, df_list: list[pd.DataFrame] + ) -> tuple[NamedTuple, pd.DataFrame]: + # pylint: disable=unused-argument # executed in a subprocess merged = df_list[0] for df in df_list[1:]: merged = merged.merge(df, how="left", copy=False) return key, merged - yield from merge_filter(df_list=df_list, groupby=groupby, func=_func, parallel=parallel) + yield from merge_filter(df_list=df_list, groupby=groupby, func=_func) def call_by_simulation( @@ -148,7 +148,9 @@ def call_by_simulation( list of results """ - def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd.DataFrame]: + def _func( + task_index: int, key: NamedTuple, df_list: list[pd.DataFrame] + ) -> tuple[NamedTuple, pd.DataFrame]: # pylint: disable=unused-argument # executed in a subprocess simulation_row = convert_row(df_list[0].reset_index()) @@ -167,6 +169,5 @@ def _func(key: NamedTuple, df_list: list[pd.DataFrame]) -> tuple[NamedTuple, pd. df_list=[simulations, *dataframes_to_filter.values()], groupby=[SIMULATION_ID, CIRCUIT_ID], func=_func, - parallel=True, ) ) diff --git a/src/blueetl/repository.py b/src/blueetl/repository.py index 270347c..af02fe8 100644 --- a/src/blueetl/repository.py +++ b/src/blueetl/repository.py @@ -49,16 +49,18 @@ def extract(self, name: str) -> ExtractorT: name: name of the dataframe. """ with timed(L.debug, f"Extracting {name}") as messages: - df = self._repo.cache_manager.load_repo(name) - if df is not None: + is_cached = self._repo.cache_manager.is_repo_cached(name) + if is_cached: + df = self._repo.cache_manager.load_repo(name) instance = self.extract_cached(df, name) else: instance = self.extract_new() assert instance is not None, "The extraction didn't return a valid instance." 
- is_cached = instance._cached # pylint: disable=protected-access + assert is_cached is instance._cached # pylint: disable=protected-access is_filtered = instance._filtered # pylint: disable=protected-access if not is_cached or is_filtered: - self._repo.cache_manager.dump_repo(df=instance.to_pandas(), name=name) + with timed(L.info, f"Writing cached {name}"): + self._repo.cache_manager.dump_repo(df=instance.to_pandas(), name=name) messages[:] = [f"Extracted {name}: {is_cached=} {is_filtered=} rows={len(instance.df)}"] return instance @@ -244,7 +246,7 @@ def __init__( def __getstate__(self) -> dict: """Get the object state when the object is pickled.""" - if not self.is_extracted(): + if not self.is_cached(): # ensure that the dataframes are extracted and stored to disk, # because we want to be able to use the cached data in the subprocesses. L.info("Extracting dataframes before serialization") @@ -335,7 +337,7 @@ def extract(self) -> None: self.check_extractions() def is_extracted(self) -> bool: - """Return True if all the dataframes have been extracted.""" + """Return True if all the dataframes have been extracted and loaded in memory.""" # note: the cached_property is stored as an attribute after it's accessed return all(name in self.__dict__ for name in self.names) @@ -344,6 +346,10 @@ def check_extractions(self) -> None: if not self.is_extracted(): raise RuntimeError("Not all the dataframes have been extracted") + def is_cached(self) -> bool: + """Return True if all the dataframes have been cached (loaded in memory or not).""" + return all(self.cache_manager.is_repo_cached(name) for name in self.names) + def missing_simulations(self) -> pd.DataFrame: """Return a DataFrame with the simulations ignored because of missing spikes. diff --git a/src/blueetl/store/base.py b/src/blueetl/store/base.py index bf15e32..e092a6f 100644 --- a/src/blueetl/store/base.py +++ b/src/blueetl/store/base.py @@ -26,7 +26,7 @@ def __init__(self, basedir: StrOrPath) -> None: basedir: base directory where the files should be stored. """ self._basedir = resolve_path(basedir) - L.info("Using class %s with basedir %s", self.__class__.__name__, self.basedir) + L.debug("Using class %s with basedir %s", self.__class__.__name__, self.basedir) @property def basedir(self) -> Path: @@ -51,12 +51,17 @@ def delete(self, name: str) -> None: self.path(name).unlink(missing_ok=True) def path(self, name: str) -> Path: - """Return the full path of the file with the given name and the class extension.""" - return self.basedir / f"{name}.{self.extension}" + """Return the full path of the file with the given name and the class extension. + + If name is empty, then return the base directory. + This can be useful when working with partitioned DataFrames and + directories containing multiple files. 
+        """
+        return self.basedir / f"{name}.{self.extension}" if name else self.basedir
 
     def checksum(self, name: str) -> Optional[str]:
         """Return a checksum of the file, or None if it doesn't exist."""
         path = self.path(name)
-        if path.exists():
+        if path.is_file():
             return checksum(path)
         return None
diff --git a/src/blueetl/store/parquet.py b/src/blueetl/store/parquet.py
index 129f032..795bb88 100644
--- a/src/blueetl/store/parquet.py
+++ b/src/blueetl/store/parquet.py
@@ -1,9 +1,12 @@
 """Parquet data store."""
 
 import logging
+from pathlib import Path
 from typing import Any, Optional
 
 import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
 
 from blueetl.store.base import BaseStore
 from blueetl.types import StrOrPath
@@ -12,6 +15,56 @@
 L = logging.getLogger(__name__)
 
 
+def _get_unified_schema(path: StrOrPath) -> pa.Schema:
+    """Infer and return the unified schema from all the parquet files in the given directory.
+
+    Needed because pq.read_table() would infer the schema from just the first file,
+    unless a metadata file is used. See:
+    https://github.com/ueshin/apache-arrow/blob/0a2cf3ac/python/pyarrow/parquet.py#L724
+
+    This can cause problems when a column in the first file contains empty lists,
+    since they are considered as:
+
+        times: list
+          child 0, element: null
+
+    while in other files containing lists of floats they are considered as:
+
+        times: list
+          child 0, element: double
+
+    and with pyarrow 15.0.0 this would raise:
+
+        ArrowNotImplementedError:
+        Unsupported cast from double to null using function cast_null
+
+    Processing all the parquet files could require some time (around 4 seconds for 13824 files
+    stored in the shared memory /dev/shm), so it avoids parsing all the files when possible.
+
+    If the performance is improved, we could replace the content of the function with:
+
+        schemas = [pq.read_schema(file_path) for file_path in Path(path).iterdir()]
+        return pa.unify_schemas(schemas)
+    """
+    with timed(L.debug, f"Inferring schema for {path}") as messages:
+        schemas = []
+        null_list_type = pa.list_(pa.null())
+        valid_fields: dict[str, bool] = {}
+        for file_path in Path(path).iterdir():
+            all_valid = True
+            file_schema = pq.read_schema(file_path)
+            for field in file_schema:
+                if valid_fields.get(field.name, False) is False:
+                    valid_fields[field.name] = not field.type.equals(null_list_type)
+                all_valid = all_valid and valid_fields[field.name]
+            schemas.append(file_schema)
+            if all_valid:
+                # break because the merged schema doesn't contain any list of nulls
+                break
+        messages.append(f"with {len(schemas)} loaded schemas")
+        return pa.unify_schemas(schemas)
+
+
 class ParquetStore(BaseStore):
     """Parquet data store."""
 
@@ -20,23 +73,12 @@ def __init__(self, basedir: StrOrPath) -> None:
         super().__init__(basedir=basedir)
         self._dump_options: dict[str, Any] = {
             "engine": "pyarrow",
-            # "engine": "fastparquet",
-            # "compression": "snappy",
-            # "index": None,
-            # "partition_cols": None,
-            # "storage_options": None,
+            # zstd typically provides a higher compression ratio than snappy,
+            # and the cpu time is similar
+            "compression": "zstd",
         }
         self._load_options: dict[str, Any] = {
-            # pyarrow (8.0.0, 9.0.0) may be affected by a memory leak,
-            # and it's slower than fastparquet when reading dataframes with columns
-            # containing lists encoded using the Dremel encoding.
-            # See https://issues.apache.org/jira/browse/ARROW-17399
-            # However, using a different engine for writing and reading may be less safe.
"engine": "pyarrow", - # "engine": "fastparquet", - # "columns": None, - # "storage_options": None, - # "use_nullable_dtypes": False, } @property @@ -44,20 +86,29 @@ def extension(self) -> str: """Return the file extension to be used with this specific data store.""" return "parquet" - def dump(self, df: pd.DataFrame, name: str) -> None: + def dump(self, df: pd.DataFrame, name: str, **kwargs) -> None: """Save a dataframe to file, using the given name and the class extension.""" path = self.path(name) # Unless the parameter "index" is explicitly enforced, ensure that RangeIndex # is converted to Int64Index in MultiIndexes with Pandas >= 1.5.0. # See https://github.com/apache/arrow/issues/33030 index = True if isinstance(df.index, pd.MultiIndex) else None - with timed(L.debug, f"Writing {name} to {path}"): - df.to_parquet(path=path, **{"index": index, **self._dump_options}) + with timed(L.debug, f"Writing {name or 'files'} to {path}"): + df.to_parquet(path=path, **{"index": index, **self._dump_options, **kwargs}) + + def load(self, name: str = "", **kwargs) -> Optional[pd.DataFrame]: + """Load a dataframe from file, using the given name and the class extension. - def load(self, name: str) -> Optional[pd.DataFrame]: - """Load a dataframe from file, using the given name and the class extension.""" + If name is empty, then consider and load all the files in the directory. + """ path = self.path(name) if not path.exists(): return None - with timed(L.debug, f"Reading {name} from {path}"): - return pd.read_parquet(path=path, **self._load_options) + if "schema" in kwargs: + schema = kwargs.pop("schema") + elif path.is_dir(): + schema = _get_unified_schema(path) + else: + schema = None + with timed(L.debug, f"Reading {name or 'files'} from {path}"): + return pd.read_parquet(path=path, **{"schema": schema, **self._load_options, **kwargs}) diff --git a/src/blueetl/utils.py b/src/blueetl/utils.py index 118ad4e..6529ac2 100644 --- a/src/blueetl/utils.py +++ b/src/blueetl/utils.py @@ -19,6 +19,8 @@ from blueetl.constants import DTYPES from blueetl.types import StrOrPath +L = logging.getLogger(__name__) + class CachedPropertyMixIn: """MixIn to be used with classes using cached_property to be skipped when pickled.""" @@ -116,11 +118,9 @@ def ensure_dtypes( df: original Pandas DataFrame. desired_dtypes: dict of names and desired dtypes. If None, the predefined dtypes are used. If the dict contains names not present in the columns or in the index, they are ignored. - In the index, any (u)int16 or (u)int32 dtype are considered as (u)int64, - since Pandas doesn't have a corresponding Index type for them. Returns: - A new DataFrame with the desired dtypes, or the same DataFrame if the columns are unchanged. + The same DataFrame is modified and returned, if it's possible to avoid making a copy. 
""" if desired_dtypes is None: desired_dtypes = DTYPES @@ -334,3 +334,15 @@ def copy_config(src: StrOrPath, dst: StrOrPath) -> None: config = load_yaml(src) config["simulation_campaign"] = resolve_path(src.parent, config["simulation_campaign"]) dump_yaml(dst, config, default_flow_style=None) + + +def get_shmdir() -> Optional[Path]: + """Return the shared memory directory, or None if not set.""" + shmdir = os.getenv("SHMDIR") + if not shmdir: + L.warning("SHMDIR should be set to the shared memory directory") + return None + shmdir = Path(shmdir) + if not shmdir.is_dir(): + raise RuntimeError("SHMDIR must be set to an existing directory") + return shmdir diff --git a/tests/unit/store/test_parquet.py b/tests/unit/store/test_parquet.py index 90058d1..0ef4979 100644 --- a/tests/unit/store/test_parquet.py +++ b/tests/unit/store/test_parquet.py @@ -1,5 +1,7 @@ +import pandas as pd import pytest from pandas.testing import assert_frame_equal +from pyarrow import ArrowNotImplementedError from blueetl.store import parquet as test_module @@ -10,56 +12,53 @@ "storable_df_with_unnamed_index", "storable_df_with_named_index", "storable_df_with_named_multiindex", - # fastparquet 0.8.1 fails to write DataFrames with MultiIndexes without names, - # but probably it's not a good idea to use them anyway. See the code at: - # https://github.com/dask/fastparquet/blob/34069fe2a41a7491e5b7b1f1b2cae9c41176f7b8/fastparquet/util.py#L140-L144 - # "storable_df_with_unnamed_multiindex", + "storable_df_with_unnamed_multiindex", ], ) +def test_dump_load_roundtrip(tmp_path, df, lazy_fixture): + df = lazy_fixture(df) + name = "myname" + + store = test_module.ParquetStore(tmp_path) + store.dump(df, name) + result = store.load(name) + + assert_frame_equal(result, df) + + @pytest.mark.parametrize( - "dump_options, load_options", + "df", [ - # test the configuration actually used - (None, None), - # test other possible configurations not used yet - ({"engine": "pyarrow", "index": None}, {"engine": "pyarrow"}), - ({"engine": "fastparquet", "index": None}, {"engine": "fastparquet"}), - pytest.param( - {"engine": "fastparquet", "index": True}, - {"engine": "fastparquet"}, - marks=pytest.mark.xfail( - reason="Fails because index names are different ('index', None)", - raises=AssertionError, - ), - ), - pytest.param( - {"engine": "pyarrow"}, - {"engine": "fastparquet"}, - marks=pytest.mark.xfail( - reason="Fails because column e is loaded as float64 instead of object", - raises=AssertionError, - ), - ), - pytest.param( - {"engine": "fastparquet"}, - {"engine": "pyarrow"}, - marks=pytest.mark.xfail( - reason="Fails because column h is loaded as bytes instead of object", - raises=AssertionError, - ), - ), + "storable_df_with_unnamed_index", + "storable_df_with_named_index", + "storable_df_with_named_multiindex", + "storable_df_with_unnamed_multiindex", ], ) -def test_dump_load_roundtrip(tmp_path, df, dump_options, load_options, lazy_fixture): +def test_dump_load_roundtrip_with_inferred_schema(tmp_path, df, lazy_fixture): df = lazy_fixture(df) - name = "myname" - store = test_module.ParquetStore(tmp_path) - if dump_options is not None: - store._dump_options = dump_options - if load_options is not None: - store._load_options = load_options + df1 = df.copy() + df2 = df.copy() + df1 = df1.etl.add_conditions(conditions=["extra_level"], values=[1]) + df2 = df2.etl.add_conditions(conditions=["extra_level"], values=[2]) - store.dump(df, name) - result = store.load(name) + df1.insert(loc=0, column="extra_columns", value=[[] for _ in 
range(len(df1))]) + df2.insert(loc=0, column="extra_columns", value=[[float(i)] for i in range(len(df2))]) + + df = pd.concat([df1, df2]) + + store = test_module.ParquetStore(tmp_path) + store.dump(df1, name="01") + store.dump(df2, name="02") + result = store.load() assert_frame_equal(result, df) + + # in pyarrow 15.0.0, the empty lists in the first DataFrame would cause: + # + # pyarrow.lib.ArrowNotImplementedError: + # Unsupported cast from double to null using function cast_null + # + # if the schema is not inferred correctly from both the files + with pytest.raises(ArrowNotImplementedError): + store.load(schema=None) diff --git a/tests/unit/test_features.py b/tests/unit/test_features.py index 95652c8..8685c9c 100644 --- a/tests/unit/test_features.py +++ b/tests/unit/test_features.py @@ -111,7 +111,7 @@ def test_calculate_features(repo): ), ] - result = test_module.calculate_features(repo, features_configs_key, features_configs_list) + result = test_module._calculate_features(repo, features_configs_key, features_configs_list) assert isinstance(result, list) assert len(result) == 1 diff --git a/tests/unit/test_parallel.py b/tests/unit/test_parallel.py index cacbc82..fbf27f0 100644 --- a/tests/unit/test_parallel.py +++ b/tests/unit/test_parallel.py @@ -91,7 +91,7 @@ def test_merge_groupby(): df_list = list(all_dataframes.values()) groupby = ["simulation_id", "circuit_id", "neuron_class"] - result = test_module.merge_groupby(df_list, groupby=groupby, parallel=False) + result = test_module.merge_groupby(df_list, groupby=groupby) expected = merge_groupby_classic(df_list, groupby=groupby) for result_item, expected_item in itertools.zip_longest(result, expected): diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index c59cb13..35ea3db 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -278,3 +278,17 @@ def test_copy_config(tmp_path): assert src_sim_campaign_path.is_absolute() is False assert dst_sim_campaign_path.is_absolute() is True assert (src.parent / src_sim_campaign_path).resolve() == dst_sim_campaign_path.resolve() + + +def test_get_shmdir(monkeypatch, tmp_path): + monkeypatch.setenv("SHMDIR", str(tmp_path)) + shmdir = test_module.get_shmdir() + assert shmdir == tmp_path + + monkeypatch.delenv("SHMDIR") + shmdir = test_module.get_shmdir() + assert shmdir is None + + monkeypatch.setenv("SHMDIR", str(tmp_path / "non-existent")) + with pytest.raises(RuntimeError, match="SHMDIR must be set to an existing directory"): + test_module.get_shmdir() diff --git a/tox.ini b/tox.ini index a9b6c9e..e003823 100644 --- a/tox.ini +++ b/tox.ini @@ -19,6 +19,9 @@ minversion = 4 setenv = # Run serially BLUEETL_JOBLIB_JOBS=1 +passenv = + SHMDIR + TMPDIR extras = all deps =
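
For reference, a minimal standalone sketch of the partial-DataFrame pattern introduced in src/blueetl/extract/report.py and src/blueetl/features.py above: each parallel task writes its partial result as a zero-padded, zstd-compressed parquet file into a temporary directory placed in shared memory when the SHMDIR environment variable is set, and the parent process reads the directory back as a single DataFrame. It is illustrative only and not part of the patch; the data, file names, and the simplified get_shmdir helper below are assumptions, and only pandas and pyarrow are required.

"""Sketch of the partial-DataFrame pattern used by the patch (illustrative data only)."""
import os
import tempfile
from pathlib import Path
from typing import Optional

import pandas as pd


def get_shmdir() -> Optional[Path]:
    """Simplified version of the get_shmdir helper added in blueetl.utils: use $SHMDIR if set."""
    shmdir = os.getenv("SHMDIR")
    return Path(shmdir) if shmdir else None


# partial DataFrames, normally produced by the tasks running in subprocesses
partial_frames = [
    pd.DataFrame({"simulation_id": task_index, "value": [0.1 * task_index, 0.2 * task_index]})
    for task_index in range(3)
]

with tempfile.TemporaryDirectory(prefix="blueetl_", dir=get_shmdir()) as temp_folder:
    temp_path = Path(temp_folder)
    for task_index, partial_df in enumerate(partial_frames):
        # zero-padded names keep the partial files in task order, as with f"{task_index:08d}"
        partial_df.to_parquet(temp_path / f"{task_index:08d}.parquet", compression="zstd")
    # with the pyarrow engine, a directory of parquet files is read back as a single dataset
    result = pd.read_parquet(temp_path, engine="pyarrow")

print(result)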
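
The empty-list schema issue handled by _get_unified_schema in src/blueetl/store/parquet.py can also be reproduced in isolation with the snippet below. File names and data are hypothetical, and it assumes a reasonably recent pyarrow where pa.unify_schemas promotes a null-typed list column to the non-null type found in the other files: a column containing only empty lists is written as a list of nulls, so the unified schema must be passed when reading the whole directory as one DataFrame.

"""Standalone illustration of the empty-list schema issue handled by _get_unified_schema."""
import tempfile
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

with tempfile.TemporaryDirectory() as temp_folder:
    temp_path = Path(temp_folder)
    # first file: only empty lists -> the "times" column is stored as a list of nulls
    pd.DataFrame({"gid": [1, 2], "times": [[], []]}).to_parquet(temp_path / "00.parquet")
    # second file: lists of floats -> the "times" column is stored as a list of doubles
    pd.DataFrame({"gid": [3, 4], "times": [[0.5], [1.5, 2.5]]}).to_parquet(temp_path / "01.parquet")

    # unify the schemas of all the files instead of trusting the first one
    schemas = [pq.read_schema(file_path) for file_path in sorted(temp_path.iterdir())]
    unified = pa.unify_schemas(schemas)
    print(unified.field("times").type)  # expected: a list of doubles, not a list of nulls

    # passing the unified schema lets pyarrow cast the null-typed column of the first file
    df = pd.read_parquet(temp_path, engine="pyarrow", schema=unified)
    print(df)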