diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md
index e5508bd8a..e0cda2286 100755
--- a/kedro-datasets/RELEASE.md
+++ b/kedro-datasets/RELEASE.md
@@ -5,15 +5,16 @@
```bash
pip install kedro-datasets[pandas-parquetdataset]
```
-* Remove `setup.py` and move to `pyproject.toml` completely for `kedro-datasets`.
+* Removed `setup.py` and moved to `pyproject.toml` completely for `kedro-datasets`.
* Added `NetCDFDataset` for loading and saving `*.nc` files.
+* Added `ibis.TableDataset` for loading and saving data using Ibis backends.
* Remove support for `pandas < 2.0`.
* Remove support for `pyspark < 3.0`.

## Bug fixes and other changes
* If using MSSQL, `load_args:params` will be typecasted as tuple.
* Fixed bug with loading datasets from Hugging Face. Now allows passing parameters to the load_dataset function.
-* Make `connection_args` argument optional when calling `create_connection()` in `sql_dataset.py`.
+* Made `connection_args` argument optional when calling `create_connection()` in `sql_dataset.py`.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
@@ -25,8 +26,8 @@ Many thanks to the following Kedroids for contributing PRs to this release:
# Release 2.1.0
## Major features and improvements
* Added `MatlabDataset` which uses `scipy` to save and load `.mat` files.
-* Extend preview feature for matplotlib, plotly and tracking datasets.
-* Allow additional parameters for sqlalchemy engine when using sql datasets.
+* Extended preview feature for matplotlib, plotly and tracking datasets.
+* Allowed additional parameters for sqlalchemy engine when using sql datasets.

## Bug fixes and other changes
* Removed Windows specific conditions in `pandas.HDFDataset` extra dependencies
@@ -42,14 +43,14 @@ Many thanks to the following Kedroids for contributing PRs to this release:
* Removed Dataset classes ending with "DataSet", use the "Dataset" spelling instead.
* Added Hugging Face datasets `huggingface.HFDataset` and `huggingface.HFTransformerPipelineDataset`.
* Removed support for Python 3.7 and 3.8.
-* Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html).
-* Bump `s3fs` to latest calendar-versioned release.
+* Added [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html) support for Spark- and Databricks-based datasets.
+* Bumped `s3fs` to latest calendar-versioned release.
* `PartitionedDataset` and `IncrementalDataset` now both support versioning of the underlying dataset.

## Bug fixes and other changes
* Fixed bug with loading models saved with `TensorFlowModelDataset`.
-* Make dataset parameters keyword-only.
-* Correct pandas-gbq as py311 dependency.
+* Made dataset parameters keyword-only.
+* Corrected `pandas-gbq` as a Python 3.11 dependency.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
@@ -67,7 +68,7 @@ Many thanks to the following Kedroids for contributing PRs to this release:
* Delayed backend connection for `pandas.SQLTableDataset`, `pandas.SQLQueryDataset`, and `snowflake.SnowparkTableDataset`. In practice, this means that a dataset's connection details aren't used (or validated) until the dataset is accessed. On the plus side, the cost of connection isn't incurred regardless of when or whether the dataset is used.

## Bug fixes and other changes
-* Fix erroneous warning when using an cloud protocol file path with SparkDataSet on Databricks.
+* Fixed erroneous warning when using a cloud protocol file path with SparkDataSet on Databricks.
* Updated `PickleDataset` to explicitly mention `cloudpickle` support.

## Community contributions
@@ -79,7 +80,7 @@ Many thanks to the following Kedroids for contributing PRs to this release:
# Release 1.7.1

## Bug fixes and other changes
-* Pin `tables` version on `kedro-datasets` for Python < 3.8.
+* Pinned `tables` version on `kedro-datasets` for Python < 3.8.

## Upcoming deprecations for Kedro-Datasets 2.0.0
* Renamed dataset and error classes, in accordance with the [Kedro lexicon](https://github.com/kedro-org/kedro/wiki/Kedro-documentation-style-guide#kedro-lexicon). Dataset classes ending with "DataSet" are deprecated and will be removed in 2.0.0.
diff --git a/kedro-datasets/kedro_datasets/ibis/__init__.py b/kedro-datasets/kedro_datasets/ibis/__init__.py
new file mode 100644
index 000000000..7e793c4e0
--- /dev/null
+++ b/kedro-datasets/kedro_datasets/ibis/__init__.py
@@ -0,0 +1,11 @@
+"""Provide data loading and saving functionality for Ibis's backends."""
+from typing import Any
+
+import lazy_loader as lazy
+
+# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
+TableDataset: Any
+
+__getattr__, __dir__, __all__ = lazy.attach(
+    __name__, submod_attrs={"table_dataset": ["TableDataset"]}
+)
diff --git a/kedro-datasets/kedro_datasets/ibis/table_dataset.py b/kedro-datasets/kedro_datasets/ibis/table_dataset.py
new file mode 100644
index 000000000..8d8bf770f
--- /dev/null
+++ b/kedro-datasets/kedro_datasets/ibis/table_dataset.py
@@ -0,0 +1,193 @@
+"""Provide data loading and saving functionality for Ibis's backends."""
+from __future__ import annotations
+
+from copy import deepcopy
+from typing import TYPE_CHECKING, Any, ClassVar
+
+import ibis.expr.types as ir
+from kedro.io import AbstractDataset, DatasetError
+
+if TYPE_CHECKING:
+    from ibis import BaseBackend
+
+
+class TableDataset(AbstractDataset[ir.Table, ir.Table]):
+    """``TableDataset`` loads/saves data from/to Ibis table expressions.
+
+    Example usage for the
+    `YAML API `_:
+
+    .. code-block:: yaml
+
+        cars:
+          type: ibis.TableDataset
+          filepath: data/01_raw/company/cars.csv
+          file_format: csv
+          table_name: cars
+          connection:
+            backend: duckdb
+            database: company.db
+          load_args:
+            sep: ","
+            nullstr: "#NA"
+          save_args:
+            materialized: table
+
+        motorbikes:
+          type: ibis.TableDataset
+          table_name: motorbikes
+          connection:
+            backend: duckdb
+            database: company.db
+
+    Example usage for the
+    `Python API `_:
+
+    .. code-block:: pycon
+
+        >>> import ibis
+        >>> from kedro_datasets.ibis import TableDataset
+        >>>
+        >>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
+        >>>
+        >>> dataset = TableDataset(
+        ...     table_name="test",
+        ...     connection={"backend": "duckdb", "database": tmp_path / "file.db"},
+        ...     save_args={"materialized": "table"},
+        ... )
+        >>> dataset.save(data)
+        >>> reloaded = dataset.load()
+        >>> assert data.execute().equals(reloaded.execute())
+
+    """
+
+    DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
+    DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {
+        "materialized": "view",
+        "overwrite": True,
+    }
+
+    _connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}
+
+    def __init__(  # noqa: PLR0913
+        self,
+        *,
+        filepath: str | None = None,
+        file_format: str | None = None,
+        table_name: str | None = None,
+        connection: dict[str, Any] | None = None,
+        load_args: dict[str, Any] | None = None,
+        save_args: dict[str, Any] | None = None,
+    ) -> None:
+        """Creates a new ``TableDataset`` pointing to a table (or file).
+
+        ``TableDataset`` connects to the Ibis backend object constructed
+        from the connection configuration. The `backend` key provided in
+        the config can be any of the `supported backends `_. The remaining dictionary entries will be
+        passed as arguments to the underlying ``connect()`` method (e.g.
+        `ibis.duckdb.connect() `_).
+
+        If ``filepath`` and ``file_format`` are given, the corresponding
+        read method (e.g. `read_csv() `_) is used to load
+        the file with the backend. Note that only the data is loaded; no
+        link to the underlying file exists past ``TableDataset.load()``.
+
+        If ``table_name`` is given (and ``filepath`` isn't), the dataset
+        establishes a connection to the relevant table for the execution
+        backend. Therefore, Ibis doesn't fetch data on load; all compute
+        is deferred until materialization, when the expression is saved.
+        In practice, this happens when another ``TableDataset`` instance
+        is saved, after running code defined across one or more nodes.
+
+        Args:
+            filepath: Path to a file to register as a table. Most useful
+                for loading data into your data warehouse (for testing).
+            file_format: Specifies the input file format for `filepath`.
+            table_name: The name of the table or view to read or create.
+            connection: Configuration for connecting to an Ibis backend.
+            load_args: Additional arguments passed to the Ibis backend's
+                `read_{file_format}` method.
+            save_args: Additional arguments passed to the Ibis backend's
+                `create_{materialized}` method. By default, ``ir.Table``
+                objects are materialized as views. To save a table using
+                a different materialization strategy, supply a value for
+                `materialized` in `save_args`.
+        """
+        if filepath is None and table_name is None:
+            raise DatasetError(
+                "Must provide at least one of `filepath` or `table_name`."
+            )
+
+        self._filepath = filepath
+        self._file_format = file_format
+        self._table_name = table_name
+        self._connection_config = connection
+
+        # Set load and save arguments, overwriting defaults if provided.
+ self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) + if load_args is not None: + self._load_args.update(load_args) + + self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) + if save_args is not None: + self._save_args.update(save_args) + + self._materialized = self._save_args.pop("materialized") + + @property + def connection(self) -> BaseBackend: + def hashable(value): + if isinstance(value, dict): + return tuple((k, hashable(v)) for k, v in sorted(value.items())) + if isinstance(value, list): + return tuple(hashable(x) for x in value) + return value + + cls = type(self) + key = hashable(self._connection_config) + if key not in cls._connections: + import ibis + + config = deepcopy(self._connection_config) + backend = getattr(ibis, config.pop("backend")) + cls._connections[key] = backend.connect(**config) + + return cls._connections[key] + + def _load(self) -> ir.Table: + if self._filepath is not None: + if self._file_format is None: + raise NotImplementedError + + reader = getattr(self.connection, f"read_{self._file_format}") + return reader(self._filepath, self._table_name, **self._load_args) + else: + return self.connection.table(self._table_name) + + def _save(self, data: ir.Table) -> None: + if self._table_name is None: + raise DatasetError("Must provide `table_name` for materialization.") + + writer = getattr(self.connection, f"create_{self._materialized}") + writer(self._table_name, data, **self._save_args) + + def _describe(self) -> dict[str, Any]: + return { + "filepath": self._filepath, + "file_format": self._file_format, + "table_name": self._table_name, + "connection_config": self._connection_config, + "load_args": self._load_args, + "save_args": self._save_args, + "materialized": self._materialized, + } + + def _exists(self) -> bool: + return ( + self._table_name is not None and self._table_name in self.connection.tables + ) diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 97daca476..cb6ff1efd 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -49,6 +49,28 @@ huggingface-hfdataset = ["datasets", "huggingface_hub"] huggingface-hftransformerpipelinedataset = ["transformers"] huggingface = ["kedro-datasets[huggingface-hfdataset,huggingface-hftransformerpipelinedataset]"] +ibis-bigquery = ["ibis-framework[bigquery]"] +ibis-clickhouse = ["ibis-framework[clickhouse]"] +ibis-dask = ["ibis-framework[dask]"] +ibis-datafusion = ["ibis-framework[datafusion]"] +ibis-druid = ["ibis-framework[druid]"] +ibis-duckdb = ["ibis-framework[duckdb]"] +ibis-exasol = ["ibis-framework[exasol]"] +ibis-flink = ["ibis-framework", "apache-flink"] +ibis-impala = ["ibis-framework[impala]"] +ibis-mssql = ["ibis-framework[mssql]"] +ibis-mysql = ["ibis-framework[mysql]"] +ibis-oracle = ["ibis-framework[oracle]"] +ibis-pandas = ["ibis-framework[pandas]"] +ibis-polars = ["ibis-framework[polars]"] +ibis-postgres = ["ibis-framework[postgres]"] +ibis-pyspark = ["ibis-framework[pyspark]"] +ibis-risingwave = ["ibis-framework[risingwave]"] +ibis-snowflake = ["ibis-framework[snowflake]"] +ibis-sqlite = ["ibis-framework[sqlite]"] +ibis-trino = ["ibis-framework[trino]"] +ibis = ["ibis-framework"] + json-jsondataset = [] json = ["kedro-datasets[json-jsondataset]"] @@ -180,6 +202,7 @@ test = [ "geopandas>=0.6.0, <1.0", "hdfs>=2.5.8, <3.0", "holoviews>=1.13.0", + "ibis-framework[duckdb,examples]", "import-linter[toml]==1.2.6", "ipython>=7.31.1, <8.0", "Jinja2<3.1.0", diff --git a/kedro-datasets/tests/ibis/__init__.py b/kedro-datasets/tests/ibis/__init__.py new 
file mode 100644
index 000000000..e69de29bb
diff --git a/kedro-datasets/tests/ibis/test_table_dataset.py b/kedro-datasets/tests/ibis/test_table_dataset.py
new file mode 100644
index 000000000..b7ee7baca
--- /dev/null
+++ b/kedro-datasets/tests/ibis/test_table_dataset.py
@@ -0,0 +1,132 @@
+import duckdb
+import ibis
+import pytest
+from kedro.io import DatasetError
+from pandas.testing import assert_frame_equal
+
+from kedro_datasets.ibis import TableDataset
+
+
+@pytest.fixture(scope="session")
+def filepath_csv(tmp_path_factory):
+    path = (tmp_path_factory.mktemp("data") / "test.csv").as_posix()
+    ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]}).to_csv(path)
+    return path
+
+
+@pytest.fixture
+def database(tmp_path):
+    return (tmp_path / "file.db").as_posix()
+
+
+@pytest.fixture(params=[None])
+def connection_config(request, database):
+    return request.param or {"backend": "duckdb", "database": database}
+
+
+@pytest.fixture
+def table_dataset(connection_config, load_args, save_args):
+    return TableDataset(
+        table_name="test",
+        connection=connection_config,
+        load_args=load_args,
+        save_args=save_args,
+    )
+
+
+@pytest.fixture
+def table_dataset_from_csv(filepath_csv, connection_config, load_args, save_args):
+    return TableDataset(
+        filepath=filepath_csv,
+        file_format="csv",
+        connection=connection_config,
+        load_args=load_args,
+        save_args=save_args,
+    )
+
+
+@pytest.fixture
+def dummy_table(table_dataset_from_csv):
+    return table_dataset_from_csv.load()
+
+
+class TestTableDataset:
+    def test_save_and_load(self, table_dataset, dummy_table, database):
+        """Test saving and reloading the data set."""
+        table_dataset.save(dummy_table)
+        reloaded = table_dataset.load()
+        assert_frame_equal(dummy_table.execute(), reloaded.execute())
+
+        # Verify that the appropriate materialization strategy was used.
+        con = duckdb.connect(database)
+        assert not con.sql("SELECT * FROM duckdb_tables").fetchnumpy()["table_name"]
+        assert "test" in con.sql("SELECT * FROM duckdb_views").fetchnumpy()["view_name"]
+
+    def test_exists(self, table_dataset, dummy_table):
+        """Test `exists` method invocation for both existing and
+        nonexistent data set."""
+        assert not table_dataset.exists()
+        table_dataset.save(dummy_table)
+        assert table_dataset.exists()
+
+    @pytest.mark.parametrize("load_args", [{"filename": True}], indirect=True)
+    def test_load_extra_params(self, table_dataset_from_csv, load_args):
+        """Test overriding the default load arguments."""
+        assert "filename" in table_dataset_from_csv.load()
+
+    @pytest.mark.parametrize("save_args", [{"materialized": "table"}], indirect=True)
+    def test_save_extra_params(self, table_dataset, save_args, dummy_table, database):
+        """Test overriding the default save arguments."""
+        table_dataset.save(dummy_table)
+
+        # Verify that the appropriate materialization strategy was used.
+        con = duckdb.connect(database)
+        assert (
+            "test" in con.sql("SELECT * FROM duckdb_tables").fetchnumpy()["table_name"]
+        )
+        assert not con.sql("SELECT * FROM duckdb_views").fetchnumpy()["view_name"]
+
+    def test_no_filepath_or_table_name(self, connection_config):
+        pattern = r"Must provide at least one of `filepath` or `table_name`\."
+        with pytest.raises(DatasetError, match=pattern):
+            TableDataset(connection=connection_config)
+
+    def test_save_no_table_name(self, table_dataset_from_csv, dummy_table):
+        pattern = r"Must provide `table_name` for materialization\."
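+        # A filepath-backed dataset has no table name to materialize the
+        # result under, so saving the loaded expression should fail.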
+ with pytest.raises(DatasetError, match=pattern): + table_dataset_from_csv.save(dummy_table) + + @pytest.mark.parametrize( + ("connection_config", "key"), + [ + ( + {"backend": "duckdb", "database": "file.db", "extensions": ["spatial"]}, + ( + ("backend", "duckdb"), + ("database", "file.db"), + ("extensions", ("spatial",)), + ), + ), + # https://github.com/kedro-org/kedro-plugins/pull/560#discussion_r1536083525 + ( + { + "host": "xxx.sql.azuresynapse.net", + "database": "xxx", + "query": {"driver": "ODBC Driver 17 for SQL Server"}, + "backend": "mssql", + }, + ( + ("backend", "mssql"), + ("database", "xxx"), + ("host", "xxx.sql.azuresynapse.net"), + ("query", (("driver", "ODBC Driver 17 for SQL Server"),)), + ), + ), + ], + indirect=["connection_config"], + ) + def test_connection_config(self, mocker, table_dataset, connection_config, key): + """Test hashing of more complicated connection configuration.""" + mocker.patch(f"ibis.{connection_config['backend']}") + table_dataset.load() + assert key in table_dataset._connections
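Below is a minimal end-to-end sketch of the new `ibis.TableDataset` Python API, assuming the code from this diff is installed together with the DuckDB backend (for example via `pip install "kedro-datasets[ibis-duckdb]"`). The table name, columns, and temporary paths are illustrative only.

```python
# Minimal usage sketch for the new ibis.TableDataset (assumes this PR's code
# and a DuckDB-enabled ibis-framework are installed). Names are illustrative.
import tempfile
from pathlib import Path

import ibis
from kedro_datasets.ibis import TableDataset

with tempfile.TemporaryDirectory() as tmp:
    dataset = TableDataset(
        table_name="cars",
        connection={"backend": "duckdb", "database": Path(tmp) / "company.db"},
        save_args={"materialized": "table"},  # the default is "view"
    )

    data = ibis.memtable({"model": ["a", "b"], "mpg": [25, 30]})
    dataset.save(data)         # materializes the expression as a DuckDB table
    reloaded = dataset.load()  # lazy table expression; compute stays deferred
    print(reloaded.execute())  # pandas DataFrame with the saved rows
```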
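When `filepath` and `file_format` are supplied, loading dispatches to the backend's corresponding `read_*` method and forwards `load_args`, which is what `test_load_extra_params` above exercises. A hedged sketch of that path, with illustrative file names and the DuckDB-specific `filename` option taken from the test:

```python
# Sketch: filepath + file_format loading dispatches to the backend's read_csv()
# and registers the result under table_name; load_args are passed through.
import tempfile
from pathlib import Path

import ibis
from kedro_datasets.ibis import TableDataset

with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "cars.csv"
    ibis.memtable({"model": ["a", "b"], "mpg": [25, 30]}).to_csv(str(csv_path))

    dataset = TableDataset(
        filepath=str(csv_path),
        file_format="csv",
        table_name="cars",
        connection={"backend": "duckdb", "database": Path(tmp) / "company.db"},
        load_args={"filename": True},  # DuckDB option: add a source-file column
    )
    table = dataset.load()  # calls connection.read_csv(csv_path, "cars", filename=True)
    print(table.columns)    # includes "filename" alongside "model" and "mpg"
```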
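Finally, the `connection` property caches one backend per unique configuration by converting the nested `connection` dictionary into a hashable key; the expected keys asserted in `test_connection_config` come from exactly this transformation. A standalone, dependency-free sketch of that key-building helper, using the config values from the first parametrized test case:

```python
# Standalone sketch of the cache-key logic mirrored from TableDataset.connection.
def hashable(value):
    """Convert dicts and lists into nested tuples so they can serve as dict keys."""
    if isinstance(value, dict):
        return tuple((k, hashable(v)) for k, v in sorted(value.items()))
    if isinstance(value, list):
        return tuple(hashable(x) for x in value)
    return value


config = {"backend": "duckdb", "database": "file.db", "extensions": ["spatial"]}
assert hashable(config) == (
    ("backend", "duckdb"),
    ("database", "file.db"),
    ("extensions", ("spatial",)),
)
# Equal configs produce equal keys, so datasets sharing a configuration reuse
# the same backend connection via the class-level `_connections` cache.
```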