diff --git a/rel2graph/core/converter.py b/rel2graph/core/converter.py
index 5a74190..518d3b2 100644
--- a/rel2graph/core/converter.py
+++ b/rel2graph/core/converter.py
@@ -268,7 +268,7 @@ def cleanup_process_state():
 class Converter:
     """The converter handles the whole conversion pipeline.
     """
-    def __init__(self, schema: str, iterator: ResourceIterator, neo4j_uri: str, neo4j_auth: Auth, num_workers: int = max(mp.cpu_count()-2, 1), serialize: bool = False, batch_size: int = 5000) -> None:
+    def __init__(self, schema: str, iterator: ResourceIterator, neo4j_uri: str, neo4j_auth: Auth, num_workers: int = None, serialize: bool = False, batch_size: int = 5000) -> None:
         """Initialises a converter. Note that this is a singleton and only the most recent instantiation is valid.
 
         Args:
@@ -281,8 +281,13 @@ def __init__(self, schema: str, iterator: ResourceIterator, neo4j_uri: str, neo4
                 and committed to the graph in the same order as they are returned by the iterator. Note that you can't set both serialize to true and set num_workers > 1. (default: False)
             batch_size: The batch size for the parallel processing. (default: 5000)
         """
-        if serialize and num_workers > 1:
-            raise ValueError("You can't use serialization and parallel processing (num_workers > 1) at the same time.")
+        if serialize:
+            if num_workers is not None and num_workers > 1:
+                raise ValueError("You can't use serialization and parallel processing (num_workers > 1) at the same time.")
+            else:
+                num_workers = 1
+        elif num_workers is None:
+            num_workers = max(mp.cpu_count() - 2, 1)  # keep the >=1 floor of the old default: cpu_count()-2 is <=0 on 1-2 core machines
 
         # Verify connection to neo4j
         self._neo4j_uri = neo4j_uri
diff --git a/rel2graph/core/factories/matcher.py b/rel2graph/core/factories/matcher.py
index b62533e..5b946fd 100644
--- a/rel2graph/core/factories/matcher.py
+++ b/rel2graph/core/factories/matcher.py
@@ -89,7 +89,7 @@ def match(self, resource: Resource) -> List[Node]:
         else:
             value = "r"
         clause = _match_clause("n", (tuple(parsed_labels), *keys), value)
-        clause, params = cypher_join("UNWIND $data AS r", clause, "RETURN LABELS(n) as labels, n as properties, elementId(n) as identity", data=[values])
+        clause, params = cypher_join("UNWIND $data AS r", clause, "RETURN LABELS(n) as labels, n as properties, id(n) as identity", data=[values])
 
         with Matcher.graph_driver.session() as session:
             match_list = session.run(clause, **params).data()
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index b637250..44c931a 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -55,16 +55,18 @@ def num_relationships(session):
 def get_nodes(session, labels=[]):
     if not isinstance(labels, list) and not isinstance(labels, tuple):
         labels = [labels]
-    res = session.run("MATCH (n{}) RETURN LABELS(n) as labels, n as properties, elementId(n) as identity".format(":" + ":".join(labels) if len(labels) else "")).data()
+    # TODO: id() is deprecated, in the future we need to move to something else
+    res = session.run("MATCH (n{}) RETURN LABELS(n) as labels, n as properties, id(n) as identity".format(":" + ":".join(labels) if len(labels) else "")).data()
     match_list = [Node.from_dict(r['labels'], r['properties'], identity=r["identity"]) for r in res]
     return match_list
 
 def get_relationships(session, types=[]):
     if not isinstance(types, list) and not isinstance(types, tuple):
         types = [types]
-    res = session.run("""MATCH (a)-[r{}]->(b) RETURN TYPE(r) as type, PROPERTIES(r) as properties, elementId(r) as identity,
-                        LABELS(a) as start_labels, a as start_properties, elementId(a) as start,
-                        LABELS(b) as end_labels, b as end_properties, elementId(b) as end
+    # TODO: id() is deprecated, in the future we need to move to something else
+    res = session.run("""MATCH (a)-[r{}]->(b) RETURN TYPE(r) as type, PROPERTIES(r) as properties, id(r) as identity,
+                        LABELS(a) as start_labels, a as start_properties, id(a) as start,
+                        LABELS(b) as end_labels, b as end_properties, id(b) as end
                     """.format(":" + "|".join(types) if len(types) else "")).data()
     match_list = [Relationship(Node.from_dict(r['start_labels'], r['start_properties'], identity=r["start"]),