Skip to content

Commit

Permalink
fix: support explicit transactions in migrations and fix mention count (#140)
Browse files Browse the repository at this point in the history

* fix: support explicit transactions in migrations

* fix: entity count
  • Loading branch information
ClemDoum authored Jan 3, 2024
1 parent d8211dc commit 6030312
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 18 deletions.
4 changes: 2 additions & 2 deletions neo4j-app/neo4j_app/core/neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
migration_v_0_3_0_tx,
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_6_0,
migration_v_0_7_0_tx,
)

Expand Down Expand Up @@ -46,7 +46,7 @@
V_0_6_0 = Migration(
version="0.6.0",
label="Add mention counts to named entity document relationships",
migration_fn=migration_v_0_6_0_tx,
migration_fn=migration_v_0_6_0,
)
V_0_7_0 = Migration(
version="0.7.0",
Expand Down
16 changes: 13 additions & 3 deletions neo4j-app/neo4j_app/core/neo4j/migrations/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from datetime import datetime
from distutils.version import StrictVersion
from enum import Enum, unique
from typing import Any, Callable, List, Optional, Sequence
from inspect import signature
from typing import Any, Callable, List, Optional, Sequence, Union

import neo4j
from neo4j.exceptions import ConstraintError
Expand All @@ -32,7 +33,9 @@

logger = logging.getLogger(__name__)

MigrationFn = Callable[[neo4j.AsyncTransaction], Coroutine]
# Migration executed through `session.execute_write`: it is handed the managed
# transaction and must not commit or roll back by itself.
TransactionFn = Callable[[neo4j.AsyncTransaction], Coroutine]
# Migration awaited directly with the project session, so that it can run its
# queries outside of a managed transaction. The session is the async one, hence
# `neo4j.AsyncSession` rather than the blocking `neo4j.Session`.
ExplicitTransactionFn = Callable[[neo4j.AsyncSession], Coroutine]
# A migration is either of the two flavours above.
MigrationFn = Union[TransactionFn, ExplicitTransactionFn]

_MIGRATION_TIMEOUT_MSG = """Migration timeout expired !
Please check that a migration is indeed in progress. If the application is in a \
Expand Down Expand Up @@ -118,7 +121,14 @@ async def _migrate_with_lock(
)
# Then run the migration
logger.debug("Acquired write lock for %s !", migration.label)
await project_session.execute_write(migration.migration_fn)
sig = signature(migration.migration_fn)
first_param = list(sig.parameters)[0]
if first_param == "tx":
await project_session.execute_write(migration.migration_fn)
elif first_param == "sess":
await migration.migration_fn(project_session)
else:
raise ValueError(f"Invalid migration function: {migration.migration_fn}")
# Finally free the lock
await registry_session.execute_write(
complete_migration_tx,
Expand Down
16 changes: 12 additions & 4 deletions neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
MIGRATION_VERSION,
NE_APPEARS_IN_DOC,
NE_ID,
NE_IDS,
NE_MENTION_COUNT,
NE_MENTION_NORM,
NE_NODE,
NE_OFFSETS,
PROJECT_NAME,
PROJECT_NODE,
TASK_CREATED_AT,
Expand Down Expand Up @@ -55,8 +55,13 @@ async def migration_v_0_5_0_tx(tx: neo4j.AsyncTransaction):
await _create_email_user_and_domain_indexes(tx)


async def migration_v_0_6_0_tx(tx: neo4j.AsyncTransaction):
await _add_mention_count_to_named_entity_relationship(tx)
async def migration_v_0_6_0(sess: neo4j.AsyncSession):
    """Set the mention count on named entity -> document relationships.

    The count of each relationship is the size of its mention ids list. The
    update is batched with ``CALL {...} IN TRANSACTIONS``, which is why the
    query is run directly on the session instead of inside a managed
    transaction.

    :param sess: session on which the batched update query is run
    """
    query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE})
CALL {{
WITH rel
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_IDS})
}} IN TRANSACTIONS OF 10000 ROWS"""
    res = await sess.run(query)
    # The driver streams results lazily: consume the result so that all batches
    # have been executed, and any server side error has been raised, before the
    # migration is reported as done.
    await res.consume()


async def migration_v_0_7_0_tx(tx: neo4j.AsyncTransaction):
Expand Down Expand Up @@ -168,7 +173,10 @@ async def _create_email_user_and_domain_indexes(tx: neo4j.AsyncTransaction):

async def _add_mention_count_to_named_entity_relationship(tx: neo4j.AsyncTransaction):
query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE})
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_OFFSETS})"""
CALL {{
WITH rel
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_IDS})
}} IN TRANSACTIONS OF 10000 ROWS"""
await tx.run(query)


