Skip to content

Commit

Permalink
Improve SHACL2Flink SPARQL query performance
Browse files Browse the repository at this point in the history
SPARQL is used by the tools intensively to transform SHACL expressions
into SQL expressions. This is slowing down when large knowledge graphs or large
SHACL models are used.
In this PR two main goals are achieved:
(1) Move from rdflib native representation to faster Oxigraph
representation where possible
(2) Replace owlrl to be able to update rdflib, pyshacl etc to most recent versions. To
achieve that a simplified transitive-closure algorithm is applied because transitive closure
is needed to make "single-step" rdfs:subClassOf evaluations in Flink. This also led to
correction of demo knowledge.ttl since the class defintions where not correct (i.e. rdfs:class
insted of rdfs:Class)

Signed-off-by: marcel <[email protected]>
  • Loading branch information
wagmarcel authored and abhijith-hr committed Nov 7, 2024
1 parent 9557135 commit 70de03d
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 74 deletions.
5 changes: 2 additions & 3 deletions semantic-model/shacl2flink/create_knowledge_closure.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import sys
import os
import rdflib
import owlrl
import argparse
import lib.utils as utils


def parse_args(args=sys.argv[1:]):
Expand All @@ -33,8 +33,7 @@ def parse_args(args=sys.argv[1:]):
def main(knowledgefile, outputfile):
h = rdflib.Graph()
h.parse(knowledgefile)
owlrl.DeductiveClosure(owlrl.OWLRL_Extension, rdfs_closure=True,
axiomatic_triples=True, datatype_axioms=True).expand(h)
h = utils.transitive_closure(h)
filename = os.path.dirname(os.path.abspath(knowledgefile)) + '/' + outputfile
h.serialize(destination=filename, format='turtle')

Expand Down
10 changes: 4 additions & 6 deletions semantic-model/shacl2flink/create_ngsild_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import argparse
import lib.utils as utils
import lib.configs as configs
import owlrl


def parse_args(args=sys.argv[1:]):
Expand Down Expand Up @@ -112,11 +111,11 @@ def main(shaclfile, knowledgefile, modelfile, output_folder='output'):
utils.create_output_folder(output_folder)
with open(os.path.join(output_folder, "ngsild-models.sqlite"), "w")\
as sqlitef:
g = Graph()
g = Graph(store="Oxigraph")
g.parse(shaclfile)
model = Graph()
model = Graph(store="Oxigraph")
model.parse(modelfile)
knowledge = Graph()
knowledge = Graph(store="Oxigraph")
knowledge.parse(knowledgefile)
attributes_model = model + g + knowledge

Expand Down Expand Up @@ -161,8 +160,7 @@ def main(shaclfile, knowledgefile, modelfile, output_folder='output'):
print(";", file=sqlitef)

# Create ngsild tables by sparql
owlrl.DeductiveClosure(owlrl.OWLRL_Extension, rdfs_closure=True, axiomatic_triples=True,
datatype_axioms=True).expand(knowledge)
knowledge = utils.transitive_closure(knowledge)
table_model = model + knowledge + g
qres = table_model.query(ngsild_tables_query_noinference)
tables = {}
Expand Down
8 changes: 3 additions & 5 deletions semantic-model/shacl2flink/create_ngsild_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lib.utils as utils
import lib.configs as configs
from ruamel.yaml.scalarstring import (SingleQuotedScalarString as sq)
import owlrl


