This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit 62d113e: lazyframe
ion-elgreco committed Jan 28, 2024
1 parent 44ae873
Showing 13 changed files with 1,752 additions and 1,046 deletions.
17 changes: 11 additions & 6 deletions .github/workflows/ci.yml
@@ -7,6 +7,13 @@ concurrency:
 on:
   workflow_dispatch:
   push:
+    branches:
+      - main
+      - master
+  pull_request:
+    branches:
+      - main
+      - master
   release:
     types:
       - created
@@ -31,13 +38,12 @@ jobs:
         - "3.9"
         - "3.8"
       dagster_version:
-        - "1.4.0"
         - "1.5.0"
         - "1.5.1"
+        - "1.6.0"
       polars_version:
-        - "0.17.0"
-        - "0.18.0"
         - "0.19.0"
+        - "0.20.6"
     steps:
       - name: Setup python for test ${{ matrix.py }}
         uses: actions/setup-python@v2
@@ -76,13 +82,12 @@ jobs:
         - "3.9"
         - "3.8"
       dagster_version:
-        - "1.4.0"
         - "1.5.0"
         - "1.5.1"
+        - "1.6.0"
       polars_version:
-        - "0.17.0"
-        - "0.18.0"
         - "0.19.0"
+        - "0.20.6"
     steps:
       - name: Setup python for test ${{ matrix.py }}
         uses: actions/setup-python@v2
8 changes: 4 additions & 4 deletions README.md
@@ -52,13 +52,13 @@ Complete description of `dagster_polars` behavior for all supported type annotations
 | Type annotation | Behavior |
 | --- | --- |
 | `DataFrame` | read/write DataFrame. Raise error if it's not found in storage. |
-| `LazyFrame` | read LazyFrame. Raise error if it's not found in storage. |
+| `LazyFrame` | read/write LazyFrame. Raise error if it's not found in storage. |
 | `Optional[DataFrame]` | read/write DataFrame. Skip if it's not found in storage or the output is `None`. |
-| `Optional[LazyFrame]` | read LazyFrame. Skip if it's not found in storage. |
+| `Optional[LazyFrame]` | read/write LazyFrame. Skip if it's not found in storage. |
 | `DataFrameWithMetadata` | read/write DataFrame and metadata. Raise error if it's not found in storage. |
-| `LazyFrameWithMetadata` | read LazyFrame and metadata. Raise error if it's not found in storage. |
+| `LazyFrameWithMetadata` | read/write LazyFrame and metadata. Raise error if it's not found in storage. |
 | `Optional[DataFrameWithMetadata]` | read/write DataFrame and metadata. Skip if it's not found in storage or the output is `None`. |
-| `Optional[LazyFrameWithMetadata]` | read LazyFrame and metadata. Skip if it's not found in storage. |
+| `Optional[LazyFrameWithMetadata]` | read/write LazyFrame and metadata. Skip if it's not found in storage. |
 | `DataFramePartitions` | read multiple DataFrames as `Dict[str, DataFrame]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
 | `LazyFramePartitions` | read multiple LazyFrames as `Dict[str, LazyFrame]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
 | `DataFramePartitionsWithMetadata` | read multiple DataFrames and metadata as `Dict[str, Tuple[DataFrame, StorageMetadata]]`. Raise an error if any of them is not found in storage, unless `"allow_missing_partitions"` input metadata is set to `True` |
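To make the new read/write LazyFrame behavior concrete, here is a minimal usage sketch (not part of the diff): the asset names and resource wiring are hypothetical, and the parquet IO manager shipped with this package is assumed.

```python
import polars as pl
from dagster import Definitions, asset

from dagster_polars import PolarsParquetIOManager


@asset(io_manager_key="polars_parquet_io_manager")
def upstream() -> pl.LazyFrame:
    # With this commit, returning a LazyFrame sinks the query to storage
    # instead of raising an error.
    return pl.LazyFrame({"a": [1, 2, 3]}).with_columns(pl.col("a") * 2)


@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    # A LazyFrame-annotated input is scanned from storage, not loaded eagerly.
    return upstream.filter(pl.col("a") > 2).collect()


defs = Definitions(
    assets=[upstream, downstream],
    resources={"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/tmp/dagster")},
)
```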
102 changes: 72 additions & 30 deletions dagster_polars/io_managers/base.py
@@ -18,6 +18,7 @@
 import polars as pl
 from dagster import (
     ConfigurableIOManager,
+    EnvVar,
     InitResourceContext,
     InputContext,
     MetadataValue,
@@ -27,9 +28,9 @@
 from dagster import (
     _check as check,
 )
-from dagster._annotations import experimental
 from dagster._core.storage.upath_io_manager import is_dict_type
-from pydantic.fields import Field, PrivateAttr
+from pydantic import PrivateAttr
+from pydantic.fields import Field

 from dagster_polars.io_managers.utils import get_polars_metadata
 from dagster_polars.types import (
@@ -129,13 +130,22 @@ def annotation_for_storage_metadata(annotation) -> bool:
     return annotation_is_tuple_with_metadata(annotation)


-@experimental
+def _process_env_vars(config: Mapping[str, Any]) -> Dict[str, Any]:
+    out = {}
+    for key, value in config.items():
+        if isinstance(value, dict) and len(value) == 1 and next(iter(value.keys())) == "env":
+            out[key] = EnvVar(next(iter(value.values()))).get_value()
+        else:
+            out[key] = value
+    return out
+
+
 class BasePolarsUPathIOManager(ConfigurableIOManager, UPathIOManager):
     """Base class for `dagster-polars` IOManagers.

     Doesn't define a specific storage format.

-    To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the `dump_df_to_path` and `scan_df_from_path` methods.
+    To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the `write_to_path` and `scan_from_path` methods.

     Features:
     - All the features of :py:class:`~dagster.UPathIOManager` - works with local and remote filesystems (like S3), supports loading multiple partitions with respect to :py:class:`~dagster.PartitionMapping`, and more
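The new `_process_env_vars` helper resolves config values of the form `{"env": VAR_NAME}` into the variable's value via Dagster's `EnvVar`, and passes plain values through unchanged. A small illustration run against the helper above (the variable name and values are made up):

```python
import os

os.environ["MY_SECRET"] = "s3cr3t"  # hypothetical variable for the example

config = {
    "key": "plain-value",            # passed through unchanged
    "secret": {"env": "MY_SECRET"},  # a {"env": ...} dict is resolved via EnvVar
}
resolved = _process_env_vars(config)
assert resolved == {"key": "plain-value", "secret": "s3cr3t"}
```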
@@ -146,20 +156,23 @@ class BasePolarsUPathIOManager(ConfigurableIOManager, UPathIOManager):
     """

     base_dir: Optional[str] = Field(default=None, description="Base directory for storing files.")
-
-    _base_path: "UPath" = PrivateAttr()
+    cloud_storage_options: Optional[Mapping[str, Any]] = Field(
+        default=None, description="Storage authentication for cloud object store", alias="storage_options"
+    )
+    _base_path = PrivateAttr()

     def setup_for_execution(self, context: InitResourceContext) -> None:
         from upath import UPath

+        sp = _process_env_vars(self.cloud_storage_options) if self.cloud_storage_options is not None else {}
         self._base_path = (
-            UPath(self.base_dir)
+            UPath(self.base_dir, **sp)
             if self.base_dir is not None
             else UPath(check.not_none(context.instance).storage_directory())
         )

     @abstractmethod
-    def dump_df_to_path(
+    def write_df_to_path(
         self,
         context: OutputContext,
         df: pl.DataFrame,
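Taken together, the `cloud_storage_options` field (exposed under the `storage_options` alias) and the env-var resolution let credentials be declared in config and forwarded to `UPath` at resource setup. A hedged configuration sketch — the bucket name is made up, and `key`/`secret` are s3fs option names; the exact keys depend on the fsspec filesystem behind the path:

```python
io_manager = PolarsParquetIOManager(
    base_dir="s3://my-bucket/dagster",  # hypothetical bucket
    storage_options={
        "key": {"env": "AWS_ACCESS_KEY_ID"},        # resolved at setup time
        "secret": {"env": "AWS_SECRET_ACCESS_KEY"},  # resolved at setup time
    },
)
```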
@@ -168,6 +181,16 @@ def dump_df_to_path(
     ):
         ...

+    @abstractmethod
+    def sink_df_to_path(
+        self,
+        context: OutputContext,
+        df: pl.LazyFrame,
+        path: "UPath",
+        metadata: Optional[StorageMetadata] = None,
+    ):
+        ...
+
     @overload
     @abstractmethod
     def scan_df_from_path(
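A concrete IO manager now has to implement both abstract methods. A minimal sketch of what a parquet-backed subclass might look like — simplified: metadata handling and the required `scan_df_from_path` overloads are omitted, the `extension` attribute is assumed to follow the shipped IO managers, and `sink_parquet` requires a polars version with streaming sinks:

```python
class SketchParquetIOManager(BasePolarsUPathIOManager):
    extension: str = ".parquet"

    def write_df_to_path(self, context, df: pl.DataFrame, path, metadata=None):
        # Eager frames are already materialized, so write them directly.
        with path.open("wb") as f:
            df.write_parquet(f)

    def sink_df_to_path(self, context, df: pl.LazyFrame, path, metadata=None):
        # Lazy frames are streamed to disk without collecting them in memory.
        df.sink_parquet(str(path))
```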
@@ -225,25 +248,53 @@ def load_input(self, context: InputContext) -> Union[Any, Dict[str, Any]]:
     def dump_to_path(
         self,
         context: OutputContext,
-        obj: Union[pl.DataFrame, Optional[pl.DataFrame], Tuple[pl.DataFrame, Dict[str, Any]]],
+        obj: Union[
+            pl.DataFrame,
+            Optional[pl.DataFrame],
+            Tuple[pl.DataFrame, Dict[str, Any]],
+            pl.LazyFrame,
+            Optional[pl.LazyFrame],
+            Tuple[pl.LazyFrame, Dict[str, Any]],
+        ],
         path: "UPath",
         partition_key: Optional[str] = None,
     ):
-        if annotation_is_typing_optional(context.dagster_type.typing_type) and (
-            obj is None or annotation_for_storage_metadata(context.dagster_type.typing_type) and obj[0] is None
+        typing_type = context.dagster_type.typing_type
+
+        if annotation_is_typing_optional(typing_type) and (
+            obj is None or annotation_for_storage_metadata(typing_type) and obj[0] is None
         ):
             context.log.warning(self.get_optional_output_none_log_message(context, path))
             return
         else:
             assert obj is not None, "output should not be None if its type is not Optional"
-            if not annotation_for_storage_metadata(context.dagster_type.typing_type):
-                obj = cast(pl.DataFrame, obj)
-                df = obj
-                self.dump_df_to_path(context=context, df=df, path=path)
+            if not annotation_for_storage_metadata(typing_type):
+                if typing_type == pl.DataFrame:
+                    obj = cast(pl.DataFrame, obj)
+                    df = obj
+                    self.write_df_to_path(context=context, df=df, path=path)
+                elif typing_type == pl.LazyFrame:
+                    obj = cast(pl.LazyFrame, obj)
+                    df = obj
+                    self.sink_df_to_path(context=context, df=df, path=path)
+                else:
+                    raise NotImplementedError
             else:
-                obj = cast(Tuple[pl.DataFrame, Dict[str, Any]], obj)
-                df, metadata = obj
-                self.dump_df_to_path(context=context, df=df, path=path, metadata=metadata)
+                if not annotation_is_typing_optional(typing_type):
+                    frame_type = get_args(typing_type)[0]
+                else:
+                    frame_type = get_args(get_args(typing_type)[0])[0]
+
+                if frame_type == pl.DataFrame:
+                    obj = cast(Tuple[pl.DataFrame, Dict[str, Any]], obj)
+                    df, metadata = obj
+                    self.write_df_to_path(context=context, df=df, path=path, metadata=metadata)
+                elif frame_type == pl.LazyFrame:
+                    obj = cast(Tuple[pl.LazyFrame, Dict[str, Any]], obj)
+                    df, metadata = obj
+                    self.sink_df_to_path(context=context, df=df, path=path, metadata=metadata)
+                else:
+                    raise NotImplementedError

     def load_from_path(
         self, context: InputContext, path: "UPath", partition_key: Optional[str] = None
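The metadata branch above recovers the frame type from the annotation with `typing.get_args`: one unwrap for `Tuple[...]`, two for `Optional[Tuple[...]]`. A quick illustration of why the double `get_args` is needed:

```python
from typing import Any, Dict, Optional, Tuple, get_args

import polars as pl

anno = Tuple[pl.LazyFrame, Dict[str, Any]]
assert get_args(anno)[0] is pl.LazyFrame

# Optional[X] is Union[X, None], so get_args first yields (X, NoneType);
# a second get_args on the inner Tuple then exposes the frame type.
opt_anno = Optional[Tuple[pl.LazyFrame, Dict[str, Any]]]
assert get_args(get_args(opt_anno)[0])[0] is pl.LazyFrame
```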
@@ -306,7 +357,9 @@ def load_from_path
         else:
             raise NotImplementedError(f"Can't load object for type annotation {context.dagster_type.typing_type}")

-    def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
+    def get_metadata(
+        self, context: OutputContext, obj: Union[pl.DataFrame, pl.LazyFrame, None]
+    ) -> Dict[str, MetadataValue]:
         if obj is None:
             return {"missing": MetadataValue.bool(True)}
         else:
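`get_metadata` now also accepts a `LazyFrame`. Worth noting: a lazy frame exposes its schema from the query plan alone, so schema-level metadata stays cheap, while anything row-based would force a `collect()`. A small polars illustration (not the library's actual implementation):

```python
import polars as pl

lf = pl.LazyFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
# The schema is known without materializing any data.
print(lf.schema)  # e.g. {'a': Int64, 'b': String}
```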
@@ -316,17 +369,6 @@ def get_metadata(self, context: OutputContext, obj: pl.DataFrame) -> Dict[str, MetadataValue]:
             df = obj
         return get_polars_metadata(context, df) if df is not None else {"missing": MetadataValue.bool(True)}

-    @staticmethod
-    def get_storage_options(path: "UPath") -> dict:
-        storage_options = {}
-
-        try:
-            storage_options.update(path.storage_options.copy())
-        except AttributeError:
-            pass
-
-        return storage_options
-
     def get_path_for_partition(
         self, context: Union[InputContext, OutputContext], path: "UPath", partition: str
     ) -> "UPath":