feat(datasets): add dataset to load/save with Ibis #560

Merged Apr 10, 2024 · 19 commits (changes shown from 18 commits)
2a38d58 feat(datasets): add dataset to load/save with Ibis (deepyaman, Feb 21, 2024)
7dfc850 build(datasets): include Ibis backend requirements (deepyaman, Mar 2, 2024)
19377c4 test(datasets): implement save and reload for Ibis (deepyaman, Mar 4, 2024)
f5a6b73 test(datasets): check `ibis.TableDataset.exists()` (deepyaman, Mar 4, 2024)
c29e9e1 test(datasets): test extra load and save args work (deepyaman, Mar 4, 2024)
ee1dc38 test(datasets): verify config and materializations (deepyaman, Mar 4, 2024)
0c0bc6b chore(datasets): updated & standardized RELEASE.md (deepyaman, Mar 4, 2024)
9b8d2e8 Merge branch 'main' into feat/datasets/ibis-table-dataset (deepyaman, Apr 8, 2024)
bbec217 test(datasets): don't require network for examples (deepyaman, Apr 8, 2024)
7b3ae33 docs(datasets): add Python, YAML examples for Ibis (deepyaman, Apr 8, 2024)
1fcb01a fix(datasets): make connection config key hashable (deepyaman, Apr 9, 2024)
14ecc8c docs(datasets): expand on how `TableDataset` works (deepyaman, Apr 9, 2024)
31a23ad revert(datasets): don't `json.dumps` to create key (deepyaman, Apr 9, 2024)
a4f1d67 fix(datasets): make connection config key hashable (deepyaman, Apr 9, 2024)
f085a46 chore(datasets): mark uncovered line with a pragma (deepyaman, Apr 9, 2024)
19380d3 Merge branch 'main' into feat/datasets/ibis-table-dataset (deepyaman, Apr 9, 2024)
3bbfa3b test(datasets): add a test case for list in config (deepyaman, Apr 9, 2024)
4757839 test(datasets): add bug reported by Iñigo to suite (deepyaman, Apr 9, 2024)
46fe6bd Merge branch 'main' into feat/datasets/ibis-table-dataset (elena-khaustova-qb, Apr 10, 2024)
21 changes: 11 additions & 10 deletions kedro-datasets/RELEASE.md
@@ -5,13 +5,14 @@
```bash
pip install kedro-datasets[pandas-parquetdataset]
```
-* Remove `setup.py` and move to `pyproject.toml` completely for `kedro-datasets`.
+* Removed `setup.py` and move to `pyproject.toml` completely for `kedro-datasets`.
* Added `NetCDFDataset` for loading and saving `*.nc` files.
+* Added dataset to load/save with Ibis.

## Bug fixes and other changes
* If using MSSQL, `load_args:params` will be typecasted as tuple.
* Fixed bug with loading datasets from Hugging Face. Now allows passing parameters to the load_dataset function.
-* Make `connection_args` argument optional when calling `create_connection()` in `sql_dataset.py`.
+* Made `connection_args` argument optional when calling `create_connection()` in `sql_dataset.py`.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
@@ -23,8 +24,8 @@
Many thanks to the following Kedroids for contributing PRs to this release:
# Release 2.1.0
## Major features and improvements
* Added `MatlabDataset` which uses `scipy` to save and load `.mat` files.
-* Extend preview feature for matplotlib, plotly and tracking datasets.
-* Allow additional parameters for sqlalchemy engine when using sql datasets.
+* Extended preview feature for matplotlib, plotly and tracking datasets.
+* Allowed additional parameters for sqlalchemy engine when using sql datasets.

## Bug fixes and other changes
* Removed Windows specific conditions in `pandas.HDFDataset` extra dependencies
@@ -40,14 +41,14 @@
Many thanks to the following Kedroids for contributing PRs to this release:
* Removed Dataset classes ending with "DataSet", use the "Dataset" spelling instead.
* Added Hugging Face datasets `huggingface.HFDataset` and `huggingface.HFTransformerPipelineDataset`.
* Removed support for Python 3.7 and 3.8.
-* Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html).
-* Bump `s3fs` to latest calendar-versioned release.
+* Added [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html) support for Spark- and Databricks-based datasets.
+* Bumped `s3fs` to latest calendar-versioned release.
* `PartitionedDataset` and `IncrementalDataset` now both support versioning of the underlying dataset.

## Bug fixes and other changes
* Fixed bug with loading models saved with `TensorFlowModelDataset`.
-* Make dataset parameters keyword-only.
-* Correct pandas-gbq as py311 dependency.
+* Made dataset parameters keyword-only.
+* Corrected pandas-gbq as py311 dependency.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
@@ -65,7 +66,7 @@
Many thanks to the following Kedroids for contributing PRs to this release:
* Delayed backend connection for `pandas.SQLTableDataset`, `pandas.SQLQueryDataset`, and `snowflake.SnowparkTableDataset`. In practice, this means that a dataset's connection details aren't used (or validated) until the dataset is accessed. On the plus side, the cost of connection isn't incurred regardless of when or whether the dataset is used.

## Bug fixes and other changes
-* Fix erroneous warning when using an cloud protocol file path with SparkDataSet on Databricks.
+* Fixed erroneous warning when using an cloud protocol file path with SparkDataSet on Databricks.
* Updated `PickleDataset` to explicitly mention `cloudpickle` support.

## Community contributions
@@ -77,7 +78,7 @@
Many thanks to the following Kedroids for contributing PRs to this release:

# Release 1.7.1
## Bug fixes and other changes
-* Pin `tables` version on `kedro-datasets` for Python < 3.8.
+* Pinned `tables` version on `kedro-datasets` for Python < 3.8.

## Upcoming deprecations for Kedro-Datasets 2.0.0
* Renamed dataset and error classes, in accordance with the [Kedro lexicon](https://github.com/kedro-org/kedro/wiki/Kedro-documentation-style-guide#kedro-lexicon). Dataset classes ending with "DataSet" are deprecated and will be removed in 2.0.0.
11 changes: 11 additions & 0 deletions kedro-datasets/kedro_datasets/ibis/__init__.py
@@ -0,0 +1,11 @@
"""Provide data loading and saving functionality for Ibis's backends."""
from typing import Any

import lazy_loader as lazy

# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
TableDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
__name__, submod_attrs={"table_dataset": ["TableDataset"]}
)
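For readers unfamiliar with `lazy_loader`: `lazy.attach` installs a module-level `__getattr__` hook (PEP 562), so `import kedro_datasets.ibis` stays cheap and `table_dataset` (and, transitively, Ibis itself) is only imported the first time `TableDataset` is accessed. A rough, simplified equivalent of what it wires up (a sketch, not the actual `lazy_loader` implementation):

```python
import importlib

def __getattr__(name):  # PEP 562: called only for attributes not found normally
    if name == "TableDataset":
        # Import the submodule on first access and hand back the class.
        submodule = importlib.import_module(".table_dataset", __name__)
        return submodule.TableDataset
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```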
193 changes: 193 additions & 0 deletions kedro-datasets/kedro_datasets/ibis/table_dataset.py
@@ -0,0 +1,193 @@
"""Provide data loading and saving functionality for Ibis's backends."""
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset, DatasetError

if TYPE_CHECKING:
from ibis import BaseBackend


class TableDataset(AbstractDataset[ir.Table, ir.Table]):
"""``TableDataset`` loads/saves data from/to Ibis table expressions.

Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog_yaml_examples.html>`_:

.. code-block:: yaml

cars:
type: ibis.TableDataset
filepath: data/01_raw/company/cars.csv
file_format: csv
table_name: cars
connection:
backend: duckdb
database: company.db
load_args:
sep: ","
nullstr: "#NA"
save_args:
materialized: table

motorbikes:
type: ibis.TableDataset
table_name: motorbikes
connection:
backend: duckdb
database: company.db

Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
advanced_data_catalog_usage.html>`_:

.. code-block:: pycon

>>> import ibis
>>> from kedro_datasets.ibis import TableDataset
Contributor: Would it be possible to add a local example here with some dummy dataframe / duckdb? I know there is a blog post, but just looking at the docs/docstring I think it's not that easy to get how a user should use this.

Member: Ideally we'd have both a yaml and python example.

Just FYI: when connecting to Ibis, there are two major "paradigms", either a file-based connection or a db connection. From my limited knowledge, I think the configuration arguments are quite different.

For reference, I am currently connecting to mssql dbs like this:

conn_table = TableDataset(
    connection={
        "host": "xxx.sql.azuresynapse.net",
        "database": "xxx",
        "query": {"driver": "ODBC Driver 17 for SQL Server"},
        "backend": "mssql",
    },
    table_name="xxx",
    load_args={"schema": "xxx"},
)

The documentation here should probably clarify what the different arguments map to in the Ibis connection object.

In the case of a filepath being provided, it will map to the backend method `read_{file_format}`, e.g. https://ibis-project.org/backends/duckdb#ibis.backends.duckdb.Backend.read_parquet.

Otherwise it will get the table from `ibis.{backend}.Backend.table`, where the `Backend` object is obtained through `ibis.{backend}.connect`, e.g. https://ibis-project.org/backends/mssql.
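To make that mapping concrete, here is a sketch of the two styles using the constructor from this PR (backend choice, paths, and table names are illustrative, not taken from the PR):

```python
from kedro_datasets.ibis import TableDataset

# File-based: load() calls the backend's read_csv() and forwards load_args.
file_based = TableDataset(
    filepath="data/01_raw/cars.csv",
    file_format="csv",
    table_name="cars",
    connection={"backend": "duckdb", "database": "company.db"},
    load_args={"sep": ","},
)

# Table-based: no filepath, so load() returns connection.table("motorbikes"),
# a lazy expression over a table that already exists in the backend.
table_based = TableDataset(
    table_name="motorbikes",
    connection={"backend": "duckdb", "database": "company.db"},
)
```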

Member Author: Sorry, I forgot to add an example, it seems. 😅

Member Author:
> the documentation here should probably clarify what the different arguments map to in the ibis connection object

Added much more detail on this!

>>>
>>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = TableDataset(
... table_name="test",
... connection={"backend": "duckdb", "database": tmp_path / "file.db"},
... save_args={"materialized": "table"},
... )
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.execute().equals(reloaded.execute())

"""

DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {
"materialized": "view",
"overwrite": True,
}

_connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}

def __init__( # noqa: PLR0913
self,
*,
filepath: str | None = None,
file_format: str | None = None,
table_name: str | None = None,
connection: dict[str, Any] | None = None,
load_args: dict[str, Any] | None = None,
save_args: dict[str, Any] | None = None,
) -> None:
"""Creates a new ``TableDataset`` pointing to a table (or file).

``TableDataset`` connects to the Ibis backend object constructed
from the connection configuration. The `backend` key provided in
the config can be any of the `supported backends <https://ibis-\
project.org/install>`_. The remaining dictionary entries will be
passed as arguments to the underlying ``connect()`` method (e.g.
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\
#ibis.duckdb.connect>`_).

If ``filepath`` and ``file_format`` are given, the corresponding
read method (e.g. `read_csv() <https://ibis-project.org/backends/\
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load
the file with the backend. Note that only the data is loaded; no
link to the underlying file exists past ``TableDataset.load()``.

If ``table_name`` is given (and ``filepath`` isn't), the dataset
establishes a connection to the relevant table for the execution
backend. Therefore, Ibis doesn't fetch data on load; all compute
is deferred until materialization, when the expression is saved.
In practice, this happens when another ``TableDataset`` instance
is saved, after running code defined across one or more nodes.

Args:
filepath: Path to a file to register as a table. Most useful
for loading data into your data warehouse (for testing).
file_format: Specifies the input file format for `filepath`.
table_name: The name of the table or view to read or create.
connection: Configuration for connecting to an Ibis backend.
load_args: Additional arguments passed to the Ibis backend's
`read_{file_format}` method.
save_args: Additional arguments passed to the Ibis backend's
Member: Any docs we can link to that specify which values a user can supply to `materialized`?

Member Author: I'm not aware of anywhere it's documented centrally (i.e. not on a backend-specific basis) in Ibis. @lostmygithubaccount any chance you know?

We do not; adding API docs for this, alongside the other `read_*` and `to_*` methods, is on my TODO list.

Member Author: Sounds good! @merelcht if it's OK then, we'll improve the documentation on the Ibis side, and then whenever that happens, we can make the dataset docs reference that.

`create_{materialized}` method. By default, ``ir.Table``
objects are materialized as views. To save a table using
a different materialization strategy, supply a value for
`materialized` in `save_args`.
"""
if filepath is None and table_name is None:
raise DatasetError(
"Must provide at least one of `filepath` or `table_name`."
)

self._filepath = filepath
self._file_format = file_format
self._table_name = table_name
self._connection_config = connection

# Set load and save arguments, overwriting defaults if provided.
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)

self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

self._materialized = self._save_args.pop("materialized")

@property
def connection(self) -> BaseBackend:
def hashable(value):
if isinstance(value, dict):
return tuple((k, hashable(v)) for k, v in sorted(value.items()))
if isinstance(value, list):
return tuple(hashable(x) for x in value)
return value

cls = type(self)
key = hashable(self._connection_config)
if key not in cls._connections:
import ibis

config = deepcopy(self._connection_config)
backend = getattr(ibis, config.pop("backend"))
cls._connections[key] = backend.connect(**config)

return cls._connections[key]

def _load(self) -> ir.Table:
if self._filepath is not None:
if self._file_format is None:
raise NotImplementedError

reader = getattr(self.connection, f"read_{self._file_format}")
return reader(self._filepath, self._table_name, **self._load_args)
else:
return self.connection.table(self._table_name)

Currently, when loading tables through the mssql backend, I am doing `conn.table(table_name="XXX", schema="A")`.

An easy solution would be to add the load args into the `conn.table` call and specify the schema as a load_arg, but IMO this is suboptimal, as the load and save schema should usually (in my experience; my assertion could be wrong) be the same.

@gforsyth not sure if you have any insight here wrt the upcoming changes in Ibis around schema and db hierarchy.

So starting in 9.0, `schema` will be deprecated as a kwarg. We will be using the word "database" to refer to a collection of tables and the word "catalog" to refer to a collection of databases.

For specifying only the database, you would do:

conn.table(table_name="XX", database="A")

if the database is located in a catalog (like, say dbo or sys) you would do either of:

conn.table(table_name="XX", database="dbo.A")

or

conn.table(table_name="XX", database=("dbo", "A"))

(schema will still work in 9.0, it will just raise a FutureWarning)


def _save(self, data: ir.Table) -> None:
if self._table_name is None:
raise DatasetError("Must provide `table_name` for materialization.")

writer = getattr(self.connection, f"create_{self._materialized}")
writer(self._table_name, data, **self._save_args)

def _describe(self) -> dict[str, Any]:
return {
"filepath": self._filepath,
"file_format": self._file_format,
"table_name": self._table_name,
"connection_config": self._connection_config,
"load_args": self._load_args,
"save_args": self._save_args,
"materialized": self._materialized,
}

def _exists(self) -> bool:
return (
self._table_name is not None and self._table_name in self.connection.tables
)
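The deferred-compute behavior described in the docstring is easiest to see end to end. A short sketch, assuming a local DuckDB database and an existing `cars` table with an `mpg` column (names are illustrative):

```python
from kedro_datasets.ibis import TableDataset

cars = TableDataset(
    table_name="cars",
    connection={"backend": "duckdb", "database": "company.db"},
)
fast_cars = TableDataset(
    table_name="fast_cars",
    connection={"backend": "duckdb", "database": "company.db"},
    save_args={"materialized": "table"},
)

table = cars.load()                     # no rows fetched; just an expression
expr = table.filter(table["mpg"] > 30)  # still lazy
fast_cars.save(expr)                    # create_table() executes it in DuckDB
```

Note that both datasets above share a single DuckDB connection: the `connection` property memoizes backends per connection config, converting the config dict (including nested dicts and lists) into a hashable tuple key.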
23 changes: 23 additions & 0 deletions kedro-datasets/pyproject.toml
@@ -49,6 +49,28 @@
huggingface-hfdataset = ["datasets", "huggingface_hub"]
huggingface-hftransformerpipelinedataset = ["transformers"]
huggingface = ["kedro-datasets[huggingface-hfdataset,huggingface-hftransformerpipelinedataset]"]

+ibis-bigquery = ["ibis-framework[bigquery]"]
Contributor: I wonder if we could do a CI script to keep this bit in sync.

+ibis-clickhouse = ["ibis-framework[clickhouse]"]
+ibis-dask = ["ibis-framework[dask]"]
+ibis-datafusion = ["ibis-framework[datafusion]"]
+ibis-druid = ["ibis-framework[druid]"]
+ibis-duckdb = ["ibis-framework[duckdb]"]
+ibis-exasol = ["ibis-framework[exasol]"]
+ibis-flink = ["ibis-framework", "apache-flink"]
+ibis-impala = ["ibis-framework[impala]"]
+ibis-mssql = ["ibis-framework[mssql]"]
+ibis-mysql = ["ibis-framework[mysql]"]
+ibis-oracle = ["ibis-framework[oracle]"]
+ibis-pandas = ["ibis-framework[pandas]"]
+ibis-polars = ["ibis-framework[polars]"]
+ibis-postgres = ["ibis-framework[postgres]"]
+ibis-pyspark = ["ibis-framework[pyspark]"]
+ibis-risingwave = ["ibis-framework[risingwave]"]
+ibis-snowflake = ["ibis-framework[snowflake]"]
+ibis-sqlite = ["ibis-framework[sqlite]"]
+ibis-trino = ["ibis-framework[trino]"]
+ibis = ["ibis-framework"]

json-jsondataset = []
json = ["kedro-datasets[json-jsondataset]"]

@@ -180,6 +202,7 @@
test = [
"geopandas>=0.6.0, <1.0",
"hdfs>=2.5.8, <3.0",
"holoviews>=1.13.0",
"ibis-framework[duckdb,examples]",
"import-linter[toml]==1.2.6",
"ipython>=7.31.1, <8.0",
"Jinja2<3.1.0",