diff --git a/rel2graph/core/converter.py b/rel2graph/core/converter.py index 518d3b2..3dd9bd3 100644 --- a/rel2graph/core/converter.py +++ b/rel2graph/core/converter.py @@ -103,13 +103,20 @@ def commit_batch(to_create: Subgraph, to_merge: Subgraph) -> None: nodes_committed = 0 relationships_committed = 0 - # Creating does not rely on synchronous executions - if len(to_create.nodes) + len(to_create.relationships) > 0: + # Creating nodes does not rely on serialized executions + # If there are relationships to create, we need to serialize the creation + # TODO: We could split the creation of nodes and relationships into two separate branches, might be more efficient + # but considering that in almost all cases no relationships are created in the first loop, it's not worth it + if len(to_create.relationships) > 0: + with __process_config.graph_lock: + with __process_config.graph_driver.session() as session: + commit_wrap(lambda: create(to_create, session)) + relationships_committed += len(to_create.relationships) + elif len(to_create.nodes) > 0: with __process_config.graph_driver.session() as session: commit_wrap(lambda: create(to_create, session)) nodes_committed += len(to_create.nodes) - relationships_committed += len(to_create.relationships) - + # Merging nodes requires serialization (synchronous executions) between processes # Using locks to enforce this if len(to_merge.nodes) + len(to_merge.relationships) > 0: diff --git a/rel2graph/neo4j/__init__.py b/rel2graph/neo4j/__init__.py index 9c485f4..8f703ec 100644 --- a/rel2graph/neo4j/__init__.py +++ b/rel2graph/neo4j/__init__.py @@ -45,7 +45,7 @@ def pull(graph: Subgraph, session: Session): graph (Subgraph): The graph to create. session (Session): The `session `_ to use. """ - session.execute_write(graph.__db_pull__) + session.execute_read(graph.__db_pull__) def match_nodes(session: Session, *labels: List[str], **properties: dict): diff --git a/tests/integration/resources/schema_concurrency.yaml b/tests/integration/resources/schema_concurrency.yaml new file mode 100644 index 0000000..855a03e --- /dev/null +++ b/tests/integration/resources/schema_concurrency.yaml @@ -0,0 +1,8 @@ +ENTITY("Entity"): + NODE("Test"): + + uid = Entity.uid + + +ENTITY("Relationship"): + RELATIONSHIP(MATCH("Test", uid=Relationship.to), "FROM", MATCH("Test", uid=Relationship.from)): + MERGE_RELATIONSHIPS(RELATIONSHIP(MATCH("Test", uid=Relationship.from), "TO", MATCH("Test", uid=Relationship.to))): diff --git a/tests/integration/test_concurrency.py b/tests/integration/test_concurrency.py new file mode 100644 index 0000000..8e8125b --- /dev/null +++ b/tests/integration/test_concurrency.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Integration tests for testing the synchronous conversion of relations. Related to issue #20. + +authors: Julian Minder +""" + +import pytest + +import pandas as pd +import logging +import numpy as np + +from rel2graph import Converter, IteratorIterator, register_attribute_postprocessor, Attribute, register_subgraph_preprocessor, GlobalSharedState, register_subgraph_postprocessor +from rel2graph.utils import load_file +from rel2graph.relational_modules.pandas import PandasDataFrameIterator +from rel2graph.neo4j import match_relationships, push, pull +from rel2graph.common_modules import MERGE_RELATIONSHIPS + +from helpers import * + + +# As this issue is related to multiple workers, we repeat the test multiple times +@pytest.mark.parametrize('execution_number', range(10)) +def test_concurrent_relationships(execution_number, session, uri, auth): + schema = load_file("tests/integration/resources/schema_concurrency.yaml") + + entities = pd.DataFrame({"uid": range(40)}) + + # 120 relations between 20 entities + relations = pd.DataFrame({"from": list(range(20))*6, "to": [i+20 for i in range(20) for _ in range(6)]}) + unique_pairs = len(relations.drop_duplicates()) + print(unique_pairs) + + iterator = IteratorIterator([PandasDataFrameIterator(entities, "Entity"), PandasDataFrameIterator(relations, "Relationship")]) + + converter = Converter(schema, iterator, uri, auth, num_workers=12, batch_size=10) + converter(skip_relationships=False) + + + assert num_relationships(session) == 120+unique_pairs + assert num_nodes(session) == 40 \ No newline at end of file diff --git a/tests/integration/test_merge.py b/tests/integration/test_merge.py index 1ec4f10..056004a 100644 --- a/tests/integration/test_merge.py +++ b/tests/integration/test_merge.py @@ -58,7 +58,7 @@ def test_standart_same_resource(config, session, uri, auth): @pytest.mark.parametrize("config",[(1,True), (1, False) ,(5, False)]) -def test_merge_nodesasdf(config, session, uri, auth): +def test_merge_nodes(config, session, uri, auth): schema = """ ENTITY("Entity"): NODE("Entity") node: