-
Notifications
You must be signed in to change notification settings - Fork 93
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(datasets): add dataset to load/save with Ibis (#560)
* feat(datasets): add dataset to load/save with Ibis Signed-off-by: Deepyaman Datta <[email protected]> * build(datasets): fix typos in definition of extras Signed-off-by: Deepyaman Datta <[email protected]> * build(datasets): include Ibis backend requirements Signed-off-by: Deepyaman Datta <[email protected]> * test(datasets): implement save and reload for Ibis Signed-off-by: Deepyaman Datta <[email protected]> * test(datasets): check `ibis.TableDataset.exists()` Signed-off-by: Deepyaman Datta <[email protected]> * test(datasets): test extra load and save args work Signed-off-by: Deepyaman Datta <[email protected]> * test(datasets): verify config and materializations Signed-off-by: Deepyaman Datta <[email protected]> --------- Signed-off-by: Deepyaman Datta <[email protected]>
- Loading branch information
Showing
6 changed files
with
370 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
"""Provide data loading and saving functionality for Ibis's backends.""" | ||
from typing import Any | ||
|
||
import lazy_loader as lazy | ||
|
||
# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901 | ||
TableDataset: Any | ||
|
||
__getattr__, __dir__, __all__ = lazy.attach( | ||
__name__, submod_attrs={"table_dataset": ["TableDataset"]} | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
"""Provide data loading and saving functionality for Ibis's backends.""" | ||
from __future__ import annotations | ||
|
||
from copy import deepcopy | ||
from typing import TYPE_CHECKING, Any, ClassVar | ||
|
||
import ibis.expr.types as ir | ||
from kedro.io import AbstractDataset, DatasetError | ||
|
||
if TYPE_CHECKING: | ||
from ibis import BaseBackend | ||
|
||
|
||
class TableDataset(AbstractDataset[ir.Table, ir.Table]): | ||
"""``TableDataset`` loads/saves data from/to Ibis table expressions. | ||
Example usage for the | ||
`YAML API <https://kedro.readthedocs.io/en/stable/data/\ | ||
data_catalog_yaml_examples.html>`_: | ||
.. code-block:: yaml | ||
cars: | ||
type: ibis.TableDataset | ||
filepath: data/01_raw/company/cars.csv | ||
file_format: csv | ||
table_name: cars | ||
connection: | ||
backend: duckdb | ||
database: company.db | ||
load_args: | ||
sep: "," | ||
nullstr: "#NA" | ||
save_args: | ||
materialized: table | ||
motorbikes: | ||
type: ibis.TableDataset | ||
table_name: motorbikes | ||
connection: | ||
backend: duckdb | ||
database: company.db | ||
Example usage for the | ||
`Python API <https://kedro.readthedocs.io/en/stable/data/\ | ||
advanced_data_catalog_usage.html>`_: | ||
.. code-block:: pycon | ||
>>> import ibis | ||
>>> from kedro_datasets.ibis import TableDataset | ||
>>> | ||
>>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}) | ||
>>> | ||
>>> dataset = TableDataset( | ||
... table_name="test", | ||
... connection={"backend": "duckdb", "database": tmp_path / "file.db"}, | ||
... save_args={"materialized": "table"}, | ||
... ) | ||
>>> dataset.save(data) | ||
>>> reloaded = dataset.load() | ||
>>> assert data.execute().equals(reloaded.execute()) | ||
""" | ||
|
||
DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {} | ||
DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = { | ||
"materialized": "view", | ||
"overwrite": True, | ||
} | ||
|
||
_connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {} | ||
|
||
def __init__( # noqa: PLR0913 | ||
self, | ||
*, | ||
filepath: str | None = None, | ||
file_format: str | None = None, | ||
table_name: str | None = None, | ||
connection: dict[str, Any] | None = None, | ||
load_args: dict[str, Any] | None = None, | ||
save_args: dict[str, Any] | None = None, | ||
) -> None: | ||
"""Creates a new ``TableDataset`` pointing to a table (or file). | ||
``TableDataset`` connects to the Ibis backend object constructed | ||
from the connection configuration. The `backend` key provided in | ||
the config can be any of the `supported backends <https://ibis-\ | ||
project.org/install>`_. The remaining dictionary entries will be | ||
passed as arguments to the underlying ``connect()`` method (e.g. | ||
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\ | ||
#ibis.duckdb.connect>`_). | ||
If ``filepath`` and ``file_format`` are given, the corresponding | ||
read method (e.g. `read_csv() <https://ibis-project.org/backends/\ | ||
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load | ||
the file with the backend. Note that only the data is loaded; no | ||
link to the underlying file exists past ``TableDataset.load()``. | ||
If ``table_name`` is given (and ``filepath`` isn't), the dataset | ||
establishes a connection to the relevant table for the execution | ||
backend. Therefore, Ibis doesn't fetch data on load; all compute | ||
is deferred until materialization, when the expression is saved. | ||
In practice, this happens when another ``TableDataset`` instance | ||
is saved, after running code defined across one more more nodes. | ||
Args: | ||
filepath: Path to a file to register as a table. Most useful | ||
for loading data into your data warehouse (for testing). | ||
file_format: Specifies the input file format for `filepath`. | ||
table_name: The name of the table or view to read or create. | ||
connection: Configuration for connecting to an Ibis backend. | ||
load_args: Additional arguments passed to the Ibis backend's | ||
`read_{file_format}` method. | ||
save_args: Additional arguments passed to the Ibis backend's | ||
`create_{materialized}` method. By default, ``ir.Table`` | ||
objects are materialized as views. To save a table using | ||
a different materialization strategy, supply a value for | ||
`materialized` in `save_args`. | ||
""" | ||
if filepath is None and table_name is None: | ||
raise DatasetError( | ||
"Must provide at least one of `filepath` or `table_name`." | ||
) | ||
|
||
self._filepath = filepath | ||
self._file_format = file_format | ||
self._table_name = table_name | ||
self._connection_config = connection | ||
|
||
# Set load and save arguments, overwriting defaults if provided. | ||
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) | ||
if load_args is not None: | ||
self._load_args.update(load_args) | ||
|
||
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) | ||
if save_args is not None: | ||
self._save_args.update(save_args) | ||
|
||
self._materialized = self._save_args.pop("materialized") | ||
|
||
@property | ||
def connection(self) -> BaseBackend: | ||
def hashable(value): | ||
if isinstance(value, dict): | ||
return tuple((k, hashable(v)) for k, v in sorted(value.items())) | ||
if isinstance(value, list): | ||
return tuple(hashable(x) for x in value) | ||
return value | ||
|
||
cls = type(self) | ||
key = hashable(self._connection_config) | ||
if key not in cls._connections: | ||
import ibis | ||
|
||
config = deepcopy(self._connection_config) | ||
backend = getattr(ibis, config.pop("backend")) | ||
cls._connections[key] = backend.connect(**config) | ||
|
||
return cls._connections[key] | ||
|
||
def _load(self) -> ir.Table: | ||
if self._filepath is not None: | ||
if self._file_format is None: | ||
raise NotImplementedError | ||
|
||
reader = getattr(self.connection, f"read_{self._file_format}") | ||
return reader(self._filepath, self._table_name, **self._load_args) | ||
else: | ||
return self.connection.table(self._table_name) | ||
|
||
def _save(self, data: ir.Table) -> None: | ||
if self._table_name is None: | ||
raise DatasetError("Must provide `table_name` for materialization.") | ||
|
||
writer = getattr(self.connection, f"create_{self._materialized}") | ||
writer(self._table_name, data, **self._save_args) | ||
|
||
def _describe(self) -> dict[str, Any]: | ||
return { | ||
"filepath": self._filepath, | ||
"file_format": self._file_format, | ||
"table_name": self._table_name, | ||
"connection_config": self._connection_config, | ||
"load_args": self._load_args, | ||
"save_args": self._save_args, | ||
"materialized": self._materialized, | ||
} | ||
|
||
def _exists(self) -> bool: | ||
return ( | ||
self._table_name is not None and self._table_name in self.connection.tables | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Oops, something went wrong.