From 755b9c2c247ff6101ba5ea6a6e399a716e42f817 Mon Sep 17 00:00:00 2001 From: Alfred Rubin Date: Mon, 11 Sep 2023 15:04:04 +0200 Subject: [PATCH] realigning the branches --- rdflib_neo4j/neo4j.py | 176 ------------------------------------ rdflib_neo4j/neo4jcypher.py | 169 ---------------------------------- 2 files changed, 345 deletions(-) delete mode 100644 rdflib_neo4j/neo4j.py delete mode 100644 rdflib_neo4j/neo4jcypher.py diff --git a/rdflib_neo4j/neo4j.py b/rdflib_neo4j/neo4j.py deleted file mode 100644 index 0c6adc2..0000000 --- a/rdflib_neo4j/neo4j.py +++ /dev/null @@ -1,176 +0,0 @@ -from rdflib.store import Store -from rdflib import URIRef, Literal, BNode -from neo4j import GraphDatabase -from neo4j import WRITE_ACCESS - -__all__ = ["N10sNeo4jStore"] - -class N10sNeo4jStore(Store): - - context_aware = False - formula_aware = True - transaction_aware = True - graph_aware = True - __TRIPLE_COUNT_QUERY = "call { match (n:Resource) return sum(size(keys(n)) - 1) + sum(size(labels(n)) - 1) as ct " \ - "union match (:Resource)-[r]->() return count(r) as ct } return sum(ct) as tripleCount" - - def __init__(self, config=None, identifier=None): - self.config = config - self.inbatch = False - self.tripleBuffer = [] - self.bufferMaxSize = 10000 - super(N10sNeo4jStore, self).__init__(config) - self.__namespace = {} - self.__prefix = {} - self.__open = False - - - def open(self, config, create=False): - self.driver = GraphDatabase.driver(config['uri'], auth=(config['auth']['user'], config['auth']['pwd'])) - self.session = self.driver.session(database=config.get('database','neo4j'), default_access_mode=WRITE_ACCESS) - result = self.session.run("call n10s.graphconfig.show() yield param, value return count(*) as params") - storeReady = next((True for x in result if x["params"] > 0), False) - print('store ready:' + str(storeReady)) - self.__open = storeReady - - #if read access only... no need to check the GraphConfig is present - - def is_open(self): - return self.__open - - def close(self, commit_pending_transaction=False): - self.driver.close() - self.__open = False - - def destroy(self, configruation): - print("destroying the store") - - def __serialise(self, triple): - (subject, predicate, object) = triple - subject = "bnode://" + subject if isinstance(subject, BNode) else subject - object = "bnode://" + object if isinstance(object, BNode) else object - if isinstance(object, Literal): - lang = object.language or None - datatype = object.datatype or None - if (lang): - serialisedTriple = "<{s}> <{p}> \"{o}\"@{l} .".format(s=subject, p=predicate, o=object, l=lang) - else: - serialisedTriple = "<{s}> <{p}> \"{o}\"{dtprefix}{dt}{dtsuffix} ."\ - .format(s=subject, p=predicate, o=object, dtprefix="^^<" * bool(datatype),dt=datatype if datatype else "",dtsuffix=">" * bool(datatype)) - - else: - serialisedTriple = "<{s}> <{p}> <{o}> .".format(s=subject, p=predicate, o=object) - - return serialisedTriple - - def add(self, triple, context, quoted=False): - assert self.__open, "The Store must be open." - assert context != self, "Can not add triple directly to store" - Store.add(self, triple, context, quoted) - if self.inbatch: - self.tripleBuffer.append(self.__serialise(triple)) - if len(self.tripleBuffer)>= self.bufferMaxSize: - self.__flushBuffer() - else: - result = self.session.run("CALL n10s.rdf.import.inline($rdf,'N-Triples')", rdf=self.__serialise(triple)).single() - if(result["terminationStatus"]) == "KO": - raise Exception("Could not persist triple in Neo4j: ", result["extraInfo"]) - - #self.refreshNamespaces() - - def remove(self, triple, context, txn=None): - assert self.__open, "The Store must be open." - Store.remove(self, triple, context) - - for result in self.triples(triple): - (spo,ctx) = result - result= self.session.run("CALL n10s.rdf.delete.inline($rdf,'N-Triples')", rdf=self.__serialise(spo)).single() - if (result["terminationStatus"]) == "KO": - raise Exception("Could not delete triple from Neo4j: ", result["extraInfo"]) - - def refreshNamespaces(self): - nsresults = self.session.run("call n10s.nsprefixes.list()") - for x in nsresults: - self.__namespace[x["prefix"]] = x["namespace"] - self.__prefix[x["namespace"]] = x["prefix"] - - - def triples(self, triple_pattern, context=None): - assert self.__open, "The Store must be open." - (subject, predicate, object) = triple_pattern - if isinstance(object, Literal): - lang = object.language or None - datatype = object.datatype or None - result = self.session.run("call n10s.rdf.export.spo($spat, $ppat, $opat, $lit, $dt, $lan) " - "yield subject, predicate, object, isLiteral, literalType", - spat = subject, ppat = predicate, opat = object, lit = True, dt = datatype, lan = lang) - else: - result = self.session.run("call n10s.rdf.export.spo($spat, $ppat, $opat) " - "yield subject, predicate, object, isLiteral, literalType", - spat=subject, ppat=predicate, opat=object) - - for record in result: - yield (URIRef(record["subject"]),URIRef(record["predicate"]), - Literal(record["object"], datatype=record["literalType"]) if record["isLiteral"] else URIRef(record["object"])), context #lang=, - - def query(self, query, initNs, initBindings, queryGraph, **kwargs): - assert self.__open, "The Store must be open." - result = self.session.run("call n10s.rdf.export.cypher($cypher, $params)" - "yield subject, predicate, object, isLiteral, literalType", - cypher=query, params=initBindings) - for record in result: - yield (URIRef(record["subject"]),URIRef(record["predicate"]), - Literal(record["object"], datatype=record["literalType"]) if record["isLiteral"] else URIRef(record["object"])) - - def add_graph(self, graph): - self.session.run("CALL n10s.rdf.import.inline($therdf,'Turtle')", - therdf=graph.serialize(format="turtle").decode("utf-8")) - - def __len__(self, context=None): - #this is fine for RDF imported data, but this should also work with LPG (look at GraphConfig) - result = self.session.run(N10sNeo4jStore.__TRIPLE_COUNT_QUERY) - return next((x["tripleCount"] for x in result), 0) - - def bind(self, prefix, namespace): - assert self.__open, "The Store must be open." - if prefix != '': - nsresults = self.session.run("call n10s.nsprefixes.add($pref,$ns)", pref = prefix, ns = namespace) - else: - nsresults = [] - - for x in nsresults: - self.__namespace[x["prefix"]] = x["namespace"] - self.__prefix[x["namespace"]] = x["prefix"] - - - def namespace(self, prefix): - return self.__namespace.get(prefix, None) - - def prefix(self, namespace): - return self.__prefix.get(namespace, None) - - def namespaces(self): - for prefix, namespace in self.__namespace.items(): - yield prefix, namespace - - def __flushBuffer(self): - assert self.__open, "The Store must be open." - print("Flushing {bufferSize} buffered Triples to DB".format(bufferSize=len(self.tripleBuffer))) - self.session.run("CALL n10s.rdf.import.inline($rdf,'N-Triples')", rdf='\n'.join(self.tripleBuffer)) - self.tripleBuffer = [] - - def startBatchedWrite(self, bufferSize = 10000): - assert self.__open, "The Store must be open." - self.inbatch = True - self.bufferMaxSize = bufferSize - print("start batch process. Triples will be buffered and flushed in batches of {bufferSize}".format(bufferSize=bufferSize)) - - def endBatch(self): - if self.inbatch: - assert self.__open, "The Store must be open." - if len(self.tripleBuffer)>0: - self.__flushBuffer() - self.inbatch = False - self.bufferMaxSize = 10000 - - diff --git a/rdflib_neo4j/neo4jcypher.py b/rdflib_neo4j/neo4jcypher.py deleted file mode 100644 index 6d45774..0000000 --- a/rdflib_neo4j/neo4jcypher.py +++ /dev/null @@ -1,169 +0,0 @@ -from rdflib.store import Store -from rdflib import Literal, RDF, URIRef -from neo4j import GraphDatabase -from neo4j import WRITE_ACCESS -import logging -from decimal import Decimal - -__all__ = ["CypherNeo4jStore"] - -class CypherNeo4jStore(Store): - - context_aware = True - formula_aware = True - transaction_aware = True - graph_aware = True - - def __init__(self, config=None, identifier=None): - self.config = config - self.inbatch = False - self.queryBuffer = {} - self.paramBuffer = {} - self.bufferMaxSize = 10000 - self.bufferActualSize = 0 - super(CypherNeo4jStore, self).__init__(config) - self.__namespace = {} - self.__prefix = {} - self.__open = False - - - def open(self, config, create=False): - self.driver = GraphDatabase.driver(config['uri'], auth=(config['auth']['user'], config['auth']['pwd'])) - - self.session = self.driver.session(database=config.get('database','neo4j'), default_access_mode=WRITE_ACCESS) - - # test connectivity to backend and check that constraint on :Resource(uri) is present - constraint_check = """ - show constraints yield * - where type = "UNIQUENESS" - and entityType = "NODE" - and labelsOrTypes = ["Resource"] - and properties = ["uri"] - return count(*) = 1 as constraint_found - """ - result = self.session.run(constraint_check) - constraint_found = next((True for x in result if x["constraint_found"]), False) - print("Uniqueness constraint on :Resource(uri) {yes_or_no}found. {suffix}" - .format(yes_or_no = "" if constraint_found else "not ", - suffix = "" if constraint_found else "Run the following command on the Neo4j DB: " - "CREATE CONSTRAINT n10s_unique_uri FOR (r:Resource) REQUIRE r.uri IS UNIQUE")) - self.__open = True - - def is_open(self): - return self.__open - - def close(self, commit_pending_transaction=False): - self.session.close() - self.driver.close() - self.__open = False - - def destroy(self, configruation): - print("destroying the store.") - - def add(self, triple, context=None, quoted=False): - assert self.__open, "The Store must be open." - assert context != self, "Can not add triple directly to store" - (subject, predicate, object) = triple - - self.bufferActualSize += 1 - - if isinstance(object, Literal): - # 'special' datatypes are converted to strings and lang tags are lost (for now) - # also multivalued props are overwritten - lang = object.language or None - - #python driver does not support decimal params - value = float(object.toPython()) if type(object.toPython()) == Decimal else object.toPython() - - prop_key = "prop_" + shorten(predicate) - if (prop_key not in self.paramBuffer.keys()): - self.paramBuffer[prop_key] = [{"uri": subject, "val": value }] - self.queryBuffer[prop_key] = "unwind $params as pair " \ - "merge (x:Resource {{ uri:pair.uri }}) " \ - "set x.`{propname}` = pair.val".format( - propname=shorten(predicate)) - else: - self.paramBuffer[prop_key].append({"uri": subject, "val": value}) - - elif (predicate == RDF.type): - - type_key = "type_" + shorten(object) - if (type_key not in self.paramBuffer.keys()): - self.paramBuffer[type_key] = [subject] - self.queryBuffer[type_key] = "unwind $params as uri " \ - "merge (r:Resource {{ uri: uri }}) set r:`{type}`".format( - type=shorten(object)) - else: - self.paramBuffer[type_key].append(subject) - - else: - rel_key = "rel_" + shorten(predicate) - if (rel_key not in self.paramBuffer.keys()): - self.paramBuffer[rel_key] = [{"uri": subject, "val": object}] - self.queryBuffer[rel_key] = "unwind $params as pair " \ - "merge (from:Resource {{ uri:pair.uri }}) " \ - "merge (to:Resource {{ uri:pair.val }}) " \ - "merge (from)-[:`{propname}`]->(to) ".format( - propname=shorten(predicate)) - else: - self.paramBuffer[rel_key].append({"uri": subject, "val": object}) - - if self.inbatch: - if self.bufferActualSize>= self.bufferMaxSize: - self.__flushBuffer() - else: - self.__flushBuffer() - - def remove(self, triple, context=None, txn=None): - return "this is a streamer no state, no triple removal" - - - def __len__(self, context=None): - # no triple state, jsut a streamer - return 0 - - def __flushBuffer(self): - assert self.__open, "The Store must be open." - - for key in self.queryBuffer.keys(): - try: - self.session.run(self.queryBuffer[key], params = self.paramBuffer[key]) - except TypeError: - print("query:",self.queryBuffer[key],"params:",self.paramBuffer[key]) - - self.bufferActualSize = 0 - - def startBatchedWrite(self, bufferSize = 10000): - assert self.__open, "The Store must be open." - self.inbatch = True - self.bufferMaxSize = bufferSize - logging.info("starting import. Batch size {bufferSize}".format(bufferSize=bufferSize)) - - def endBatchedWrite(self): - if self.inbatch: - assert self.__open, "The Store must be open." - if self.bufferActualSize >0: - self.__flushBuffer() - self.inbatch = False - logging.info("batch import done") - -def getLocalPart(uri): - pos = -1 - pos = uri.rfind('#') - if pos < 0: - pos = uri.rfind('/') - if pos < 0: - pos = uri.rindex(':') - return uri[pos + 1:] - -def getNamespacePart(uri): - pos = -1 - pos = uri.rfind('#') - if pos < 0: - pos = uri.rfind('/') - if pos < 0: - pos = uri.rindex(':') - return uri[0:pos + 1] - -def shorten(uri): - return getLocalPart(str(uri))