field_query = """
Expand Down Expand Up @@ -65,12 +64,11 @@ def parse_args(args=sys.argv[1:]):
def main(shaclfile, knowledgefile, output_folder='output'):
yaml = ruamel.yaml.YAML()
utils.create_output_folder(output_folder)
g = Graph()
g = Graph(store="Oxigraph")
g.parse(shaclfile)
h = Graph()
h = Graph(store="Oxigraph")
h.parse(knowledgefile)
owlrl.DeductiveClosure(owlrl.OWLRL_Extension, rdfs_closure=True, axiomatic_triples=True,
datatype_axioms=True).expand(h)
h = utils.transitive_closure(h)
g += h
tables = {}
qres = g.query(field_query)
Expand Down
7 changes: 2 additions & 5 deletions semantic-model/shacl2flink/create_rdf_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import sys
import math
import hashlib
import owlrl
import ruamel.yaml
import rdflib
from lib import utils
Expand Down Expand Up @@ -124,11 +123,9 @@ def main(knowledgefile, namespace, output_folder='output'):
primary_key = ['subject', 'predicate', 'index']

# Create RDF statements to insert data
g = rdflib.Graph()
g = rdflib.Graph(store="Oxigraph")
g.parse(knowledgefile)
owlrl.DeductiveClosure(owlrl.OWLRL_Extension, rdfs_closure=True, axiomatic_triples=True,
datatype_axioms=True).expand(g)

g = utils.transitive_closure(g)
statementsets = create_statementset(g)
sqlstatements = ''
for statementset in statementsets:
Expand Down
28 changes: 14 additions & 14 deletions semantic-model/shacl2flink/lib/bgp_translation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,37 +352,37 @@ def create_ngsild_mappings(ctx, sorted_graph):
equivalence = []
variables = []
for key, value in ctx['classes'].items():
sparqlvalidationquery += f'?{key} rdfs:subClassOf <{value.toPython()}> .\n'
sparqlvalidationquery += f'<{value.toPython()}> rdfs:subClassOf ?{key} .\n'
sparqlvalidationquery += f'{{?{key} rdfs:subClassOf <{value.toPython()}> .\n'
sparqlvalidationquery += f'<{value.toPython()}> rdfs:subClassOf ?{key} .}}\n'
for entity in entity_variables.keys():
sparqlvalidationquery += f'?{entity}shapex sh:targetClass/rdfs:subClassOf* ?{entity} .\n'
sparqlvalidationquery += f'?{entity}shape sh:targetClass ?{entity} .\n'
sparqlvalidationquery += f'{{?{entity}shapex sh:targetClass/rdfs:subClassOf ?{entity} .\n'
sparqlvalidationquery += f'?{entity}shape sh:targetClass ?{entity} .}}\n'
variables.append(entity)
for s, p, o in sorted_graph.triples((entity, None, None)):
property_class = sorted_graph.value(o, ngsild['hasObject'])
if property_class is not None:
sparqlvalidationquery += f'?{s}shape sh:property [ sh:path <{p}> ; sh:property \
[ sh:path ngsild:hasObject; sh:class ?{property_class} ] ] .\n'
sparqlvalidationquery += f'{{?{s}shape sh:property [ sh:path <{p}> ; sh:property \
[ sh:path ngsild:hasObject; sh:class ?{property_class} ] ] .}}\n'
for property in property_variables:
variables.append(property)
sparqlvalidationquery += f'?{property}shapex sh:targetClass/rdfs:subClassOf* ?{property} .\n'
sparqlvalidationquery += f'?{property}shape sh:targetClass ?{property} .\n'
sparqlvalidationquery += f'{{?{property}shapex sh:targetClass/rdfs:subClassOf ?{property} .\n'
sparqlvalidationquery += f'?{property}shape sh:targetClass ?{property} .}}\n'
for s, p, o in sorted_graph.triples((None, ngsild['hasValue'], property)):
for p in sorted_graph.predicates(object=s):
sparqlvalidationquery += f'?{property}shape sh:property [ sh:path <{p}> ; ] .\n'
sparqlvalidationquery += f'{{?{property}shape sh:property [ sh:path <{p}> ; ] .}}\n'
for subj in sorted_graph.subjects(predicate=p, object=s):
if isinstance(subj, Variable):
sparqlvalidationquery += f'{subj.toPython()} rdfs:subClassOf* ?{property} .\n'
sparqlvalidationquery += f'{{{subj.toPython()} rdfs:subClassOf ?{property} .}}\n'
for property in time_variables:
variables.append(property)
sparqlvalidationquery += f'?{property}shapex sh:targetClass/rdfs:subClassOf* ?{property} .\n'
sparqlvalidationquery += f'?{property}shape sh:targetClass ?{property} .\n'
sparqlvalidationquery += f'{{?{property}shapex sh:targetClass/rdfs:subClassOf ?{property} .\n'
sparqlvalidationquery += f'?{property}shape sh:targetClass ?{property} .}}\n'
for s, p, o in sorted_graph.triples((None, ngsild['observedAt'], property)):
for p in sorted_graph.predicates(object=s):
sparqlvalidationquery += f'?{property}shape sh:property [ sh:path <{p}> ; ] .\n'
sparqlvalidationquery += f'{{?{property}shape sh:property [ sh:path <{p}> ; ] .}}\n'
for subj in sorted_graph.subjects(predicate=p, object=s):
if isinstance(subj, Variable):
sparqlvalidationquery += f'{subj.toPython()} rdfs:subClassOf ?{property}'
sparqlvalidationquery += f'{{{subj.toPython()} rdfs:subClassOf ?{property}}}'

query = basequery
for variable in variables:
Expand Down
7 changes: 3 additions & 4 deletions semantic-model/shacl2flink/lib/shacl_sparql_to_sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from rdflib import Graph
import owlrl
import os
import sys
import re
Expand Down Expand Up @@ -89,12 +88,12 @@ def translate(shaclfile, knowledgefile, prefixes):
(statementset, tables, views): statementset in yaml format
"""
g = Graph()
h = Graph()
g = Graph(store="Oxigraph")
h = Graph(store="Oxigraph")
g.parse(shaclfile)
h.parse(knowledgefile)
g += h
owlrl.RDFSClosure.RDFS_Semantics(g, axioms=True, daxioms=False, rdfs=True).closure()
g = utils.transitive_closure(g)
tables_all = []
statementsets = []
sqlite = ''
Expand Down
14 changes: 12 additions & 2 deletions semantic-model/shacl2flink/lib/sparql_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ def translate(ctx, elem):
elif elem.name == 'Builtin_NOTEXISTS':
return translate_notexists(ctx, elem)
elif elem.name == 'Distinct':
ctx['target_modifiers'].append('Distinct')
translate(ctx, elem.p)
return translate_distinct(ctx, elem)
elif elem.name == 'LeftJoin':
return translate_left_join(ctx, elem)
elif elem.name == 'Extend':
Expand Down Expand Up @@ -240,6 +239,13 @@ def translate(ctx, elem):
supported!')


