Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update DataCatalog private methods with DataCatalog 2.0 API public methods #2274

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion package/kedro_viz/data_access/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from kedro.io import DataCatalog
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to import this only as a fallback — i.e. inside an `except ImportError` — when we are on an older Kedro version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need DataCatalog as the “baseline” import so our code is backward compatible with older Kedro (which doesn’t have KedroDataCatalog)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should line 7 be inside the `except ImportError` at line 13? Normally that is how we handled backward compatibility before. Wouldn't this raise an error if `DataCatalog` is no longer available in a future Kedro version?


try:
from kedro.io import KedroDataCatalog

IS_DATACATALOG_2 = True
except ImportError:
IS_DATACATALOG_2 = False

try:
# kedro 0.18.11 onwards
from kedro.io.core import DatasetError
Expand Down Expand Up @@ -91,7 +98,10 @@ def resolve_dataset_factory_patterns(

for dataset_name in datasets:
try:
catalog._get_dataset(dataset_name, suggest=False)
if IS_DATACATALOG_2 and isinstance(catalog, KedroDataCatalog):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but I remember Elena mentioned that `resolve_dataset_factory_patterns` will happen on the Kedro side. Do we still need this function here? Can we call it only when `IS_DATACATALOG_2` is False?

catalog.get(dataset_name)
else:
catalog._get_dataset(dataset_name, suggest=False)
except Exception: # noqa: BLE001 # pragma: no cover
continue

Expand Down
36 changes: 25 additions & 11 deletions package/kedro_viz/data_access/repositories/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
from typing import TYPE_CHECKING, Dict, Optional

from kedro.io import DataCatalog

try:
from kedro.io import KedroDataCatalog

IS_DATACATALOG_2 = True
except ImportError:
IS_DATACATALOG_2 = False

from packaging.version import parse

from kedro_viz.constants import KEDRO_VERSION
Expand Down Expand Up @@ -78,11 +86,14 @@ def layers_mapping(self): # noqa: PLR0912

self._layers_mapping = {}

# Temporary try/except block so the Kedro develop branch can work with Viz.
try:
datasets = self._catalog._data_sets
except Exception: # noqa: BLE001 # pragma: no cover
datasets = self._catalog._datasets
if IS_DATACATALOG_2 and isinstance(self._catalog, KedroDataCatalog):
datasets = self._catalog.list()
else:
# Temporary try/except block so the Kedro develop branch can work with Viz.
try:
datasets = self._catalog._data_sets
except Exception: # noqa: BLE001 # pragma: no cover
datasets = self._catalog._datasets

# Support for Kedro 0.18.x
if KEDRO_VERSION < parse("0.19.0"): # pragma: no cover
Expand All @@ -99,8 +110,11 @@ def layers_mapping(self): # noqa: PLR0912
self._layers_mapping[dataset_name] = layer

for dataset_name in datasets:
dataset = self._catalog._get_dataset(dataset_name)
metadata = getattr(dataset, "metadata", None)
if IS_DATACATALOG_2 and isinstance(self._catalog, KedroDataCatalog):
dataset = self._catalog.get(dataset_name)
else:
dataset = self._catalog._get_dataset(dataset_name)
metadata = getattr(dataset, "metadata", None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we handling metadata in DataCatalog 2.0?

if not metadata:
continue
try:
Expand All @@ -121,11 +135,11 @@ def layers_mapping(self): # noqa: PLR0912
def get_dataset(self, dataset_name: str) -> Optional["AbstractDataset"]:
dataset_obj: Optional["AbstractDataset"]
try:
# Kedro 0.18.1 introduced the `suggest` argument to disable the expensive
# fuzzy-matching process.
if KEDRO_VERSION >= parse("0.18.1"):
if IS_DATACATALOG_2 and isinstance(self._catalog, KedroDataCatalog):
dataset_obj = self._catalog.get(dataset_name)
elif KEDRO_VERSION >= parse("0.18.1"):
dataset_obj = self._catalog._get_dataset(dataset_name, suggest=False)
else: # pragma: no cover
else:
dataset_obj = self._catalog._get_dataset(dataset_name)
except DatasetNotFoundError:
dataset_obj = MemoryDataset()
Expand Down
62 changes: 61 additions & 1 deletion package/tests/test_data_access/test_managers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Dict
from unittest.mock import call

import networkx as nx
import pytest
from kedro.io import DataCatalog, MemoryDataset
from kedro.io import DataCatalog, KedroDataCatalog, MemoryDataset
from kedro.io.core import DatasetError
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline
Expand Down Expand Up @@ -636,3 +637,62 @@ def test_resolve_dataset_factory_patterns(
data_access_manager.resolve_dataset_factory_patterns(example_catalog, pipelines)

assert "model_inputs#csv" in new_catalog.as_dict().keys()

@pytest.mark.parametrize(
    "is_data_catalog_2, use_kedro_data_catalog_cls",
    [
        (False, False),
        (True, False),
    ],
)
def test_resolve_dataset_factory_patterns_fallback_private_api(
    self, is_data_catalog_2, use_kedro_data_catalog_cls, mocker
):
    """Resolving patterns on a plain ``DataCatalog`` must go through the
    private ``_get_dataset(..., suggest=False)`` API, regardless of the
    value of the ``IS_DATACATALOG_2`` flag."""
    mocker.patch(
        "kedro_viz.data_access.managers.IS_DATACATALOG_2", is_data_catalog_2
    )

    # Build the catalog under test; the KedroDataCatalog branch is kept
    # only for symmetry with the parametrization above.
    if use_kedro_data_catalog_cls:
        from kedro.io import KedroDataCatalog

        catalog = KedroDataCatalog({"test_dataset": MemoryDataset()})
    else:
        catalog = DataCatalog({"test_dataset": MemoryDataset()})

    # Watch the private accessor the fallback path is expected to hit.
    private_get_spy = mocker.spy(catalog, "_get_dataset")

    # A single node wires "test_dataset" (input) to "test_output" (output).
    pipelines = {
        "test_pipeline": Pipeline([node(identity, "test_dataset", "test_output")])
    }

    DataAccessManager().resolve_dataset_factory_patterns(catalog, pipelines)

    # Both the input and the output dataset names must be resolved,
    # each via the private API with fuzzy matching disabled.
    private_get_spy.assert_has_calls(
        [call("test_dataset", suggest=False), call("test_output", suggest=False)],
        any_order=True,
    )
    assert private_get_spy.call_count == 2

def test_resolve_dataset_factory_patterns_kedro_data_catalog(self, mocker):
    """With DataCatalog 2.0 available, pattern resolution must use the
    public ``get`` API of ``KedroDataCatalog`` rather than any private
    accessor."""
    mocker.patch("kedro_viz.data_access.managers.IS_DATACATALOG_2", True)

    catalog = KedroDataCatalog({"my_ds": MemoryDataset()})
    public_get_spy = mocker.spy(catalog, "get")

    # One node: input "my_ds" feeds output "test_output".
    pipelines = {
        "test_pipeline": Pipeline([node(identity, "my_ds", "test_output")])
    }

    DataAccessManager().resolve_dataset_factory_patterns(catalog, pipelines)

    # The input and the output dataset names are both resolved through
    # the public accessor — exactly one call each.
    public_get_spy.assert_has_calls(
        [call("my_ds"), call("test_output")], any_order=True
    )
    assert public_get_spy.call_count == 2
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
from kedro.io.data_catalog import DataCatalog
from kedro.io import DataCatalog, MemoryDataset
from packaging.version import parse

from kedro_viz.data_access.repositories import CatalogRepository

Expand Down Expand Up @@ -67,3 +68,65 @@ def test_get_layer_mapping_from_metadata(self):
catalog = DataCatalog.from_config(catalog_config)
repo.set_catalog(catalog)
assert repo.get_layer_for_dataset("car") == "raw"


class TestDataCatalogRepositoryExtended:
    """Extra coverage for ``CatalogRepository.layers_mapping`` edge cases."""

    def test_dataset_no_metadata(self):
        """A dataset entry without a ``metadata`` key contributes no layer,
        so ``get_layer_for_dataset`` returns ``None``."""
        repository = CatalogRepository()
        config = {
            "cars@pandas": {
                "type": "pandas.CSVDataset",
                "filepath": "cars.csv",
                # Deliberately no 'metadata' entry here.
            }
        }
        repository.set_catalog(DataCatalog.from_config(config))
        # No layer information anywhere -> lookup yields None.
        assert repository.get_layer_for_dataset("cars") is None

    @pytest.mark.parametrize(
        "kedro_version_str, expected_layer, add_layers_attr, metadata_layer",
        [
            # Simulate old Kedro (< 0.19.0)
            ("0.18.9", None, True, None),
            # Simulate new Kedro (>= 0.19.0)
            ("0.19.1", "my_layer", False, "my_layer"),
        ],
    )
    def test_layers_mapping_various_versions(
        self, kedro_version_str, expected_layer, add_layers_attr, metadata_layer, mocker
    ):
        """``layers_mapping`` reads layers from ``catalog.layers`` on old
        Kedro and from dataset metadata on new Kedro."""
        mocker.patch(
            "kedro_viz.data_access.repositories.catalog.KEDRO_VERSION",
            parse(kedro_version_str),
        )

        repository = CatalogRepository()
        if metadata_layer:
            # New-style Kedro: the layer is carried in dataset metadata.
            config = {
                "my_dataset": {
                    "type": "pandas.CSVDataset",
                    "filepath": "my.csv",
                    "metadata": {"kedro-viz": {"layer": metadata_layer}},
                }
            }
            catalog = DataCatalog.from_config(config)
        else:
            # Old-style Kedro: plain dataset, no metadata attached.
            catalog = DataCatalog({"my_dataset": MemoryDataset()})

        # Old Kedro exposed layer info via a ``layers`` catalog attribute.
        if add_layers_attr:
            catalog.layers = None

        repository.set_catalog(catalog)
        mapping = repository.layers_mapping

        # "my_dataset" must resolve to the layer expected for this version.
        assert mapping["my_dataset"] == expected_layer
Loading