Skip to content

Commit

Permalink
Parallelize validate (#418)
Browse files Browse the repository at this point in the history
* parallelize validate

* lint

* multi-process?

* fix pickling error

* refac

* dummy implementation of validate_many in demo model

* validation parallelism global variable

* validate many is None for run_many usage

* rename get_shape_stuff to prepare_shapes
  • Loading branch information
ssssarah authored Aug 16, 2024
1 parent a1ae462 commit 244e900
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 110 deletions.
2 changes: 1 addition & 1 deletion kgforge/core/archetypes/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def validate(
# Replace None by self._validate_many to switch to optimized bulk validation.
run(
self._validate_one,
None,
self._validate_many,
data,
execute_actions=execute_actions_before,
exception=ValidationError,
Expand Down
7 changes: 2 additions & 5 deletions kgforge/specializations/models/demo_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def _validate_one(self, resource: Resource, type_: str, inference: str) -> None:
if reason is not None:
raise ValidationError(reason)

_validate_many = None

# Utils.

@staticmethod
Expand All @@ -124,11 +126,6 @@ def _generate_context(self) -> Dict:
def schema_id(self, type: str) -> str:
"""Not supported here: always raises the exception returned by not_supported()."""
raise not_supported()

def _validate_many(
self, resources: List[Resource], type_: str, inference: str
) -> None:
"""Bulk validation is not supported here: always raises the exception returned by not_supported()."""
raise not_supported()


class ModelLibrary:
"""Simulate a third-party library handling interactions with the data used by the model."""
Expand Down
169 changes: 87 additions & 82 deletions kgforge/specializations/models/rdf/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,53 +63,52 @@
ALL_COLLECTORS_MAP = {c.constraint(): c for c in ALL_COLLECTORS}


def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]:
"""Traverse the SHACL parameters of this shape to collect constrained properties.
This function is injected into pyshacl Shape objects in order to traverse the SHACL graph.
It calls a specific collector, looked up in ALL_COLLECTORS_MAP, depending on the SHACL
parameter present in the NodeShape. Each collector is run at most once per call.
Args:
predecessors: set of nodes that have been traversed, used to break circular
recursion
Returns:
properties, attributes: Tuple(list, dict), the collected properties and attributes
respectively gathered from the collectors
"""

parameters = self.parameters()
properties = []
attributes = {}
# collectors already run for this shape, so that none is run twice
done_collectors = set()
for param in iter(parameters):
if param in ALL_COLLECTORS_MAP:
constraint_collector = ALL_COLLECTORS_MAP[param]
if constraint_collector not in done_collectors:
c = constraint_collector(self)
# mark this shape's node as visited while its collector runs
predecessors.add(self.node)
props, attrs = c.collect(predecessors)
if attrs:
attributes.update(attrs)
if props:
properties.extend(props)
done_collectors.add(constraint_collector)
# unmark this shape's node once its collector has finished
if predecessors:
predecessors.remove(self.node)
else:
# FIXME: there are some SHACL constraints that are not implemented
# raise IndexError(f"{param} not implemented!")
pass

return properties, attributes


class ShapeWrapper(Shape):
__slots__ = ("__dict__",)

def __init__(self, shape: Shape) -> None:
# Initialise the Shape base class from the fields of the shape being wrapped.
super().__init__(shape.sg, shape.node, shape._p, shape._path, shape.logger)

def traverse(self, predecessors: Set[URIRef]) -> Tuple[List, Dict]:
"""Traverse the SHACL parameters of this shape to collect constrained properties.
It calls a specific collector, looked up in ALL_COLLECTORS_MAP, depending on the SHACL
parameter present in the NodeShape. Each collector is run at most once per call.
Args:
predecessors: set of nodes that have been traversed, used to break circular
recursion
Returns:
properties, attributes: Tuple(list, dict), the collected properties and attributes
respectively gathered from the collectors
"""

parameters = self.parameters()
properties = []
attributes = {}
# collectors already run for this shape, so that none is run twice
done_collectors = set()
for param in iter(parameters):
if param in ALL_COLLECTORS_MAP:
constraint_collector = ALL_COLLECTORS_MAP[param]
if constraint_collector not in done_collectors:
c = constraint_collector(self)
# mark this shape's node as visited while its collector runs
predecessors.add(self.node)
props, attrs = c.collect(predecessors)
if attrs:
attributes.update(attrs)
if props:
properties.extend(props)
done_collectors.add(constraint_collector)
# unmark this shape's node once its collector has finished
if predecessors:
predecessors.remove(self.node)
else:
# FIXME: there are some SHACL constraints that are not implemented
# raise IndexError(f"{param} not implemented!")
pass

return properties, attributes

def parameters(self):
return (
p
Expand All @@ -125,7 +124,7 @@ def __init__(self, graph: RDFDataset) -> None:
# the following line triggers the shape loading
self._shapes = self.shapes

def lookup_shape_from_node(self, node: URIRef) -> Shape:
def lookup_shape_from_node(self, node: URIRef) -> Optional[ShapeWrapper]:
"""Override the lookup so that the traverse function is available only on the requested nodes.
Args:
Expand All @@ -138,12 +137,19 @@ def lookup_shape_from_node(self, node: URIRef) -> Shape:
shape = self._node_shape_cache[node]
except KeyError as ke:
raise ValueError(f"Unknown shape node id '{node}': {str(ke)}") from ke
if shape:
shape_wrapper = ShapeWrapper(self._node_shape_cache[node])
if not hasattr(shape_wrapper, "traverse"):
shape_wrapper.traverse = types.MethodType(traverse, shape_wrapper)
return shape_wrapper
return shape

return ShapeWrapper(shape)

@property
def shapes(self): # pyshacl's implementation returns dict_values (not a list), which cannot be pickled.
"""
Return the values of the node shape cache as a list, building the cache first if it is empty.

:returns: [Shape]
:rtype: list(pyshacl.shape.Shape)
"""
# build the cache on first access, when it is still empty
if len(self._node_shape_cache) < 1:
self._build_node_shape_cache()
return list(self._node_shape_cache.values())


class RdfService:
Expand Down Expand Up @@ -193,7 +199,8 @@ def materialize(self, iri: URIRef) -> NodeProperties:
"""
raise NotImplementedError()

def validate(self, resource: Resource, type_: str, inference: str):
@staticmethod
def type_to_validate_against(resource: Resource, type_: str) -> str:
try:
if not resource.get_type() and not type_:
raise ValueError(
Expand All @@ -209,12 +216,25 @@ def validate(self, resource: Resource, type_: str, inference: str):
raise TypeError(
f"A single type should be provided for validation: {str(exc)}"
) from exc
shape_iri = self.get_shape_uriref_from_class_fragment(type_to_validate)
data_graph = as_graph(resource, False, self.context, None, None)
shape, shacl_graph, ont_graph = self.get_shape_graph(shape_iri)
conforms, report_graph, report_text = self._validate(
shape_iri, data_graph, shape, shacl_graph, inference, ont_graph

return type_to_validate

@staticmethod
def validate(
resource: Resource, shape: ShapeWrapper, shacl_graph: Graph, ont_graph: Graph, type_to_validate: str, inference: str, context: Context
) -> Tuple[bool, Graph, str]:

data_graph = as_graph(resource, False, context, None, None)

inplace = inference and inference != "none"
conforms, report_graph, report_text = validate(
data_graph=data_graph,
shacl_graph=shacl_graph,
ont_graph=ont_graph,
inference=inference,
inplace=inplace,
)

# when no schema target was found in the data (i.e., no data was selected for validation)
# conforms is currently set to True by pyShacl. Here it is set to False so that
# the validation fails when type_to_validate is not present in the data
Expand Down Expand Up @@ -245,24 +265,6 @@ def validate(self, resource: Resource, type_: str, inference: str):
)
return conforms, report_graph, report_text

def _validate(
self,
iri: str,
data_graph: Graph,
shape: Shape,
shacl_graph: Graph,
inference: str,
ont_graph: Graph = None,
) -> Tuple[bool, Graph, str]:
"""Run the module-level validate() on a data graph and return its result unchanged.

NOTE(review): validate is presumably pyshacl's validate — confirm against the imports.
The iri and shape parameters are not used in this body.

Args:
iri: identifier of the shape (unused here)
data_graph: graph of the data to validate
shape: the shape to validate against (unused here)
shacl_graph: graph passed as shacl_graph to validate()
inference: inference option passed as-is to validate()
ont_graph: graph passed as ont_graph to validate(); defaults to None
Returns:
whatever validate() returns, annotated as Tuple[bool, Graph, str]
"""
# inplace is truthy only when an inference option other than "none" is given
inplace = inference and inference != "none"
return validate(
data_graph=data_graph,
shacl_graph=shacl_graph,
ont_graph=ont_graph,
inference=inference,
inplace=inplace,
)

@abstractmethod
def resolve_context(self, iri: str) -> Dict:
"""For a given IRI return its resolved context recursively"""
Expand Down Expand Up @@ -408,7 +410,7 @@ def _process_imported_resource(
imported_resource_uriref,
resource_to_named_graph_uriref,
collect_imported_ontology: bool,
):
) -> Tuple[Graph, Graph]:
imported_graph_id = resource_to_named_graph_uriref[imported_resource_uriref]
if imported_resource_uriref not in self._imported:
imported_resource_graph, imported_ont_graph = (
Expand Down Expand Up @@ -618,25 +620,28 @@ def _import_shape(self, node_shape_uriref: URIRef):
f"Failed to import the shape '{node_shape_uriref}': {str(e)}"
) from e

def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[Shape, Graph, Graph]:
def get_shape_graph(self, node_shape_uriref: URIRef) -> Tuple[ShapeWrapper, Graph, Graph]:
if node_shape_uriref not in self.shape_to_defining_resource:
raise ValueError(f"Unknown shape '{node_shape_uriref}'")

if self.shape_to_defining_resource[node_shape_uriref] not in self._imported:
return self._import_shape(node_shape_uriref)

try:
shape = self.get_shape_graph_wrapper().lookup_shape_from_node(
node_shape_uriref
)
except ValueError:
return self._import_shape(node_shape_uriref)
if self.shape_to_defining_resource[node_shape_uriref] in self._imported:
defining_resource = self.shape_to_defining_resource[node_shape_uriref]
shape_graph = self._dataset_graph.graph(
self._get_named_graph_from_shape(node_shape_uriref)
)
ont_graph = self._build_imported_ontology_graph(defining_resource)
else:
return self._import_shape(node_shape_uriref)
ont_graph = self._build_imported_ontology_graph(
self.shape_to_defining_resource[node_shape_uriref]
)

return shape, shape_graph, ont_graph

return shape, shape_graph, ont_graph
except ValueError:
return self._import_shape(node_shape_uriref)

def _build_imported_ontology_graph(self, resurce_uriref):
ont_graph = Graph()
Expand Down
2 changes: 1 addition & 1 deletion kgforge/specializations/models/rdf/store_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def resolve_context(self, iri: str) -> Dict:
def generate_context(self) -> Dict:
for shape_uriref, schema_uriref in self.shape_to_defining_resource.items():
if schema_uriref not in self._imported:
self._transitive_load_shape_graph(
self._transitive_load_resource_graph(
self._get_named_graph_from_shape(shape_uriref), schema_uriref
)
# reloads the shapes graph
Expand Down
Loading

0 comments on commit 244e900

Please sign in to comment.