DRAFT: (not recommended) Track engine and storage format at DataFrame/Series level #7425

Draft · wants to merge 1 commit into base: hybrid-execution
2 changes: 1 addition & 1 deletion modin/config/envvars.py
@@ -174,7 +174,7 @@ class DataFrameVariable(EnvironmentVariable, type=str):
def get(cls) -> Any:
return super().get()

class Engine(DataFrameVariable, type=str):
class Engine(EnvironmentVariable, type=str):
"""Distribution engine to run queries by."""

varname = "MODIN_ENGINE"
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -1707,7 +1707,7 @@ def astype(self, col_dtypes, errors: str = "raise"):
new_dtypes = self_dtypes.copy()
# Update the new dtype series to the proper pandas dtype
new_dtype = pandas.api.types.pandas_dtype(dtype)
if self._getEngineConfig().get() == "Dask" and hasattr(dtype, "_is_materialized"):
if self._engine == "Dask" and hasattr(dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = dtype._materialize_categories()

@@ -1736,7 +1736,7 @@ def astype_builder(df):
if not (col_dtypes == self_dtypes).all():
new_dtypes = self_dtypes.copy()
new_dtype = pandas.api.types.pandas_dtype(col_dtypes)
if self._getEngineConfig().get() == "Dask" and hasattr(new_dtype, "_is_materialized"):
if self._engine == "Dask" and hasattr(new_dtype, "_is_materialized"):
# FIXME: https://github.com/dask/distributed/issues/8585
_ = new_dtype._materialize_categories()
if isinstance(new_dtype, pandas.CategoricalDtype):
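The two hunks above replace the global config lookup (`self._getEngineConfig().get()`) with the frame-local `_engine` attribute. A minimal sketch of the shared guard, assuming Modin's lazy categorical proxy dtypes (which expose the private `_is_materialized` / `_materialize_categories` API the FIXME refers to); the helper name is illustrative, not part of the patch:

```python
import pandas

def _maybe_materialize_categories(dtype, engine: str):
    # Hypothetical helper. Plain pandas dtypes lack `_is_materialized`,
    # so the hasattr check makes this a no-op for them.
    dtype = pandas.api.types.pandas_dtype(dtype)
    if engine == "Dask" and hasattr(dtype, "_is_materialized"):
        # FIXME: https://github.com/dask/distributed/issues/8585
        _ = dtype._materialize_categories()
    return dtype
```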
6 changes: 6 additions & 0 deletions modin/numpy/arr.py
@@ -151,6 +151,12 @@ class array(object):
provide functionality.
"""

# TODO(hybrid-execution): save the possible engine / storage format values
# to a constant and restrict the types of _engine and _storage_format to
# those values
_engine: str
_storage_format: str

def __init__(
self,
object=None,
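One possible shape for the TODO above, sketched with `typing.Literal`; the value sets below are assumptions based on Modin's current engines and storage formats, and a real implementation would derive them from `modin.config` rather than hard-coding them:

```python
from typing import Literal

# Hypothetical restricted aliases (illustrative values, not from the patch).
EngineLiteral = Literal["Ray", "Dask", "Unidist", "Python", "Native"]
StorageFormatLiteral = Literal["Pandas", "Native"]

_engine: EngineLiteral
_storage_format: StorageFormatLiteral
```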
42 changes: 25 additions & 17 deletions modin/pandas/base.py
@@ -19,7 +19,7 @@
import pickle as pkl
import re
import warnings
from functools import cached_property
from functools import cached_property, wraps
from typing import (
TYPE_CHECKING,
Any,
@@ -193,6 +193,17 @@ def _get_repr_axis_label_indexer(labels, num_for_repr):
[] if back_repr_num == 0 else list(all_positions[-back_repr_num:])
)

def _ensure_engine_and_storage_format_set_post_init(init):
@wraps(init)
def wrapped_init(self, *args, **kwargs):
init(self, *args, **kwargs)
assert self._engine is not None, (
f"Internal error: must initialize {type(self)} with engine"
)
assert self._storage_format is not None, (
f"Internal error: must initialize {type(self)} with storage format"
)
return wrapped_init

@_inherit_docstrings(pandas.DataFrame, apilink=["pandas.DataFrame", "pandas.Series"])
class BasePandasDataset(ClassLogger):
@@ -209,20 +220,15 @@ class BasePandasDataset(ClassLogger):
_pandas_class = pandas.core.generic.NDFrame
_query_compiler: BaseQueryCompiler
_siblings: list[BasePandasDataset]
_engine_override: Engine = None
_storage_override: StorageFormat = None
# TODO(hybrid-execution): save the possible engine / storage format values
# to a constant and restrict the types of _engine and _storage_format to
# those values
_engine: str = None
_storage_format: str = None

def _getEngineConfig(self) -> Engine:
if self._engine_override is not None:
return self._engine_override
else:
return Engine

def _getStorageConfig(self) -> Engine:
if self._storage_override is not None:
return self._storage_override
else:
return StorageFormat
engine = property(lambda self: self._engine)

storage_format = property(lambda self: self._storage_format)

@cached_property
def _is_dataframe(self) -> bool:
@@ -242,7 +248,7 @@ def _is_dataframe(self) -> bool:

@abc.abstractmethod
def _create_or_update_from_compiler(
self, new_query_compiler: BaseQueryCompiler, inplace: bool = False
self,
new_query_compiler: BaseQueryCompiler,
new_engine: str,
new_storage_format: str,
inplace: bool = False,
) -> Self | None:
"""
Return or update a ``DataFrame`` or ``Series`` with given `new_query_compiler`.
@@ -317,7 +323,7 @@ def _build_repr_df(
indexer = row_indexer
return self.iloc[indexer]._query_compiler.to_pandas()

def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None:
def _update_inplace(
    self,
    new_query_compiler: BaseQueryCompiler,
    new_storage_format: str,
    new_engine: str,
) -> None:
"""
Update the current DataFrame inplace.

@@ -331,6 +337,8 @@ def _update_inplace(self, new_query_compiler: BaseQueryCompiler) -> None:
for sib in self._siblings:
sib._query_compiler = new_query_compiler
old_query_compiler.free()
self._storage_format = new_storage_format
self._engine = new_engine

def _validate_other(
self,
@@ -538,7 +546,7 @@ def _binary_op(self, op, other, **kwargs) -> Self:
if not self._is_dataframe and op in series_specialize_list:
op = "series_" + op
new_query_compiler = getattr(self._query_compiler, op)(other, **kwargs)
return self._create_or_update_from_compiler(new_query_compiler)
return self._create_or_update_from_compiler(
    new_query_compiler, self.engine, self.storage_format
)

def _default_to_pandas(self, op, *args, reason: str = None, **kwargs):
"""
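The decorator added above only asserts after `__init__` returns; here is a self-contained toy sketch of that behavior (class and helper names are illustrative — the patch applies the real decorator to `DataFrame.__init__`):

```python
from functools import wraps

def _ensure_set_post_init(init):
    # Run __init__, then verify it populated the execution metadata.
    @wraps(init)
    def wrapped_init(self, *args, **kwargs):
        init(self, *args, **kwargs)
        assert self._engine is not None, f"must initialize {type(self)} with engine"
        assert self._storage_format is not None, (
            f"must initialize {type(self)} with storage format"
        )

    return wrapped_init

class Toy:
    _engine = None
    _storage_format = None

    @_ensure_set_post_init
    def __init__(self, engine=None, storage_format=None):
        self._engine = engine
        self._storage_format = storage_format

Toy(engine="Ray", storage_format="Pandas")  # passes both checks
# Toy() would raise AssertionError: must initialize ... with engine
```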
39 changes: 31 additions & 8 deletions modin/pandas/dataframe.py
@@ -57,7 +57,7 @@
from pandas.io.formats.info import DataFrameInfo
from pandas.util._validators import validate_bool_kwarg

from modin.config import PersistentPickle
from modin.config import Engine, PersistentPickle, StorageFormat
from modin.error_message import ErrorMessage
from modin.logging import disable_logging
from modin.pandas import Categorical
Expand All @@ -72,7 +72,7 @@
)

from .accessor import CachedAccessor, SparseFrameAccessor
from .base import _ATTRS_NO_LOOKUP, BasePandasDataset
from .base import (
    _ATTRS_NO_LOOKUP,
    BasePandasDataset,
    _ensure_engine_and_storage_format_set_post_init,
)
from .groupby import DataFrameGroupBy
from .iterator import PartitionIterator
from .series import Series
@@ -137,6 +137,7 @@ class DataFrame(BasePandasDataset):

_pandas_class = pandas.DataFrame

@_ensure_engine_and_storage_format_set_post_init
def __init__(
self,
data=None,
@@ -145,6 +146,8 @@ def __init__(
dtype=None,
copy=None,
query_compiler: BaseQueryCompiler = None,
engine: str = None,
storage_format: str = None,
) -> None:
from modin.numpy import array

@@ -153,6 +156,8 @@ def __init__(
self._siblings = []
if isinstance(data, (DataFrame, Series)):
self._query_compiler = data._query_compiler.copy()
self._engine = data.engine
self._storage_format = data.storage_format
if index is not None and any(i not in data.index for i in index):
raise NotImplementedError(
"Passing non-existant columns or index values to constructor not"
@@ -185,22 +190,26 @@ def __init__(
self._query_compiler = data.loc[index, columns]._query_compiler
elif isinstance(data, array):
self._query_compiler = data._query_compiler.copy()
self._engine = data.engine
self._storage_format = data.storage_format
if copy is not None and not copy:
data._add_sibling(self)
if columns is not None and not isinstance(columns, pandas.Index):
columns = pandas.Index(columns)
if columns is not None:
obj_with_new_columns = self.set_axis(columns, axis=1, copy=False)
self._update_inplace(obj_with_new_columns._query_compiler)
self._update_inplace(
    obj_with_new_columns._query_compiler,
    new_engine=obj_with_new_columns.engine,
    new_storage_format=obj_with_new_columns.storage_format,
)
if index is not None:
obj_with_new_index = self.set_axis(index, axis=0, copy=False)
self._update_inplace(obj_with_new_index._query_compiler)
self._update_inplace(
    obj_with_new_index._query_compiler,
    new_engine=obj_with_new_index.engine,
    new_storage_format=obj_with_new_index.storage_format,
)
if dtype is not None:
casted_obj = self.astype(dtype, copy=False)
self._query_compiler = casted_obj._query_compiler
# Check type of data and use appropriate constructor
elif query_compiler is None:
distributed_frame = from_non_pandas(data, index, columns, dtype)
self._storage_format = StorageFormat.get()
self._engine = Engine.get()
if distributed_frame is not None:
self._query_compiler = distributed_frame._query_compiler
return
@@ -227,6 +236,11 @@ def __init__(
data = {key: value for key, value in data.items() if key in columns}

if len(data) and all(isinstance(v, Series) for v in data.values()):
if (
    len(set(v.storage_format for v in data.values())) > 1
    or len(set(v.engine for v in data.values())) > 1
):
    raise NotImplementedError("multiple executions in input data")
from .general import concat

new_qc = concat(
@@ -258,7 +272,16 @@ def __init__(
)
self._query_compiler = from_pandas(pandas_df)._query_compiler
else:
assert engine is not None, (
    "When initializing dataframe with query compiler, must provide engine"
)
assert storage_format is not None, (
    "When initializing dataframe with query compiler, must provide storage format"
)
self._query_compiler = query_compiler
self._engine = engine
self._storage_format = storage_format

def __repr__(self) -> str:
"""
@@ -2641,7 +2664,7 @@ def __setattr__(self, key, value) -> None:
# __dict__
# - `_siblings`, which Modin initializes before it appears in __dict__.
if key in ("_query_compiler", "_siblings") or key in self.__dict__:
if (
    key in ("_query_compiler", "_siblings", "_engine", "_storage_format")
    or key in self.__dict__
):
pass
# we have to check for the key in `dir(self)` first in order not to trigger columns computation
elif key not in dir(self) and key in self:
@@ -2983,7 +3006,7 @@ def reindex_like(
)

def _create_or_update_from_compiler(
self, new_query_compiler, inplace=False
self,
new_query_compiler: BaseQueryCompiler,
new_engine: str,
new_storage_format: str,
inplace: bool = False,
) -> Union[DataFrame, None]:
"""
Return or update a ``DataFrame`` with given `new_query_compiler`.
@@ -3004,9 +3027,9 @@ def _create_or_update_from_compiler(
new_query_compiler, self._query_compiler.__class__.__bases__
), "Invalid Query Compiler object: {}".format(type(new_query_compiler))
if not inplace:
return self.__constructor__(query_compiler=new_query_compiler)
return self.__constructor__(
    query_compiler=new_query_compiler,
    engine=new_engine,
    storage_format=new_storage_format,
)
else:
self._update_inplace(new_query_compiler=new_query_compiler)
self._update_inplace(
    new_query_compiler=new_query_compiler,
    new_storage_format=new_storage_format,
    new_engine=new_engine,
)

def _get_numeric_data(self, axis: int) -> DataFrame:
"""
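Taken together, the constructor changes make execution metadata explicit at creation time. A sketch of the resulting contract, assuming a build of this branch:

```python
import modin.pandas as pd
from modin.config import Engine, StorageFormat

# Plain construction stamps the frame with the process-wide defaults.
df = pd.DataFrame({"a": [1, 2, 3]})
assert df.engine == Engine.get()
assert df.storage_format == StorageFormat.get()

# Constructing from a bare query compiler now requires both values:
#   pd.DataFrame(query_compiler=qc)                       -> AssertionError
#   pd.DataFrame(query_compiler=qc, engine="Ray",
#                storage_format="Pandas")                 -> accepted
```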
20 changes: 12 additions & 8 deletions modin/pandas/indexing.py
@@ -379,9 +379,9 @@ def _get_pandas_object_from_qc_view(
common ``Indexer`` object or range and ``np.ndarray`` only.
"""
if ndim == 2:
return self.df.__constructor__(query_compiler=qc_view)
return self.df.__constructor__(
    query_compiler=qc_view,
    engine=self.df.engine,
    storage_format=self.df.storage_format,
)
if isinstance(self.df, Series) and not row_scalar:
return self.df.__constructor__(query_compiler=qc_view)
return self.df.__constructor__(
    query_compiler=qc_view,
    engine=self.df.engine,
    storage_format=self.df.storage_format,
)

if isinstance(self.df, Series):
axis = 0
@@ -400,7 +400,7 @@
else 1 if col_scalar or col_multiindex_full_lookup else 0
)

res_df = self.df.__constructor__(query_compiler=qc_view)
res_df = self.df.__constructor__(
    query_compiler=qc_view,
    engine=self.df.engine,
    storage_format=self.df.storage_format,
)
return res_df.squeeze(axis=axis)

def _setitem_positional(self, row_lookup, col_lookup, item, axis=None):
@@ -573,7 +573,8 @@ def _handle_boolean_masking(self, row_loc, col_loc):
extra_log=f"Only ``modin.pandas.Series`` boolean masks are acceptable, got: {type(row_loc)}",
)
masked_df = self.df.__constructor__(
query_compiler=self.qc.getitem_array(row_loc._query_compiler)
query_compiler=self.qc.getitem_array(row_loc._query_compiler),
engine=self.df.engine,
storage_format=self.df.storage_format,
)
if isinstance(masked_df, Series):
assert col_loc == slice(None)
@@ -789,8 +790,10 @@ def _loc(df):
df.loc[key] = item
return df

result_df = self.df._default_to_pandas(_loc)
self.df._update_inplace(
new_query_compiler=self.df._default_to_pandas(_loc)._query_compiler
new_query_compiler=result_df._query_compiler,
new_engine=result_df.engine,
new_storage_format=result_df.storage_format,
)
return
row_loc, col_loc, ndims = self._parse_row_and_column_locators(key)
@@ -807,7 +810,7 @@
if is_scalar(row_loc) or len(row_loc) == 1:
index = self.qc.index.insert(len(self.qc.index), row_loc)
self.qc = self.qc.reindex(labels=index, axis=0, fill_value=0)
self.df._update_inplace(new_query_compiler=self.qc)
self.df._update_inplace(
    new_query_compiler=self.qc,
    new_engine=self.df.engine,
    new_storage_format=self.df.storage_format,
)
self._set_item_existing_loc(row_loc, col_loc, item)
else:
self._set_item_existing_loc(row_loc, col_loc, item)
@@ -845,7 +848,7 @@ def _setitem_with_new_columns(self, row_loc, col_loc, item):
if not common_label_loc[i]:
columns = columns.insert(len(columns), col_loc[i])
self.qc = self.qc.reindex(labels=columns, axis=1, fill_value=np.nan)
self.df._update_inplace(new_query_compiler=self.qc)
self.df._update_inplace(
    new_query_compiler=self.qc,
    new_engine=self.df.engine,
    new_storage_format=self.df.storage_format,
)
self._set_item_existing_loc(row_loc, np.array(col_loc), item)

def _set_item_existing_loc(self, row_loc, col_loc, item):
@@ -1061,7 +1064,8 @@ def _iloc(df):
return df

self.df._update_inplace(
new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler
new_query_compiler=self.df._default_to_pandas(_iloc)._query_compiler,
new_engine=self.df.engine,
new_storage_format=self.df.storage_format,
)
return
row_loc, col_loc, _ = self._parse_row_and_column_locators(key)
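Every `__constructor__` call in indexing.py now forwards the parent frame's metadata. The repeated pattern amounts to the following sketch (`_wrap_result` is not a helper in the patch, just a condensed reading of it):

```python
def _wrap_result(parent, qc_view):
    # Indexing results stay on the same engine / storage format as the
    # frame they were sliced from.
    return parent.__constructor__(
        query_compiler=qc_view,
        engine=parent.engine,
        storage_format=parent.storage_format,
    )
```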
4 changes: 2 additions & 2 deletions modin/pandas/io.py
@@ -64,7 +64,7 @@
from pandas.io.parsers import TextFileReader
from pandas.io.parsers.readers import _c_parser_defaults

from modin.config import ModinNumpy
from modin.config import Engine, ModinNumpy, StorageFormat
from modin.error_message import ErrorMessage
from modin.logging import ClassLogger, enable_logging
from modin.utils import (
@@ -991,7 +991,7 @@ def from_pandas(df) -> DataFrame:
"""
from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher

return ModinObjects.DataFrame(query_compiler=FactoryDispatcher.from_pandas(df))
return ModinObjects.DataFrame(
    query_compiler=FactoryDispatcher.from_pandas(df),
    engine=Engine.get(),
    storage_format=StorageFormat.get(),
)


def from_arrow(at) -> DataFrame:
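`from_pandas` is the main entry point that stamps new frames with the process-wide defaults; a quick check of that behavior, assuming this branch is installed:

```python
import pandas

from modin.config import Engine, StorageFormat
from modin.pandas.io import from_pandas

modin_df = from_pandas(pandas.DataFrame({"a": [1]}))
# New frames inherit whatever the global config held at conversion time.
assert modin_df.engine == Engine.get()
assert modin_df.storage_format == StorageFormat.get()
```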