def translate_distinct(ctx, elem):
ctx['target_modifiers'].append('Distinct')
translate(ctx, elem.p)
elem['target_sql'] = elem.p['target_sql']
elem['where'] = elem.p['where']


def translate_unary_not(ctx, elem):
expression = translate(ctx, elem.expr)
return f" NOT ({expression}) "
Expand Down Expand Up @@ -608,7 +614,11 @@ def remap_join_constraint_to_where(node):


def copy_context(ctx):
# avoid deep copy of graph. it is not needed and creates problems with oxigraph
graph = ctx['g']
ctx_copy = copy.deepcopy(ctx)
# copy graph manually into the new structure
ctx_copy['g'] = graph
ctx_copy['target_sql'] = ''
ctx_copy['target_modifiers'] = []
ctx_copy['sql_tables'] = ctx['sql_tables']
Expand Down
67 changes: 67 additions & 0 deletions semantic-model/shacl2flink/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import os
import re
import rdflib
from rdflib import RDFS, OWL, RDF, Graph, Literal, XSD
from urllib.parse import urlparse
from enum import Enum
from collections import deque


class WrongSparqlStructure(Exception):
Expand Down Expand Up @@ -594,3 +596,68 @@ def split_statementsets(statementsets, max_map_size):
grouped_strings.append(current_group)

return grouped_strings


# This creates a transitive closure of all OWL.TransitiveProperty elements given in the ontology
# plus rdfs:subClassOf. In addition is makes sure that every rdfs:Class and owl:Class are reflexive
def transitive_closure(g):
closure_graph = Graph()
closure_graph += g

# Ensure rdfs:subClassOf is defined as an OWL.TransitiveProperty if it is not already defined
if (RDFS.subClassOf, RDF.type, OWL.TransitiveProperty) not in closure_graph:
closure_graph.add((RDFS.subClassOf, RDF.type, OWL.TransitiveProperty))

# Handle subClassOf separately
# Add reflexive subClassOf relationships for all classes
for s in closure_graph.subjects(predicate=RDFS.subClassOf):
if (s, RDFS.subClassOf, s) not in closure_graph:
closure_graph.add((s, RDFS.subClassOf, s))

# Add reflexive subClassOf relationships for every element of type rdfs:Class and owl:Class
for s in closure_graph.subjects(predicate=RDF.type, object=RDFS.Class):
if (s, RDFS.subClassOf, s) not in closure_graph:
closure_graph.add((s, RDFS.subClassOf, s))
for s in closure_graph.subjects(predicate=RDF.type, object=OWL.Class):
if (s, RDFS.subClassOf, s) not in closure_graph:
closure_graph.add((s, RDFS.subClassOf, s))

# Handle other transitive properties
transitive_properties = set(closure_graph.subjects(predicate=RDF.type, object=OWL.TransitiveProperty))
for prop in transitive_properties:
# Use a queue for BFS for each transitive property
queue = deque(closure_graph.triples((None, prop, None)))
visited = set(queue)

while queue:
s1, _, o1 = queue.popleft()

# Find all objects that o1 is related to via the same property
for _, _, o2 in closure_graph.triples((o1, prop, None)):
if (s1, prop, o2) not in visited:
# Add new inferred triple
closure_graph.add((s1, prop, o2))
queue.append((s1, prop, o2))
visited.add((s1, prop, o2))

# Handle generalization of rdf:Bag/rdf:Container
for bag in closure_graph.subjects(predicate=RDF.type, object=RDF.Bag):
# Add rdf:Bag and rdfs:Container types
closure_graph.add((bag, RDF.type, RDFS.Container))

# Collect all rdf:_n properties (e.g., rdf:_1, rdf:_2, etc.)
members = []
for p, o in closure_graph.predicate_objects(subject=bag):
if p.startswith(str(RDF) + "_"):
members.append(o)
# Ensure all values are xsd:string literals
if not isinstance(o, Literal) or o.datatype != XSD.string:
closure_graph.set((bag, p, Literal(str(o), datatype=XSD.string)))

# Add rdfs:member relationships
if members:
closure_graph.add((bag, RDFS.member, Literal(members[0], datatype=XSD.string)))
for member in members[1:]:
closure_graph.add((bag, RDFS.member, Literal(member, datatype=XSD.string)))

return closure_graph
9 changes: 5 additions & 4 deletions semantic-model/shacl2flink/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
rdflib==6.2.0
owlrl==6.0.2
pyshacl==0.20.0
rdflib==7.1.1
pyshacl==0.29.0
ruamel.yaml==0.17.21
click==8.1.3
Jinja2==3.1.3
setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
apache-flink==1.17.1
apache-flink==1.17.2
oxrdflib==0.4.0

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX iff: <https://industry-fusion.com/types/v0.9/>

iff:entity a iff:class ;
a rdfs:class .
a rdfs:Class .
iff:machine a iff:class ;
a rdfs:class .
a rdfs:Class .
iff:filter rdfs:subClassOf iff:machine ;
a iff:class .
iff:plasmacutter rdfs:subClassOf iff:cutter ;
Expand All @@ -22,7 +22,7 @@ iff:operationSchedule rdfs:subClassOf iff:linkedEntity ;
a iff:class .
iff:maintenanceInterval rdfs:subClassOf iff:linkedEntity ;
a iff:class .
iff:machineState a rdfs:class .
iff:machineState a rdfs:Class .
iff:state_OFF a iff:machineState .
iff:state_OFF iff:stateValidFor iff:filter, iff:cutter .
iff:state_ON a iff:machineState .
Expand Down Expand Up @@ -50,7 +50,7 @@ iff:WC1 iff:moreExpensiveThan iff:WC0 .
iff:WC1 rdf:_n ("1.4301" "1.4302" "1.4303" "1.4304" "1.4305") .
iff:WC2 rdf:_n ("1.3301" "1.3302" "1.3303" "1.3304" "1.3305") .
iff:WC3 rdf:_n ("1.5301" "1.5302" "1.5303" "1.5304" "1.5305") .
iff:Severity a rdfs:class .
iff:Severity a rdfs:Class .
iff:severityWarning a iff:Severity .
iff:severityWarning rdfs:label 'warning' .
iff:severityMajor a iff:Severity .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ iff:StateOnFilterShape
PREFIX iff: <https://industry-fusion.com/types/v0.9/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT $this ?v1 ?pc ?v2
SELECT DISTINCT $this ?v1 ?pc ?v2
where {
$this iff:state [ <https://uri.etsi.org/ngsi-ld/hasValue> ?v1 ] .
?pc rdf:type iff:plasmacutter .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ iff:plasmacutter_test rdfs:subClassOf iff:cutter_test ;
### End of Test Content

iff:entity a iff:class ;
a rdfs:class .
a rdfs:Class .
iff:machine a iff:class ;
a rdfs:class .
a rdfs:Class .
iff:filter rdfs:subClassOf iff:machine ;
a iff:class .
iff:plasmacutter rdfs:subClassOf iff:cutter ;
Expand All @@ -29,11 +29,11 @@ iff:filterCartridge rdfs:subClassOf iff:entity ;
a iff:class .


iff:scheduleEntity a iff:class .
iff:scheduleEntity a iff:class, rdfs:Class .
iff:oeeTemplate rdfs:subClassOf iff:scheduleEntity .

### Machine states
iff:machineState a rdfs:class .
iff:machineState a rdfs:Class .
iff:state_OFF a iff:machineState .
iff:state_OFF iff:stateValidFor iff:filter, iff:cutter .
iff:state_ON a iff:machineState .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@


@patch('create_knowledge_closure.rdflib')
@patch('create_knowledge_closure.owlrl')
def test_main(mock_owlrl, mock_rdflib, tmp_path):
@patch('create_knowledge_closure.utils')
def test_main(mock_utils, mock_rdflib, tmp_path, monkeypatch):
def identity(val):
return val
monkeypatch.setattr(mock_utils, "transitive_closure", identity)
create_knowledge_closure.main('kms/knowledge.ttl', 'knowledge_closure.ttl')
Loading

0 comments on commit 70de03d

Please sign in to comment.