feature: add document create and modified at datetimes
ClemDoum committed Dec 20, 2023
1 parent 1ae94ac commit c4608f9
Showing 10 changed files with 267 additions and 17 deletions.
21 changes: 21 additions & 0 deletions neo4j-app/neo4j_app/constants.py
@@ -6,10 +6,13 @@
DOC_NODE = "Document"
DOC_CONTENT_LENGTH = "contentLength"
DOC_CONTENT_TYPE = "contentType"
DOC_CREATED_AT = "createdAt"
DOC_DIRNAME = "dirname"
DOC_ID = "id"
DOC_ID_CSV = f"ID({DOC_NODE})"
DOC_EXTRACTION_DATE = "extractionDate"
DOC_METADATA = "metadata"
DOC_MODIFIED_AT = "modifiedAt"
DOC_PATH = "path"
DOC_URL_SUFFIX = "urlSuffix"
DOC_ROOT_ID = "rootDocument"
@@ -20,12 +23,30 @@
    DOC_CONTENT_TYPE: {},
    DOC_CONTENT_LENGTH: {NEO4J_CSV_COL: "LONG"},
    DOC_EXTRACTION_DATE: {NEO4J_CSV_COL: "DATETIME"},
    DOC_METADATA: {},
    DOC_PATH: {},
    DOC_URL_SUFFIX: {},
}

DOC_ES_SOURCES = list(DOC_COLUMNS) + ["join", DOC_ROOT_ID]

# Order matters here: we take the dcterms created date first, to be consistent
# with datashare-api, which sets the creationDate from tika_metadata_dcterms_created;
# we fall back to the other metadata fields when it is missing.
DOC_CREATED_AT_META = [
    "tika_metadata_dcterms_created_iso8601",
    "tika_metadata_creation_date_iso8601",
    "tika_metadata_date_iso8601",
]
DOC_MODIFIED_AT_META = [
    "tika_metadata_dcterms_modified_iso8601",
    "tika_metadata_last_modified_iso8601",
    "tika_metadata_modified_iso8601",
    "tika_metadata_last_save_date_iso8601",
    "tika_metadata_pdf_docinfo_modified_iso8601",
    "tika_metadata_date_iso8601",
]

PROJECT_RUNS_MIGRATION = "_RUNS"
PROJECT_NAME = "name"
PROJECT_NODE = "_Project"
21 changes: 19 additions & 2 deletions neo4j-app/neo4j_app/core/elasticsearch/to_neo4j.py
@@ -1,9 +1,14 @@
import hashlib
from typing import Dict, List, Optional, TextIO
from typing import Any, Dict, List, Optional, TextIO

from neo4j_app.constants import (
DOC_COLUMNS,
DOC_CREATED_AT,
DOC_CREATED_AT_META,
DOC_ID,
DOC_METADATA,
DOC_MODIFIED_AT,
DOC_MODIFIED_AT_META,
DOC_NODE,
DOC_ROOT_ID,
DOC_URL_SUFFIX,
@@ -40,7 +45,7 @@
_DS_DOC_URL = "ds/"


def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, str]]:
def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, Any]]:
doc_id = document_hit["_id"]
doc = {DOC_ID: doc_id}
hit_source = document_hit[SOURCE]
@@ -55,11 +60,23 @@ def es_to_neo4j_doc_row(document_hit: Dict) -> List[Dict[str, str]]:
return [doc]


def _coalesce(item: Dict[str, Any], columns: List[str]) -> Optional[Any]:
    for c in columns:
        value = item.get(c)
        if value is not None:
            return value
    return None


def es_to_neo4j_doc_csv(
    document_hit: Dict, *, prop_to_col_header: Dict[str, str]
) -> List[Dict[str, str]]:
    doc = es_to_neo4j_doc_row(document_hit)[0]
    doc.pop(DOC_ROOT_ID, None)
    metadata = doc.pop(DOC_METADATA, None)
    if metadata is not None:
        doc[DOC_CREATED_AT] = _coalesce(metadata, DOC_CREATED_AT_META)
        doc[DOC_MODIFIED_AT] = _coalesce(metadata, DOC_MODIFIED_AT_META)
    doc = {prop_to_col_header[prop]: value for prop, value in doc.items()}
    doc[NEO4J_CSV_LABEL] = DOC_NODE
    return [doc]
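
A quick usage sketch of the `_coalesce` helper added above. It is a module-private helper, imported here only for illustration, and the sample metadata values are made up:

from neo4j_app.constants import DOC_CREATED_AT_META
from neo4j_app.core.elasticsearch.to_neo4j import _coalesce

# dcterms created is missing here, so the second key in the priority list wins
tika_metadata = {
    "tika_metadata_creation_date_iso8601": "2022-04-08T11:41:34Z",
    "tika_metadata_date_iso8601": "2021-12-31T23:59:59Z",
}
assert _coalesce(tika_metadata, DOC_CREATED_AT_META) == "2022-04-08T11:41:34Z"
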
17 changes: 16 additions & 1 deletion neo4j-app/neo4j_app/core/imports.py
@@ -31,9 +31,12 @@
from neo4j_app import ROOT_DIR
from neo4j_app.constants import (
DOC_COLUMNS,
DOC_CREATED_AT,
DOC_ES_SOURCES,
DOC_ID,
DOC_ID_CSV,
DOC_METADATA,
DOC_MODIFIED_AT,
DOC_NODE,
DOC_ROOT_TYPE,
EMAIL_HEADER,
@@ -375,6 +378,18 @@ async def to_neo4j_csvs(
_DOC_ROOT_REL_HEADER = [f"{NEO4J_CSV_START_ID}({DOC_NODE})", _DOC_REL_END_CSV_COL]


def _doc_nodes_header_and_mapping() -> Tuple[List[str], Dict[str, str]]:
    doc_nodes_header, doc_nodes_mapping = _make_header_and_mapping(DOC_COLUMNS)
    doc_nodes_header = [h for h in doc_nodes_header if h != DOC_METADATA]
    doc_nodes_mapping.pop(DOC_METADATA)
    doc_created_at_h = f"{DOC_CREATED_AT}:DATETIME"
    doc_modified_at_h = f"{DOC_MODIFIED_AT}:DATETIME"
    doc_nodes_header.extend([doc_created_at_h, doc_modified_at_h])
    doc_nodes_mapping[DOC_CREATED_AT] = doc_created_at_h
    doc_nodes_mapping[DOC_MODIFIED_AT] = doc_modified_at_h
    return doc_nodes_header, doc_nodes_mapping


async def _to_neo4j_doc_csvs(
*,
export_dir: Path,
@@ -388,7 +403,7 @@ async def _to_neo4j_doc_csvs(
) -> Tuple[NodeCSVs, RelationshipCSVs]:
doc_nodes_path = export_dir.joinpath("docs.csv")
doc_nodes_header_path = export_dir.joinpath("docs-header.csv")
doc_nodes_header, doc_nodes_mapping = _make_header_and_mapping(DOC_COLUMNS)
doc_nodes_header, doc_nodes_mapping = _doc_nodes_header_and_mapping()
doc_nodes_header.append(NEO4J_CSV_LABEL)
with doc_nodes_header_path.open("w") as f:
get_neo4j_csv_writer(f, doc_nodes_header).writeheader()
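
As a sanity check, the new header/mapping helper should drop the raw metadata column and expose both datetimes as typed neo4j-admin headers. A minimal sketch, assuming the module-private helper is importable from neo4j_app.core.imports as added in this diff:

from neo4j_app.constants import DOC_CREATED_AT, DOC_METADATA, DOC_MODIFIED_AT
from neo4j_app.core.imports import _doc_nodes_header_and_mapping

doc_nodes_header, doc_nodes_mapping = _doc_nodes_header_and_mapping()
# The raw metadata map is no longer exported as a CSV column...
assert DOC_METADATA not in doc_nodes_mapping
# ...and each datetime property maps to its own typed column header
assert doc_nodes_mapping[DOC_CREATED_AT] == f"{DOC_CREATED_AT}:DATETIME"
assert doc_nodes_mapping[DOC_MODIFIED_AT] == f"{DOC_MODIFIED_AT}:DATETIME"
assert f"{DOC_CREATED_AT}:DATETIME" in doc_nodes_header
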
8 changes: 7 additions & 1 deletion neo4j-app/neo4j_app/core/neo4j/__init__.py
@@ -15,6 +15,7 @@
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_7_0_tx,
)

V_0_1_0 = Migration(
@@ -47,7 +48,12 @@
label="Add mention counts to named entity document relationships",
migration_fn=migration_v_0_6_0_tx,
)
MIGRATIONS = [V_0_1_0, V_0_2_0, V_0_3_0, V_0_4_0, V_0_5_0, V_0_6_0]
V_0_7_0 = Migration(
    version="0.7.0",
    label="Create document modified and created at indexes",
    migration_fn=migration_v_0_7_0_tx,
)
MIGRATIONS = [V_0_1_0, V_0_2_0, V_0_3_0, V_0_4_0, V_0_5_0, V_0_6_0, V_0_7_0]


def get_neo4j_csv_reader(
19 changes: 18 additions & 1 deletion neo4j-app/neo4j_app/core/neo4j/documents.py
@@ -6,9 +6,13 @@
from neo4j_app.constants import (
DOC_CONTENT_LENGTH,
DOC_CONTENT_TYPE,
DOC_CREATED_AT,
DOC_CREATED_AT_META,
DOC_DIRNAME,
DOC_EXTRACTION_DATE,
DOC_ID,
DOC_MODIFIED_AT,
DOC_MODIFIED_AT_META,
DOC_NODE,
DOC_PATH,
DOC_ROOT_ID,
@@ -20,6 +24,15 @@
logger = logging.getLogger(__name__)


_DOC_CREATED_AT_META = ["metadata." + c for c in DOC_CREATED_AT_META]
_DOC_MODIFIED_AT_META = ["metadata." + c for c in DOC_MODIFIED_AT_META]


def _coalesce(*, variable: str, attributes: List[str]) -> str:
values = ", ".join(f"{variable}.{a}" for a in attributes)
return f"coalesce({values})"


async def import_document_rows(
neo4j_session: neo4j.AsyncSession,
records: List[Dict],
@@ -37,7 +50,11 @@ async def import_document_rows(
doc.{DOC_EXTRACTION_DATE} = datetime(row.{DOC_EXTRACTION_DATE}),
doc.{DOC_DIRNAME} = row.{DOC_DIRNAME},
doc.{DOC_PATH} = row.{DOC_PATH},
doc.{DOC_URL_SUFFIX} = row.{DOC_URL_SUFFIX}
doc.{DOC_URL_SUFFIX} = row.{DOC_URL_SUFFIX},
doc.{DOC_CREATED_AT} = datetime({
_coalesce(variable="row", attributes=_DOC_CREATED_AT_META)}),
doc.{DOC_MODIFIED_AT} = datetime({
_coalesce(variable="row", attributes=_DOC_MODIFIED_AT_META)})
WITH doc, row
WHERE doc.{DOC_ID} = row.{DOC_ID} and row.{DOC_ROOT_ID} IS NOT NULL
MERGE (root:{DOC_NODE} {{{DOC_ID}: row.{DOC_ROOT_ID}}})
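
For reference, the Cypher-side `_coalesce` above renders a single `coalesce(...)` call over the dotted metadata attributes. A small sketch of the fragment it should produce for the created-at keys (both names are module-private, imported here only for illustration; the comment wraps the output, the actual string is one line):

from neo4j_app.core.neo4j.documents import _DOC_CREATED_AT_META, _coalesce

fragment = _coalesce(variable="row", attributes=_DOC_CREATED_AT_META)
# coalesce(row.metadata.tika_metadata_dcterms_created_iso8601,
#          row.metadata.tika_metadata_creation_date_iso8601,
#          row.metadata.tika_metadata_date_iso8601)
print(fragment)
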
17 changes: 17 additions & 0 deletions neo4j-app/neo4j_app/core/neo4j/migrations/migrations.py
@@ -2,7 +2,9 @@

from neo4j_app.constants import (
DOC_CONTENT_TYPE,
DOC_CREATED_AT,
DOC_ID,
DOC_MODIFIED_AT,
DOC_NODE,
DOC_PATH,
EMAIL_DOMAIN,
@@ -57,6 +59,10 @@ async def migration_v_0_6_0_tx(tx: neo4j.AsyncTransaction):
await _add_mention_count_to_named_entity_relationship(tx)


async def migration_v_0_7_0_tx(tx: neo4j.AsyncTransaction):
    await _create_document_created_and_modified_at_indexes(tx)


async def _create_document_and_ne_id_unique_constraint_tx(tx: neo4j.AsyncTransaction):
doc_query = f"""CREATE CONSTRAINT constraint_document_unique_id
IF NOT EXISTS
@@ -164,3 +170,14 @@ async def _add_mention_count_to_named_entity_relationship(tx: neo4j.AsyncTransac
query = f"""MATCH (:{NE_NODE})-[rel:{NE_APPEARS_IN_DOC}]->(:{DOC_NODE})
SET rel.{NE_MENTION_COUNT} = size(rel.{NE_OFFSETS})"""
await tx.run(query)


async def _create_document_created_and_modified_at_indexes(tx: neo4j.AsyncTransaction):
    created_at_index = f"""CREATE INDEX index_document_created_at IF NOT EXISTS
    FOR (doc:{DOC_NODE})
    ON (doc.{DOC_CREATED_AT})"""
    await tx.run(created_at_index)
    modified_at_index = f"""CREATE INDEX index_document_modified_at IF NOT EXISTS
    FOR (doc:{DOC_NODE})
    ON (doc.{DOC_MODIFIED_AT})"""
    await tx.run(modified_at_index)
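
The new migration can also be applied on its own with the async neo4j driver, outside the project's migration runner. A minimal sketch; the URI, credentials and database name are placeholders:

import asyncio

import neo4j

from neo4j_app.core.neo4j.migrations.migrations import migration_v_0_7_0_tx


async def main() -> None:
    driver = neo4j.AsyncGraphDatabase.driver(
        "neo4j://localhost:7687", auth=("neo4j", "changeme")
    )
    async with driver:
        async with driver.session(database="neo4j") as session:
            # CREATE INDEX ... IF NOT EXISTS keeps this safe to re-run
            await session.execute_write(migration_v_0_7_0_tx)


asyncio.run(main())
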
20 changes: 14 additions & 6 deletions neo4j-app/neo4j_app/tests/conftest.py
@@ -351,10 +351,10 @@ async def neo4j_test_session(
return session


def make_docs(n: int) -> Generator[Dict, None, None]:
def make_docs(n: int, add_dates: bool = False) -> Generator[Dict, None, None]:
random.seed(a=777)
for i in random.sample(list(range(n)), k=n):
yield {
doc = {
"_index": TEST_PROJECT,
"_id": f"doc-{i}",
"_source": {
@@ -368,6 +368,12 @@ def make_docs(n: int) -> Generator[Dict, None, None]:
"join": {"name": "Document"},
},
}
if add_dates:
doc["_source"]["metadata"] = {
"tika_metadata_dcterms_created_iso8601": "2022-04-08T11:41:34Z",
"tika_metadata_modified_iso8601": "2022-04-08T11:41:34Z",
}
yield doc


def make_named_entities(n: int) -> Generator[Dict, None, None]:
@@ -393,8 +399,10 @@ def make_named_entities(n: int) -> Generator[Dict, None, None]:
}


def index_docs_ops(*, index_name: str, n: int) -> Generator[Dict, None, None]:
for doc in make_docs(n):
def index_docs_ops(
*, index_name: str, n: int, add_dates: bool = False
) -> Generator[Dict, None, None]:
for doc in make_docs(n, add_dates):
op = {
"_op_type": "index",
"_index": index_name,
@@ -426,9 +434,9 @@ def index_named_entities_ops(*, index_name: str, n: int) -> Generator[Dict, None


async def index_docs(
client: ESClient, *, n: int, index_name: str = TEST_PROJECT
client: ESClient, *, n: int, index_name: str = TEST_PROJECT, add_dates: bool = False
) -> AsyncGenerator[Dict, None]:
ops = index_docs_ops(index_name=index_name, n=n)
ops = index_docs_ops(index_name=index_name, n=n, add_dates=add_dates)
# Let's wait to make this operation visible to the search
refresh = "wait_for"
async for res in async_streaming_bulk(client, actions=ops, refresh=refresh):
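
A short sketch of how a test might exercise the new add_dates flag on the docs fixture helpers; the assertions simply restate the values hard-coded in make_docs above:

from neo4j_app.tests.conftest import make_docs

docs = list(make_docs(3, add_dates=True))
assert len(docs) == 3
for doc in docs:
    metadata = doc["_source"]["metadata"]
    assert metadata["tika_metadata_dcterms_created_iso8601"] == "2022-04-08T11:41:34Z"
    assert metadata["tika_metadata_modified_iso8601"] == "2022-04-08T11:41:34Z"
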
18 changes: 18 additions & 0 deletions neo4j-app/neo4j_app/tests/core/neo4j/migrations/test_migrations.py
@@ -7,6 +7,7 @@
migration_v_0_4_0_tx,
migration_v_0_5_0_tx,
migration_v_0_6_0_tx,
migration_v_0_7_0_tx,
)


@@ -116,3 +117,20 @@ async def test_migration_v_0_6_0_tx(neo4j_test_session: neo4j.AsyncSession):
rel = res["rel"]
mention_counts = rel.get("mentionCount")
assert mention_counts == 2


async def test_migration_v_0_7_0_tx(neo4j_test_session: neo4j.AsyncSession):
    # When
    await neo4j_test_session.execute_write(migration_v_0_7_0_tx)

    # Then
    indexes_res = await neo4j_test_session.run("SHOW INDEXES")
    existing_indexes = set()
    async for rec in indexes_res:
        existing_indexes.add(rec["name"])
    expected_indexes = [
        "index_document_created_at",
        "index_document_modified_at",
    ]
    for index in expected_indexes:
        assert index in existing_indexes