feature: add document nodes created at and modified at attributes #135

Merged: 3 commits, Jan 3, 2024
8 changes: 4 additions & 4 deletions neo4j-app/neo4j_app/app/tasks.py
@@ -31,14 +31,14 @@ def tasks_router() -> APIRouter:

@router.post("/tasks", response_model=Task)
async def _create_task(project: str, job: TaskJob) -> Response:
- task_task_manager = lifespan_task_manager()
+ task_manager = lifespan_task_manager()
event_publisher = lifespan_event_publisher()
task_id = job.task_id
if task_id is None:
task_id = job.generate_task_id()
task = job.to_task(task_id=task_id)
try:
- await task_task_manager.enqueue(task, project)
+ await task_manager.enqueue(task, project)
except TaskAlreadyExists:
return Response(task.id, status_code=200)
except TaskQueueIsFull as e:
@@ -55,9 +55,9 @@ async def _create_task(project: str, job: TaskJob) -> Response:

@router.post("/tasks/{task_id}/cancel", response_model=Task)
async def _cancel_task(project: str, task_id: str) -> Task:
- task_task_manager = lifespan_task_manager()
+ task_manager = lifespan_task_manager()
try:
- cancelled = await task_task_manager.cancel(task_id=task_id, project=project)
+ cancelled = await task_manager.cancel(task_id=task_id, project=project)
except UnknownTask as e:
raise HTTPException(status_code=404, detail=e.args[0]) from e
return cancelled
21 changes: 21 additions & 0 deletions neo4j-app/neo4j_app/constants.py
@@ -6,10 +6,13 @@
DOC_NODE = "Document"
DOC_CONTENT_LENGTH = "contentLength"
DOC_CONTENT_TYPE = "contentType"
DOC_CREATED_AT = "createdAt"
DOC_DIRNAME = "dirname"
DOC_ID = "id"
DOC_ID_CSV = f"ID({DOC_NODE})"
DOC_EXTRACTION_DATE = "extractionDate"
DOC_METADATA = "metadata"
DOC_MODIFIED_AT = "modifiedAt"
DOC_PATH = "path"
DOC_URL_SUFFIX = "urlSuffix"
DOC_ROOT_ID = "rootDocument"
@@ -20,12 +23,30 @@
DOC_CONTENT_TYPE: {},
DOC_CONTENT_LENGTH: {NEO4J_CSV_COL: "LONG"},
DOC_EXTRACTION_DATE: {NEO4J_CSV_COL: "DATETIME"},
DOC_METADATA: {},
DOC_PATH: {},
DOC_URL_SUFFIX: {},
}

DOC_ES_SOURCES = list(DOC_COLUMNS) + ["join", DOC_ROOT_ID]

# Order matters here: we take the dcterms created date first, to stay consistent
# with datashare-api, which sets creationDate from tika_metadata_dcterms_created.
# We fall back to the other metadata keys when it is missing.
DOC_CREATED_AT_META = [
"tika_metadata_dcterms_created_iso8601",
"tika_metadata_creation_date_iso8601",
"tika_metadata_date_iso8601",
]
DOC_MODIFIED_AT_META = [
"tika_metadata_dcterms_modified_iso8601",
"tika_metadata_last_modified_iso8601",
"tika_metadata_modified_iso8601",
"tika_metadata_last_save_date_iso8601",
"tika_metadata_pdf_docinfo_modified_iso8601",
"tika_metadata_date_iso8601",
]

PROJECT_RUNS_MIGRATION = "_RUNS"
PROJECT_NAME = "name"
PROJECT_NODE = "_Project"
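
These lists are consumed first-non-null-wins, so a more specific Tika key always outranks the generic date key. A minimal sketch of the resolution they drive (the metadata dict below is hypothetical):

from neo4j_app.constants import DOC_CREATED_AT_META

metadata = {
    "tika_metadata_date_iso8601": "2021-01-01T00:00:00Z",
    "tika_metadata_creation_date_iso8601": "2022-04-08T11:41:34Z",
}
# dcterms_created is absent and creation_date outranks the plain date key
created_at = next(
    (metadata[k] for k in DOC_CREATED_AT_META if metadata.get(k) is not None), None
)
assert created_at == "2022-04-08T11:41:34Z"
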
21 changes: 19 additions & 2 deletions neo4j-app/neo4j_app/core/elasticsearch/to_neo4j.py
@@ -1,9 +1,14 @@
import hashlib
- from typing import Dict, List, Optional, TextIO
+ from typing import Any, Dict, List, Optional, TextIO

from neo4j_app.constants import (
DOC_COLUMNS,
DOC_CREATED_AT,
DOC_CREATED_AT_META,
DOC_ID,
DOC_METADATA,
DOC_MODIFIED_AT,
DOC_MODIFIED_AT_META,
DOC_NODE,
DOC_ROOT_ID,
DOC_URL_SUFFIX,
@@ -40,7 +45,7 @@
_DS_DOC_URL = "ds/"


- def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, str]]:
+ def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, Any]]:
doc_id = document_hit["_id"]
doc = {DOC_ID: doc_id}
hit_source = document_hit[SOURCE]
@@ -55,11 +60,23 @@ def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, str]]:
return [doc]


def _coalesce(item: Dict[str, Any], columns: List[str]) -> Optional[Any]:
for c in columns:
value = item.get(c)
if value is not None:
return value
return None


def es_to_neo4j_doc_csv(
document_hit: Dict, *, prop_to_col_header: Dict[str, str]
) -> List[Dict[str, str]]:
doc = es_to_neo4j_doc_row(document_hit)[0]
doc.pop(DOC_ROOT_ID, None)
metadata = doc.pop(DOC_METADATA, None)
if metadata is not None:
doc[DOC_CREATED_AT] = _coalesce(metadata, DOC_CREATED_AT_META)
doc[DOC_MODIFIED_AT] = _coalesce(metadata, DOC_MODIFIED_AT_META)
doc = {prop_to_col_header[prop]: value for prop, value in doc.items()}
doc[NEO4J_CSV_LABEL] = DOC_NODE
return [doc]
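
A self-contained sketch of the flattening step above (the doc dict is hypothetical): the metadata map is popped off the row and reduced to two scalar date values via _coalesce.

from neo4j_app.constants import DOC_CREATED_AT_META, DOC_MODIFIED_AT_META

doc = {
    "id": "doc-0",
    "metadata": {
        "tika_metadata_creation_date_iso8601": "2022-04-08T11:41:34Z",
        "tika_metadata_modified_iso8601": "2022-04-08T12:00:00Z",
    },
}
metadata = doc.pop("metadata")
# dcterms_created is absent, so the creation_date fallback is picked
doc["createdAt"] = _coalesce(metadata, DOC_CREATED_AT_META)
# the modified key is only the third candidate in its priority list
doc["modifiedAt"] = _coalesce(metadata, DOC_MODIFIED_AT_META)
assert doc["createdAt"] == "2022-04-08T11:41:34Z"
assert doc["modifiedAt"] == "2022-04-08T12:00:00Z"
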
17 changes: 16 additions & 1 deletion neo4j-app/neo4j_app/core/imports.py
@@ -31,9 +31,12 @@
from neo4j_app import ROOT_DIR
from neo4j_app.constants import (
DOC_COLUMNS,
DOC_CREATED_AT,
DOC_ES_SOURCES,
DOC_ID,
DOC_ID_CSV,
DOC_METADATA,
DOC_MODIFIED_AT,
DOC_NODE,
DOC_ROOT_TYPE,
EMAIL_HEADER,
@@ -375,6 +378,18 @@ async def to_neo4j_csvs(
_DOC_ROOT_REL_HEADER = [f"{NEO4J_CSV_START_ID}({DOC_NODE})", _DOC_REL_END_CSV_COL]


def _doc_nodes_header_and_mapping() -> Tuple[List[str], Dict[str, str]]:
doc_nodes_header, doc_nodes_mapping = _make_header_and_mapping(DOC_COLUMNS)
doc_nodes_header = [h for h in doc_nodes_header if h != DOC_METADATA]
doc_nodes_mapping.pop(DOC_METADATA)
doc_created_at_h = f"{DOC_CREATED_AT}:DATETIME"
doc_modified_at_h = f"{DOC_MODIFIED_AT}:DATETIME"
doc_nodes_header.extend([doc_created_at_h, doc_modified_at_h])
doc_nodes_mapping[DOC_CREATED_AT] = doc_created_at_h
doc_nodes_mapping[DOC_MODIFIED_AT] = doc_modified_at_h
return doc_nodes_header, doc_nodes_mapping


async def _to_neo4j_doc_csvs(
*,
export_dir: Path,
@@ -388,7 +403,7 @@ async def _to_neo4j_doc_csvs(
) -> Tuple[NodeCSVs, RelationshipCSVs]:
doc_nodes_path = export_dir.joinpath("docs.csv")
doc_nodes_header_path = export_dir.joinpath("docs-header.csv")
- doc_nodes_header, doc_nodes_mapping = _make_header_and_mapping(DOC_COLUMNS)
+ doc_nodes_header, doc_nodes_mapping = _doc_nodes_header_and_mapping()
doc_nodes_header.append(NEO4J_CSV_LABEL)
with doc_nodes_header_path.open("w") as f:
get_neo4j_csv_writer(f, doc_nodes_header).writeheader()
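
Assuming _make_header_and_mapping returns one typed CSV header per document column plus a property-to-header mapping, _doc_nodes_header_and_mapping above should satisfy checks along these lines (a sketch, not a test from this PR):

header, mapping = _doc_nodes_header_and_mapping()
# the raw metadata map is dropped from the export, replaced by two typed date columns
assert DOC_METADATA not in mapping
assert mapping[DOC_CREATED_AT] == f"{DOC_CREATED_AT}:DATETIME"
assert mapping[DOC_MODIFIED_AT] == f"{DOC_MODIFIED_AT}:DATETIME"
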
8 changes: 7 additions & 1 deletion neo4j-app/neo4j_app/core/neo4j/__init__.py
@@ -15,6 +15,7 @@
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_7_0_tx,
)

V_0_1_0 = Migration(
@@ -47,7 +48,12 @@
label="Add mention counts to named entity document relationships",
migration_fn=migration_v_0_6_0_tx,
)
- MIGRATIONS = [V_0_1_0, V_0_2_0, V_0_3_0, V_0_4_0, V_0_5_0, V_0_6_0]
V_0_7_0 = Migration(
version="0.7.0",
label="Create document modified and created at indexes",
migration_fn=migration_v_0_7_0_tx,
)
+ MIGRATIONS = [V_0_1_0, V_0_2_0, V_0_3_0, V_0_4_0, V_0_5_0, V_0_6_0, V_0_7_0]


def get_neo4j_csv_reader(
19 changes: 18 additions & 1 deletion neo4j-app/neo4j_app/core/neo4j/documents.py
@@ -6,9 +6,13 @@
from neo4j_app.constants import (
DOC_CONTENT_LENGTH,
DOC_CONTENT_TYPE,
DOC_CREATED_AT,
DOC_CREATED_AT_META,
DOC_DIRNAME,
DOC_EXTRACTION_DATE,
DOC_ID,
DOC_MODIFIED_AT,
DOC_MODIFIED_AT_META,
DOC_NODE,
DOC_PATH,
DOC_ROOT_ID,
@@ -20,6 +24,15 @@
logger = logging.getLogger(__name__)


_DOC_CREATED_AT_META = ["metadata." + c for c in DOC_CREATED_AT_META]
_DOC_MODIFIED_AT_META = ["metadata." + c for c in DOC_MODIFIED_AT_META]


def _coalesce(*, variable: str, attributes: List[str]) -> str:
values = ", ".join(f"{variable}.{a}" for a in attributes)
return f"coalesce({values})"


async def import_document_rows(
neo4j_session: neo4j.AsyncSession,
records: List[Dict],
@@ -37,7 +50,11 @@ async def import_document_rows(
doc.{DOC_EXTRACTION_DATE} = datetime(row.{DOC_EXTRACTION_DATE}),
doc.{DOC_DIRNAME} = row.{DOC_DIRNAME},
doc.{DOC_PATH} = row.{DOC_PATH},
- doc.{DOC_URL_SUFFIX} = row.{DOC_URL_SUFFIX}
+ doc.{DOC_URL_SUFFIX} = row.{DOC_URL_SUFFIX},
doc.{DOC_CREATED_AT} = datetime({
_coalesce(variable="row", attributes=_DOC_CREATED_AT_META)}),
doc.{DOC_MODIFIED_AT} = datetime({
_coalesce(variable="row", attributes=_DOC_MODIFIED_AT_META)})
WITH doc, row
WHERE doc.{DOC_ID} = row.{DOC_ID} and row.{DOC_ROOT_ID} IS NOT NULL
MERGE (root:{DOC_NODE} {{{DOC_ID}: row.{DOC_ROOT_ID}}})
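
For reference, _coalesce(variable="row", attributes=_DOC_CREATED_AT_META) renders a single-line Cypher fragment along these lines (wrapped here for readability):

coalesce(row.metadata.tika_metadata_dcterms_created_iso8601,
         row.metadata.tika_metadata_creation_date_iso8601,
         row.metadata.tika_metadata_date_iso8601)

Both coalesce() and datetime() propagate nulls in Cypher, so a document carrying none of the expected metadata keys imports with null createdAt and modifiedAt rather than failing the query.
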
17 changes: 17 additions & 0 deletions neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py
@@ -2,7 +2,9 @@

from neo4j_app.constants import (
DOC_CONTENT_TYPE,
DOC_CREATED_AT,
DOC_ID,
DOC_MODIFIED_AT,
DOC_NODE,
DOC_PATH,
EMAIL_DOMAIN,
@@ -57,6 +59,10 @@ async def migration_v_0_6_0_tx(tx: neo4j.AsyncTransaction):
await _add_mention_count_to_named_entity_relationship(tx)


async def migration_v_0_7_0_tx(tx: neo4j.AsyncTransaction):
await _create_document_created_and_modified_at_indexes(tx)


async def _create_document_and_ne_id_unique_constraint_tx(tx: neo4j.AsyncTransaction):
doc_query = f"""CREATE CONSTRAINT constraint_document_unique_id
IF NOT EXISTS
@@ -164,3 +170,14 @@ async def _add_mention_count_to_named_entity_relationship(tx: neo4j.AsyncTransaction):
query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE})
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_OFFSETS})"""
await tx.run(query)


async def _create_document_created_and_modified_at_indexes(tx: neo4j.AsyncTransaction):
created_at_index = f"""CREATE INDEX index_document_created_at IF NOT EXISTS
FOR (doc:{DOC_NODE})
ON (doc.{DOC_CREATED_AT})"""
await tx.run(created_at_index)
modified_at_index = f"""CREATE INDEX index_document_modified_at IF NOT EXISTS
FOR (doc:{DOC_NODE})
ON (doc.{DOC_MODIFIED_AT})"""
await tx.run(modified_at_index)
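
A hypothetical query of the kind these range indexes are meant to serve, written in the same f-string style as the migrations above (a sketch, not part of this PR):

recent_docs_query = f"""MATCH (doc:{DOC_NODE})
WHERE doc.{DOC_CREATED_AT} >= datetime("2023-01-01T00:00:00Z")
RETURN doc
ORDER BY doc.{DOC_CREATED_AT} DESC"""
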
2 changes: 1 addition & 1 deletion neo4j-app/neo4j_app/tests/app/test_tasks.py
@@ -176,7 +176,7 @@ def test_create_task_should_return_429_when_too_many_tasks(

# Then
assert res_0.status_code == 201, res_0.json()
- # This one is queued or rejected depending if the first one is processed or still
+ # This one is queued or rejected depending on whether the first one is processed or still
# in the queue
assert res_1.status_code in [201, 429], res_1.json()
assert res_2.status_code == 429, res_2.json()
20 changes: 14 additions & 6 deletions neo4j-app/neo4j_app/tests/conftest.py
@@ -351,10 +351,10 @@ async def neo4j_test_session(
return session


- def make_docs(n: int) -> Generator[Dict, None, None]:
+ def make_docs(n: int, add_dates: bool = False) -> Generator[Dict, None, None]:
random.seed(a=777)
for i in random.sample(list(range(n)), k=n):
- yield {
+ doc = {
"_index": TEST_PROJECT,
"_id": f"doc-{i}",
"_source": {
@@ -368,6 +368,12 @@ def make_docs(n: int) -> Generator[Dict, None, None]:
"join": {"name": "Document"},
},
}
if add_dates:
doc["_source"]["metadata"] = {
"tika_metadata_dcterms_created_iso8601": "2022-04-08T11:41:34Z",
"tika_metadata_modified_iso8601": "2022-04-08T11:41:34Z",
}
yield doc


def make_named_entities(n: int) -> Generator[Dict, None, None]:
@@ -393,8 +399,10 @@ def make_named_entities(n: int) -> Generator[Dict, None, None]:
}


- def index_docs_ops(*, index_name: str, n: int) -> Generator[Dict, None, None]:
- for doc in make_docs(n):
+ def index_docs_ops(
+ *, index_name: str, n: int, add_dates: bool = False
+ ) -> Generator[Dict, None, None]:
+ for doc in make_docs(n, add_dates):
op = {
"_op_type": "index",
"_index": index_name,
@@ -426,9 +434,9 @@ def index_named_entities_ops(*, index_name: str, n: int) -> Generator[Dict, None, None]:


async def index_docs(
- client: ESClient, *, n: int, index_name: str = TEST_PROJECT
+ client: ESClient, *, n: int, index_name: str = TEST_PROJECT, add_dates: bool = False
) -> AsyncGenerator[Dict, None]:
- ops = index_docs_ops(index_name=index_name, n=n)
+ ops = index_docs_ops(index_name=index_name, n=n, add_dates=add_dates)
# Let's wait to make this operation visible to the search
refresh = "wait_for"
async for res in async_streaming_bulk(client, actions=ops, refresh=refresh):
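
Note that the fixture dates exercise both ends of the fallback chains: tika_metadata_dcterms_created_iso8601 is the first createdAt candidate, while tika_metadata_modified_iso8601 is only the third modifiedAt candidate, so imports built on these docs cover both a priority hit and the coalesce fallback.
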
@@ -376,7 +376,7 @@ async def test_init_project_should_raise_for_reserved_name(
)


@pytest.mark.regression("131")
@pytest.mark.pull("131")
async def test_migrate_project_db_schema_should_read_migrations_from_registry(
neo4j_test_driver_session: neo4j.AsyncDriver,
monkeypatch,
18 changes: 18 additions & 0 deletions neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrations.py
@@ -7,6 +7,7 @@
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_7_0_tx,
)


@@ -116,3 +117,20 @@ async def test_migration_v_0_6_0_tx(neo4j_test_session: neo4j.AsyncSession):
rel = res["rel"]
mention_counts = rel.get("mentionCount")
assert mention_counts == 2


async def test_migration_v_0_7_0_tx(neo4j_test_session: neo4j.AsyncSession):
# When
await neo4j_test_session.execute_write(migration_v_0_7_0_tx)

# Then
indexes_res = await neo4j_test_session.run("SHOW INDEXES")
existing_indexes = set()
async for rec in indexes_res:
existing_indexes.add(rec["name"])
expected_indexes = [
"index_document_created_at",
"index_document_modified_at",
]
for index in expected_indexes:
    assert index in existing_indexes