Merge pull request #121 from ihmeuw-msca/feat/pandas-by-default

Rework: Changing polars to pandas

blsmxiu47 authored Dec 23, 2024
2 parents c477814 + 8eb0364 commit de40176

Showing 13 changed files with 175 additions and 91 deletions.
46 changes: 30 additions & 16 deletions src/onemod/fsutils/data_loader.py
@@ -17,41 +17,55 @@ def load(
        self,
        path: Path,
        return_type: Literal[
-            "polars_dataframe", "polars_lazyframe", "pandas_dataframe"
-        ] = "polars_dataframe",
+            "pandas_dataframe", "polars_dataframe", "polars_lazyframe"
+        ] = "pandas_dataframe",
        columns: list[str] | None = None,
        id_subsets: dict[str, list] | None = None,
        **options,
-    ) -> pl.DataFrame | pl.LazyFrame | pd.DataFrame:
+    ) -> pd.DataFrame | pl.DataFrame | pl.LazyFrame:
        """Load data with lazy loading and subset filtering. Polars and
        Pandas options available for the type of the returned data object."""

        if path.suffix not in self.io_dict:
            raise ValueError(f"Unsupported data format for '{path.suffix}'")

-        polars_lf = self.io_dict[path.suffix].load_lazy(path, **options)
+        if return_type == "pandas_dataframe":
+            pandas_df = self.io_dict[path.suffix].load_eager(path, **options)
+            assert isinstance(
+                pandas_df, pd.DataFrame
+            ), "Expected a pandas DataFrame"

-        if columns:
-            polars_lf = polars_lf.select(columns)
+            if columns:
+                pandas_df = pandas_df[columns]

-        if id_subsets:
-            for col, values in id_subsets.items():
-                polars_lf = polars_lf.filter(pl.col(col).is_in(values))
+            if id_subsets:
+                for col, values in id_subsets.items():
+                    pandas_df = pandas_df[pandas_df[col].isin(values)]
+                pandas_df.reset_index(drop=True, inplace=True)

-        if return_type == "polars_dataframe":
-            return polars_lf.collect()
-        elif return_type == "polars_lazyframe":
-            return polars_lf
-        elif return_type == "pandas_dataframe":
-            return polars_lf.collect().to_pandas()
+            return pandas_df
+        elif return_type in ["polars_dataframe", "polars_lazyframe"]:
+            polars_lf = self.io_dict[path.suffix].load_lazy(path, **options)
+
+            if columns:
+                polars_lf = polars_lf.select(columns)
+
+            if id_subsets:
+                for col, values in id_subsets.items():
+                    polars_lf = polars_lf.filter(pl.col(col).is_in(values))
+
+            if return_type == "polars_dataframe":
+                return polars_lf.collect()
+            elif return_type == "polars_lazyframe":
+                return polars_lf
        else:
            raise ValueError(
                "Return type must be one of 'polars_dataframe', 'polars_lazyframe', or 'pandas_dataframe'"
            )

    def dump(
        self,
-        obj: pl.DataFrame | pl.LazyFrame | pd.DataFrame,
+        obj: pd.DataFrame | pl.DataFrame | pl.LazyFrame,
        path: Path,
        **options,
    ) -> None:
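To sanity-check the new pandas branch, here is a minimal, self-contained sketch of the same column selection and id_subsets filtering on a toy frame (the column names location_id, age_group_id, and value are made up; the filtering steps mirror the diff above):

import pandas as pd

# Toy frame standing in for the result of load_eager().
pandas_df = pd.DataFrame(
    {
        "location_id": [1, 1, 2, 2],
        "age_group_id": [1, 2, 1, 2],
        "value": [0.1, 0.2, 0.3, 0.4],
    }
)

columns = ["location_id", "age_group_id", "value"]
id_subsets = {"age_group_id": [1]}

# Same steps as the pandas branch of load() above.
if columns:
    pandas_df = pandas_df[columns]
if id_subsets:
    for col, values in id_subsets.items():
        pandas_df = pandas_df[pandas_df[col].isin(values)]
    pandas_df.reset_index(drop=True, inplace=True)

print(pandas_df)  # two rows (age_group_id == 1), index reset to 0..1
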
8 changes: 4 additions & 4 deletions src/onemod/fsutils/interface.py
@@ -22,8 +22,8 @@ def load(
        *fparts: str,
        key: str,
        return_type: Literal[
-            "polars_dataframe", "polars_lazyframe", "pandas_dataframe"
-        ] = "polars_dataframe",
+            "pandas_dataframe", "polars_dataframe", "polars_lazyframe"
+        ] = "pandas_dataframe",
        columns: list[str] | None = None,
        id_subsets: dict[str, list] | None = None,
        **options,
@@ -32,7 +32,7 @@ def load(
        Parameters
        ----------
-        return_type : {'polars_dataframe', 'polars_lazyframe', 'pandas_dataframe'}, optional
+        return_type : {'pandas_dataframe', 'polars_dataframe', 'polars_lazyframe'}, optional
            Return type of loaded data object, applicable only for data files.
        columns : list of str, optional
            Specific columns to load, applicable only for data files.
@@ -63,7 +63,7 @@ def load(
    def dump(self, obj: Any, *fparts: str, key: str, **options) -> None:
        """Dump data or config files based on object type and key."""
        path = self.get_full_path(*fparts, key=key)
-        if isinstance(obj, (pl.DataFrame, pl.LazyFrame, pd.DataFrame)):
+        if isinstance(obj, (pd.DataFrame, pl.DataFrame, pl.LazyFrame)):
            return self.data_loader.dump(obj, path, **options)
        else:
            return self.config_loader.dump(obj, path, **options)
6 changes: 3 additions & 3 deletions src/onemod/fsutils/io.py
@@ -15,14 +15,14 @@ class DataIO(ABC):
    """Bridge class that unifies the I/O for different data file types."""

    fextns: tuple[str, ...] = ("",)
-    dtypes: tuple[Type, ...] = (pl.DataFrame, pl.LazyFrame, pd.DataFrame)
+    dtypes: tuple[Type, ...] = (pd.DataFrame, pl.DataFrame, pl.LazyFrame)

    def load_eager(
        self,
        fpath: Path | str,
-        backend: Literal["polars", "pandas"] = "polars",
+        backend: Literal["pandas", "polars"] = "pandas",
        **options,
-    ) -> pl.DataFrame | pd.DataFrame:
+    ) -> pd.DataFrame | pl.DataFrame:
        """Load data from given path."""
        fpath = Path(fpath)
        if fpath.suffix not in self.fextns:
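For illustration only, a sketch of what a concrete reader following this interface might look like once pandas is the default backend. The class name ParquetIO and the use of pd.read_parquet, pl.read_parquet, and pl.scan_parquet are assumptions, not part of this commit; only the backend default mirrors the diff:

from pathlib import Path
from typing import Literal

import pandas as pd
import polars as pl


class ParquetIO:
    """Hypothetical DataIO-style reader for parquet files."""

    fextns: tuple[str, ...] = (".parquet",)

    def load_eager(
        self,
        fpath: Path | str,
        backend: Literal["pandas", "polars"] = "pandas",
        **options,
    ) -> pd.DataFrame | pl.DataFrame:
        """Read the whole file eagerly, defaulting to a pandas DataFrame."""
        fpath = Path(fpath)
        if fpath.suffix not in self.fextns:
            raise ValueError(f"Unsupported file extension: '{fpath.suffix}'")
        if backend == "pandas":
            return pd.read_parquet(fpath, **options)
        return pl.read_parquet(fpath, **options)

    def load_lazy(self, fpath: Path | str, **options) -> pl.LazyFrame:
        """Lazy scans remain polars-only."""
        return pl.scan_parquet(Path(fpath), **options)
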
13 changes: 8 additions & 5 deletions src/onemod/stage/base.py
@@ -10,7 +10,7 @@
 from pathlib import Path
 from typing import Any, Literal

-from polars import DataFrame
+from pandas import DataFrame
 from pydantic import BaseModel, ConfigDict, Field, validate_call

 import onemod.stage as onemod_stages
@@ -474,14 +474,14 @@ def create_stage_subsets(
                f"{self.name} does not have a groupby attribute"
            )

-        lf = self.dataif.load(
+        df = self.dataif.load(
            key=data_key,
            columns=list(self.groupby),
            id_subsets=id_subsets,
-            return_type="polars_lazyframe",
+            return_type="pandas_dataframe",
        )

-        subsets_df = create_subsets(self.groupby, lf.collect().to_pandas())
+        subsets_df = create_subsets(self.groupby, df)
        self._subset_ids = set(subsets_df["subset_id"].to_list())

        self.dataif.dump(subsets_df, "subsets.csv", key="output")
@@ -500,7 +500,10 @@ def create_stage_params(self) -> None:
        """Create stage parameter sets from config."""
        params = create_params(self.config)
        if params is not None:
-            self._crossby = set(params.drop("param_id").columns)
+            if "param_id" not in params.columns:
+                raise KeyError("Parameter set ID column 'param_id' not found")
+
+            self._crossby = set(params.columns) - {"param_id"}
            self._param_ids = set(params["param_id"])
            self.dataif.dump(params, "parameters.csv", key="output")
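The new guard and crossby computation can be exercised in isolation; a small sketch with a toy parameter table (the parameter names lam and gamma are made up):

import pandas as pd

params = pd.DataFrame(
    {
        "param_id": [0, 1, 2, 3],
        "lam": [0.1, 0.1, 1.0, 1.0],
        "gamma": [0, 1, 0, 1],
    }
)

# Mirrors the updated create_stage_params() logic.
if "param_id" not in params.columns:
    raise KeyError("Parameter set ID column 'param_id' not found")

crossby = set(params.columns) - {"param_id"}  # {"lam", "gamma"}
param_ids = set(params["param_id"])           # {0, 1, 2, 3}
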
7 changes: 3 additions & 4 deletions src/onemod/stage/model_stages/rover_stage.py
@@ -16,7 +16,6 @@
 import warnings

 import pandas as pd
-import polars as pl
 from loguru import logger
 from modrover.api import Rover
@@ -52,8 +51,8 @@ def fit(self, subset_id: int, *args, **kwargs) -> None:
        """
        # Load data and filter by subset
        logger.info(f"Loading {self.name} data subset {subset_id}")
-        data = self.get_stage_subset(subset_id).filter(
-            pl.col(self.config["test_column"]) == 0
+        data = self.get_stage_subset(subset_id).query(
+            f"{self.config['test_column']} == 0"
        )

        if len(data) > 0:
@@ -71,7 +70,7 @@ def fit(self, subset_id: int, *args, **kwargs) -> None:

            # Fit submodel
            submodel.fit(
-                data=data.to_pandas(),
+                data=data,
                strategies=list(self.config.strategies),
                top_pct_score=self.config.top_pct_score,
                top_pct_learner=self.config.top_pct_learner,
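The old polars filter and the new pandas query drop the same test rows; a small, self-contained equivalence check (the column name test is illustrative, .query assumes the configured test column is a valid Python identifier, and pl.from_pandas requires pyarrow):

import pandas as pd
import polars as pl

test_column = "test"
pdf = pd.DataFrame({"test": [0, 1, 0], "y": [1.0, 2.0, 3.0]})

# New pandas form used in fit():
train_pd = pdf.query(f"{test_column} == 0")

# Old polars form, kept here only for comparison:
train_pl = pl.from_pandas(pdf).filter(pl.col(test_column) == 0)

assert len(train_pd) == train_pl.height == 2
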
2 changes: 1 addition & 1 deletion src/onemod/stage/model_stages/spxmod_stage.py
@@ -210,7 +210,7 @@ def _get_submodel_data(
        """Load submodel data."""
        # Load data and filter by subset
        logger.info(f"Loading {self.name} data subset {subset_id}")
-        data = self.get_stage_subset(subset_id).to_pandas()
+        data = self.get_stage_subset(subset_id)

        # Add spline basis to data
        spline_vars = []
26 changes: 13 additions & 13 deletions src/onemod/utils/parameters.py
@@ -3,32 +3,32 @@
 from itertools import product
 from typing import Any

-import polars as pl
+from pandas import DataFrame

 from onemod.config import ModelConfig


-def create_params(config: ModelConfig) -> pl.DataFrame | None:
+def create_params(config: ModelConfig) -> DataFrame | None:
    """Create parameter sets from crossby."""
    param_dict = {
        param_name: param_values
        for param_name in config.crossable_params
        if isinstance(param_values := config[param_name], set)
    }
-    if len(param_dict) == 0:
+    if not param_dict:
        return None

-    crossby = list(param_dict.keys())
-    params = pl.DataFrame(
-        [list(param_set) for param_set in product(*param_dict.values())],
-        schema=crossby,
-        orient="row",
+    params = DataFrame(
+        list(product(*param_dict.values())), columns=list(param_dict.keys())
    )
+    params["param_id"] = params.index

-    params = params.with_row_index(name="param_id")
-    return params.select(["param_id", *crossby])
+    return params[["param_id", *param_dict.keys()]]


-def get_params(params: pl.DataFrame, param_id: int) -> dict[str, Any]:
-    params = params.filter(pl.col("param_id") == param_id).drop("param_id")
-    return {str(col): params[col][0] for col in params.columns}
+def get_params(params: DataFrame, param_id: int) -> dict[str, Any]:
+    params = params.query("param_id == @param_id").drop(columns=["param_id"])
+    return {
+        str(param_name): param_value.item()
+        for param_name, param_value in params.items()
+    }
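A worked, self-contained example of the new construction and lookup, mirroring create_params and get_params above (the parameter names are made up, and lists are used instead of sets so the row order is deterministic):

from itertools import product

from pandas import DataFrame

param_dict = {"lam": [0.1, 1.0], "gamma": [0, 1]}

# create_params-style construction.
params = DataFrame(
    list(product(*param_dict.values())), columns=list(param_dict.keys())
)
params["param_id"] = params.index
params = params[["param_id", *param_dict.keys()]]
#    param_id  lam  gamma
# 0         0  0.1      0
# 1         1  0.1      1
# 2         2  1.0      0
# 3         3  1.0      1

# get_params-style lookup for a single parameter set.
param_id = 2
row = params.query("param_id == @param_id").drop(columns=["param_id"])
settings = {str(name): value.item() for name, value in row.items()}
# {"lam": 1.0, "gamma": 0}
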
29 changes: 14 additions & 15 deletions src/onemod/utils/subsets.py
@@ -1,7 +1,6 @@
 """Functions for working with groupby and subsets."""

 import pandas as pd
-import polars as pl


 def create_subsets(groupby: set[str], data: pd.DataFrame) -> pd.DataFrame:
@@ -17,11 +16,11 @@ def create_subsets(groupby: set[str], data: pd.DataFrame) -> pd.DataFrame:


 def get_subset(
-    data: pl.DataFrame,
-    subsets: pl.DataFrame,
+    data: pd.DataFrame,
+    subsets: pd.DataFrame,
    subset_id: int,
    id_names: list[str] | None = None,
-) -> pl.DataFrame:
+) -> pd.DataFrame:
    """Get data subset by subset_id."""
    id_subsets = get_id_subsets(subsets, subset_id)
    if id_names is not None:
@@ -31,21 +30,21 @@ def get_subset(
    return filter_data(data, id_subsets)


-def get_id_subsets(subsets: pl.DataFrame, subset_id: int) -> dict:
+def get_id_subsets(subsets: pd.DataFrame, subset_id: int) -> dict:
    """Get ID names and values that define a data subset."""
    return (
-        subsets.filter(pl.col("subset_id") == subset_id)
-        .drop("subset_id")
-        .to_dict(as_series=False)
+        subsets.query("subset_id == @subset_id")
+        .drop(columns=["subset_id"])
+        .to_dict(orient="list")
    )


 def filter_data(
-    data: pl.DataFrame, id_subsets: dict[str, set[int]]
-) -> pl.DataFrame:
+    data: pd.DataFrame, id_subsets: dict[str, set[int]]
+) -> pd.DataFrame:
    """Filter data by ID subsets."""
-    filter_expr = pl.lit(True)
-    for key, value in id_subsets.items():
-        filter_expr &= pl.col(key).is_in(value)
-
-    return data.filter(filter_expr)
+    return data.query(
+        " & ".join(
+            [f"{key}.isin({value})" for key, value in id_subsets.items()]
+        )
+    ).reset_index(drop=True)
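A small sketch of how the rewritten helpers compose on toy data (the ID column names are illustrative; the explicit boolean mask at the end is the indexing equivalent of the query string that filter_data builds):

import pandas as pd

subsets = pd.DataFrame(
    {"subset_id": [0, 1], "location_id": [1, 2], "age_group_id": [5, 5]}
)
data = pd.DataFrame(
    {
        "location_id": [1, 1, 2],
        "age_group_id": [5, 6, 5],
        "value": [0.1, 0.2, 0.3],
    }
)

# get_id_subsets-style lookup.
subset_id = 0
id_subsets = (
    subsets.query("subset_id == @subset_id")
    .drop(columns=["subset_id"])
    .to_dict(orient="list")
)
# {"location_id": [1], "age_group_id": [5]}

# Boolean-mask equivalent of filter_data's query string.
mask = pd.Series(True, index=data.index)
for key, values in id_subsets.items():
    mask &= data[key].isin(values)
subset = data[mask].reset_index(drop=True)  # one row: location 1, age group 5
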
3 changes: 1 addition & 2 deletions tests/helpers/dummy_stages.py
@@ -1,4 +1,3 @@
-import polars as pl
 from pydantic import Field

 from onemod.config import (
@@ -249,7 +248,7 @@ class MultiplyByTwoStage(ModelStage):
    def run(self, subset_id: int, *args, **kwargs) -> None:
        """Run MultiplyByTwoStage."""
        df = self.get_stage_subset(subset_id)
-        df = df.with_columns((pl.col("value") * 2).alias("value"))
+        df["value"] = df["value"] * 2
        self.dataif.dump(df, "data.parquet", key="output")

    def fit(self) -> None:
14 changes: 7 additions & 7 deletions tests/integration/test_integration_pipeline_evaluate.py
@@ -1,6 +1,6 @@
 from unittest.mock import patch

-import polars as pl
+import pandas as pd
 import pytest
 from tests.helpers.dummy_pipeline import get_expected_args, setup_dummy_pipeline
 from tests.helpers.dummy_stages import MultiplyByTwoStage, assert_stage_logs
@@ -118,8 +118,8 @@ def test_missing_dependency_error(small_input_data, test_base_dir, method):
    subset_stage_names = {"covariate_selection"}

    with pytest.raises(
-        ValueError,
-        match="Required input to stage 'covariate_selection' is missing. Missing output from upstream dependency 'preprocessing'.",
+        FileNotFoundError,
+        match=f"Stage covariate_selection input items do not exist: {{'data': '{test_base_dir}/preprocessing/data.parquet'}}",
    ):
        dummy_pipeline.evaluate(method=method, stages=subset_stage_names)
@@ -150,8 +150,8 @@ def test_invalid_id_subsets_keys(small_input_data, test_base_dir, method):
 def test_evaluate_with_id_subsets(test_base_dir, sample_data):
    """Test that Pipeline.evaluate() correctly evaluates single stage with id_subsets."""
    sample_input_data = test_base_dir / "test_input_data.parquet"
-    df = pl.DataFrame(sample_data)
-    df.write_parquet(sample_input_data)
+    df = pd.DataFrame(sample_data)
+    df.to_parquet(sample_input_data)

    test_pipeline = Pipeline(
        name="dummy_pipeline",
@@ -171,14 +171,14 @@ def test_evaluate_with_id_subsets(test_base_dir, sample_data):

    # Ensure input data is as expected for the test
    assert sample_input_data.exists()
-    input_df = pl.read_parquet(sample_input_data)
+    input_df = pd.read_parquet(sample_input_data)
    assert input_df.shape == (4, 4)

    test_pipeline.evaluate(method="run", id_subsets={"age_group_id": [1]})

    # Verify that output only contains rows with specified subset(s) for age_group_id
    output_df = test_stage.dataif.load("data.parquet", key="output")
-    assert output_df.select(pl.col("age_group_id")).n_unique() == 1
+    assert output_df["age_group_id"].nunique() == 1
    assert output_df.shape == (1, 4)
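The test updates swap polars idioms for their pandas counterparts; a compact, self-contained round trip using the same idioms (pandas parquet I/O assumes a parquet engine such as pyarrow is installed; the file name and toy data are made up):

import tempfile
from pathlib import Path

import pandas as pd

sample_data = {"age_group_id": [1, 1, 2, 3], "value": [0.1, 0.2, 0.3, 0.4]}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "test_input_data.parquet"

    df = pd.DataFrame(sample_data)    # was: pl.DataFrame(sample_data)
    df.to_parquet(path)               # was: df.write_parquet(path)

    input_df = pd.read_parquet(path)  # was: pl.read_parquet(path)
    assert input_df.shape == (4, 2)

    # was: output_df.select(pl.col("age_group_id")).n_unique() == 1
    assert input_df.loc[input_df["age_group_id"] == 1, "age_group_id"].nunique() == 1
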