Expand Down
2 changes: 1 addition & 1 deletion neo4j-app/neo4j_app/core/neo4j/named_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def import_named_entity_rows(
rel.{NE_EXTRACTORS} = apoc.coll.toSet(\
rel.{NE_EXTRACTORS} + row.{NE_EXTRACTOR}),
rel.{NE_OFFSETS} = apoc.coll.toSet(rel.{NE_OFFSETS} + row.{NE_OFFSETS})
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_OFFSETS})
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_IDS})
WITH mention, doc, row
CALL apoc.do.case(
[
Expand Down
16 changes: 14 additions & 2 deletions neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,28 @@ async def _create_indexes_tx(tx: neo4j.AsyncTransaction):
await tx.run(index_query_1)


async def _create_indexes(sess: neo4j.AsyncSession):
    """Create ``index0`` then ``index1`` by running each query on the session."""
    index_queries = (
        "CREATE INDEX index0 IF NOT EXISTS FOR (n:Node) ON (n.attribute0)",
        "CREATE INDEX index1 IF NOT EXISTS FOR (n:Node) ON (n.attribute1)",
    )
    # Run sequentially, in the same order as listed above
    for index_query in index_queries:
        await sess.run(index_query)


async def _drop_constraint_tx(tx: neo4j.AsyncTransaction):
    """Drop ``index0``, if it exists, inside the provided transaction."""
    await tx.run("DROP INDEX index0 IF EXISTS")


# noinspection PyTypeChecker
_MIGRATION_0 = Migration(
version="0.2.0",
label="create index and constraint",
migration_fn=_create_indexes_tx,
)
# noinspection PyTypeChecker
_MIGRATION_0_EXPLICIT = Migration(
version="0.2.0",
label="create index and constraint",
migration_fn=_create_indexes,
)
_MIGRATION_1 = Migration(
version="0.3.0",
label="drop constraint",
Expand All @@ -75,6 +85,8 @@ async def _drop_constraint_tx(tx: neo4j.AsyncTransaction):
([], set(), set()),
# Single
([_MIGRATION_0], {"index0", "index1"}, set()),
# Single as explicit_transaction
([_MIGRATION_0_EXPLICIT], {"index0", "index1"}, set()),
# Multiple ordered
([_MIGRATION_0, _MIGRATION_1], {"index1"}, {"index0"}),
# Multiple unordered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
migration_v_0_3_0_tx,
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_6_0,
migration_v_0_7_0_tx,
)

Expand Down Expand Up @@ -105,11 +105,11 @@ async def test_migration_v_0_5_0_tx(neo4j_test_session: neo4j.AsyncSession):

async def test_migration_v_0_6_0_tx(neo4j_test_session: neo4j.AsyncSession):
# Given
create_path = """CREATE (:NamedEntity)-[:APPEARS_IN {offsets: [0, 1]}
create_path = """CREATE (:NamedEntity)-[:APPEARS_IN {mentionIds: ['id-0', 'id-1']}
]->(:Document)"""
await neo4j_test_session.run(create_path)
# When
await neo4j_test_session.execute_write(migration_v_0_6_0_tx)
await migration_v_0_6_0(neo4j_test_session)
# Then
match_path = "MATCH (:NamedEntity)-[rel:APPEARS_IN]->(:Document) RETURN rel"
res = await neo4j_test_session.run(match_path)
Expand Down
10 changes: 8 additions & 2 deletions neo4j-app/neo4j_app/tests/core/neo4j/test_name_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ async def test_import_named_entities_should_update_named_entity(
transaction_batch_size = 3
ents = list(make_named_entities(n=num_ents))
query = """
CREATE (n:NamedEntity {id: 'named-entity-0', offsets: [1, 2], documentId: 'doc-0'})
CREATE (n:NamedEntity {
id: 'named-entity-0', mentionIds: ['id-0', 'id-1'], documentId: 'doc-0'
})
"""
await neo4j_test_session.run(query)

Expand All @@ -104,7 +106,11 @@ async def test_import_named_entities_should_update_named_entity(
res = await neo4j_test_session.run(query)
ent = await res.single()
ent = dict(ent["ent"])
expected_ent = {"id": "named-entity-0", "offsets": [1, 2], "documentId": "doc-0"}
expected_ent = {
"id": "named-entity-0",
"mentionIds": ["id-0", "id-1"],
"documentId": "doc-0",
}
assert ent == expected_ent


Expand Down
2 changes: 1 addition & 1 deletion neo4j-app/neo4j_app/tests/core/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ async def test_should_aggregate_named_entities_attributes_on_relationship(
"offsets": [0, 1, 2],
"mentionExtractors": ["core-nlp", "spacy"],
"mentionIds": ["named-entity-1", "named-entity-2"],
"mentionCount": 3,
"mentionCount": 2,
"extractorLanguage": "en",
},
]
Expand Down

0 comments on commit 6030312

Please sign in to comment.