Hard skip empty dataset nodes in dataservices #3285

Open · wants to merge 11 commits into base: master
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

- Improve datasets' API perfs (do not fetch resources from Mongo, add /api/2/datasets endpoint…) [#3279](https://github.com/opendatateam/udata/pull/3279)
- Use YEARS_OF_INACTIVITY_BEFORE_DELETION all around in code [#3287](https://github.com/opendatateam/udata/pull/3287)
- Skip empty dataset nodes referenced in dataservices before creating a harvest job item [#3285](https://github.com/opendatateam/udata/pull/3285)

## 10.1.3 (2025-03-14)

2 changes: 2 additions & 0 deletions udata/core/dataset/rdf.py
@@ -751,6 +751,8 @@ def dataset_from_rdf(graph: Graph, dataset=None, node=None, remote_url_prefix: s

dataset.title = rdf_value(d, DCT.title)
if not dataset.title:
# If the dataset is externally defined (i.e. it has no title, only a link to the dataset XML),
# we should have skipped it long before, in :ExcludeExternalyDefinedDataset
raise HarvestSkipException("missing title on dataset")

# Support dct:abstract if dct:description is missing (sometimes used instead)
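A minimal sketch of this guard, using plain rdflib calls in place of udata's rdf_value helper (the function name and the exception used here are illustrative, not part of the change):

from rdflib import Graph, URIRef
from rdflib.namespace import DCTERMS as DCT

def require_title(graph: Graph, node: URIRef) -> str:
    # An externally defined dataset node carries no dct:title, only a link
    # to its definition, so the lookup returns None and the node is skipped.
    title = graph.value(node, DCT.title)
    if not title:
        raise ValueError("missing title on dataset")  # stands in for HarvestSkipException
    return str(title)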
2 changes: 1 addition & 1 deletion udata/harvest/backends/base.py
@@ -256,7 +256,7 @@ def process_dataset(self, remote_id: str, **kwargs):
]
self.save_job()

def is_done(self) -> bool:
def has_reached_max_items(self) -> bool:
"""Should be called after process_dataset to know if we reach the max items"""
return self.max_items and len(self.job.items) >= self.max_items

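The intended calling pattern is the one used by the test backends further down in this diff: check the limit right after each process_dataset call. A minimal sketch of a backend's inner_harvest, assuming remote ids come from the source config:

def inner_harvest(self):
    # Stop as soon as the configured max_items cap is reached; the check
    # happens after process_dataset so the current item is still counted.
    for remote_id in self.source.config.get("dataset_remote_ids", []):
        self.process_dataset(remote_id)
        if self.has_reached_max_items():
            return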
48 changes: 42 additions & 6 deletions udata/harvest/backends/dcat.py
@@ -9,7 +9,7 @@

from udata.core.dataservices.rdf import dataservice_from_rdf
from udata.core.dataset.rdf import dataset_from_rdf
from udata.harvest.models import HarvestItem
from udata.harvest.models import HarvestError, HarvestItem
from udata.i18n import gettext as _
from udata.rdf import (
DCAT,
@@ -77,9 +77,19 @@ def inner_harvest(self):
self.process_one_datasets_page(page_number, page)
serialized_graphs.append(page.serialize(format=fmt, indent=None))

# We do a second pass over the graph so that all datasets are already in memory when we
# attach them to dataservices. It would be better to do a single pass of graph walking
# followed by one pass of attaching datasets to dataservices.
for page_number, page in self.walk_graph(self.source.url, fmt):
self.process_one_dataservices_page(page_number, page)

if not self.dryrun and self.has_reached_max_items():
# We have reached the max_items limit. Warn the user that not all datasets may be present.
error = HarvestError(
message=f"{self.max_items} max items reached, not all datasets/dataservices were retrieved"
)
self.job.errors.append(error)

# The official MongoDB document size limit is 16MB. The default value here is 15MB to account for other fields in the document (and for the difference between * 1024 and * 1000).
max_harvest_graph_size_in_mongo = current_app.config.get(
"HARVEST_MAX_CATALOG_SIZE_IN_MONGO"
@@ -146,25 +156,51 @@ def walk_graph(self, url: str, fmt: str) -> Generator[tuple[int, Graph], None, N
break

yield page_number, subgraph
if self.is_done():
if self.has_reached_max_items():
return

page_number += 1

def process_one_datasets_page(self, page_number: int, page: Graph):
for node in page.subjects(RDF.type, DCAT.Dataset):
remote_id = page.value(node, DCT.identifier)
if self.is_dataset_external_to_this_page(page, node):
continue

self.process_dataset(remote_id, page_number=page_number, page=page, node=node)

if self.is_done():
if self.has_reached_max_items():
return

def is_dataset_external_to_this_page(self, page: Graph, node) -> bool:
# Dataservice nodes may reference nodes of type dataset through `servesDataset` or
# `hasPart`. We don't want to process those nodes because they are empty: they only
# contain a link to the dataset definition.
# These datasets are either present elsewhere in the catalog (in previous or next pages) or
# external to the catalog we are currently harvesting (in which case we don't want to harvest them).
# We first thought of skipping them inside `dataset_from_rdf` (see :ExcludeExternalyDefinedDataset),
# but that creates a lot of "fake" items in the job and raises problems (for example reaching the
# max harvest items and never getting to the "real" datasets/dataservices in subsequent pages).
# So, to avoid creating useless items in the job, we first considered checking that there is no title
# and that `isPrimaryTopicOf` is present. But it seems better to check whether the only link between
# the node and the current page is a `servesDataset` or `hasPart`; if so, the node is only present
# inside a dataservice. (Maybe we could also check that `_other_node` is a dataservice?)
# `isPrimaryTopicOf` is the tag present in the first harvester that raised the problem; there may be
# other values of the same sort we would need to check here.
# This is not dangerous because we check for a missing title in `dataset_from_rdf` later, so we
# would have skipped this dataset anyway.

predicates = [link_type for (_other_node, link_type) in page.subject_predicates(node)]
return len(predicates) == 1 and (
predicates[0] == DCAT.servesDataset or predicates[0] == DCT.hasPart
)

def process_one_dataservices_page(self, page_number: int, page: Graph):
for node in page.subjects(RDF.type, DCAT.DataService):
remote_id = page.value(node, DCT.identifier)
self.process_dataservice(remote_id, page_number=page_number, page=page, node=node)

if self.is_done():
if self.has_reached_max_items():
return

def inner_process_dataset(self, item: HarvestItem, page_number: int, page: Graph, node):
@@ -266,7 +302,7 @@ def walk_graph(self, url: str, fmt: str) -> Generator[tuple[int, Graph], None, N
subgraph.parse(data=ET.tostring(child), format=fmt)

yield page_number, subgraph
if self.is_done():
if self.has_reached_max_items():
return

next_record = self.next_record_if_should_continue(start, search_results)
@@ -375,7 +411,7 @@ def walk_graph(self, url: str, fmt: str) -> Generator[tuple[int, Graph], None, N
raise ValueError("Failed to fetch CSW content")

yield page_number, subgraph
if self.is_done():
if self.has_reached_max_items():
return

next_record = self.next_record_if_should_continue(start, search_results)
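To illustrate the check performed by is_dataset_external_to_this_page, here is a minimal standalone sketch using rdflib (the URIs are made up for the example): a dataset node whose only link to the current page is a servesDataset (or hasPart) predicate is considered external and skipped.

from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import RDF

DCAT = Namespace("http://www.w3.org/ns/dcat#")

page = Graph()
service = URIRef("https://example.org/dataservice/1")
dataset = URIRef("https://example.org/dataset/42")

# The dataservice references the dataset, but the dataset node itself is
# empty on this page: no title, no other triple points at it.
page.add((service, RDF.type, DCAT.DataService))
page.add((service, DCAT.servesDataset, dataset))

# Same logic as is_dataset_external_to_this_page: collect every predicate
# linking the node to the rest of the page.
predicates = [p for (_s, p) in page.subject_predicates(dataset)]
assert predicates == [DCAT.servesDataset]  # single servesDataset link, so skip the node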
2 changes: 1 addition & 1 deletion udata/harvest/tests/factories.py
@@ -61,7 +61,7 @@ def inner_harvest(self):
mock_initialize.send(self)
for i in range(self.config.get("count", DEFAULT_COUNT)):
self.process_dataset(str(i))
if self.is_done():
if self.has_reached_max_items():
return

def inner_process_dataset(self, item: HarvestItem):
4 changes: 2 additions & 2 deletions udata/harvest/tests/test_base_backend.py
@@ -44,12 +44,12 @@ class FakeBackend(BaseBackend):
def inner_harvest(self):
for remote_id in self.source.config.get("dataset_remote_ids", []):
self.process_dataset(remote_id)
if self.is_done():
if self.has_reached_max_items():
return

for remote_id in self.source.config.get("dataservice_remote_ids", []):
self.process_dataservice(remote_id)
if self.is_done():
if self.has_reached_max_items():
return

def inner_process_dataset(self, item: HarvestItem):
Expand Down