Skip to content

Commit

Permalink
chore: optimize import and display graph limits
Browse files Browse the repository at this point in the history
  • Loading branch information
ClemDoum committed Jan 24, 2024
1 parent e7735ab commit 9de3b20
Show file tree
Hide file tree
Showing 23 changed files with 800 additions and 544 deletions.
1 change: 1 addition & 0 deletions neo4j-app/neo4j_app/app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ def _log_exceptions():
@asynccontextmanager
async def run_app_deps(app: FastAPI, dependencies: List) -> AsyncGenerator[None, None]:
    """Start the app's lifespan dependencies and yield while they are running.

    :param app: the FastAPI application whose ``state.config`` is used to
        start the dependencies and is then replaced with an enriched copy
    :param dependencies: ordered dependency list forwarded to ``run_deps``
    """
    async with run_deps(dependencies, config=app.state.config):
        # Replace the config with a copy carrying neo4j support flags
        # (with_neo4j_support opens a neo4j driver, so this must happen once
        # dependencies are up); handlers read the flags via app.state.config.
        app.state.config = await app.state.config.with_neo4j_support()
        yield


Expand Down
49 changes: 35 additions & 14 deletions neo4j-app/neo4j_app/app/graphs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging

from fastapi import APIRouter
from fastapi import APIRouter, Request
from starlette.responses import StreamingResponse

from neo4j_app.app.dependencies import lifespan_neo4j_driver
from neo4j_app.app.doc import DOC_GRAPH_DUMP, DOC_GRAPH_DUMP_DESC, GRAPH_TAG
from neo4j_app.core import AppConfig
from neo4j_app.core.neo4j.graphs import count_documents_and_named_entities, dump_graph
from neo4j_app.core.objects import DumpRequest, GraphCounts
from neo4j_app.core.utils.logging import log_elapsed_time_cm
Expand All @@ -24,30 +25,50 @@ def graphs_router() -> APIRouter:
async def _graph_dump(
project: str,
payload: DumpRequest,
request: Request,
) -> StreamingResponse:
with log_elapsed_time_cm(
logger, logging.INFO, "Dumped graph in {elapsed_time} !"
):
res = StreamingResponse(
dump_graph(
project=project,
dump_format=payload.format,
neo4j_driver=lifespan_neo4j_driver(),
query=payload.query,
),
media_type="binary/octet-stream",
config: AppConfig = request.app.state.config
if config.supports_neo4j_parallel_runtime is None:
msg = (
"parallel support has not been set, config has not been properly"
" initialized using AppConfig.with_neo4j_support"
)
raise ValueError(msg)
parallel = False # Parallel seem to slow down export let's deactivate it
res = StreamingResponse(
dump_graph(
project=project,
dump_format=payload.format,
neo4j_driver=lifespan_neo4j_driver(),
query=payload.query,
default_docs_limit=config.neo4j_app_max_dumped_documents,
parallel=parallel,
export_batch_size=config.neo4j_export_batch_size,
),
media_type="binary/octet-stream",
)
return res

@router.get("/counts", response_model=GraphCounts)
async def _count_documents_and_named_entities(project: str) -> GraphCounts:
async def _count_documents_and_named_entities(
project: str, request: Request
) -> GraphCounts:
config: AppConfig = request.app.state.config
if config.supports_neo4j_parallel_runtime is None:
msg = (
"parallel support has not been set, config has not been properly"
" initialized using AppConfig.with_neo4j_support"
)
raise ValueError(msg)
with log_elapsed_time_cm(
logger,
logging.INFO,
"Counted documents and named entities in {elapsed_time} !",
):
count = await count_documents_and_named_entities(
project=project, neo4j_driver=lifespan_neo4j_driver()
project=project,
neo4j_driver=lifespan_neo4j_driver(),
parallel=config.supports_neo4j_parallel_runtime,
)
return count

Expand Down
13 changes: 9 additions & 4 deletions neo4j-app/neo4j_app/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ async def ping() -> str:

@router.get("/config", response_model=AppConfig, response_model_exclude_unset=True)
async def config(request: Request) -> AppConfig:
if request.app.state.config.supports_neo4j_enterprise is None:
conf = request.app.state.config
with_support = await conf.with_neo4j_support()
request.app.state.config = with_support
if (
request.app.state.config.supports_neo4j_enterprise is None
or request.app.state.config.supports_neo4j_parallel_runtime is None
):
msg = (
"neo4j support has not been set, config has not been properly"
" initialized using AppConfig.with_neo4j_support"
)
raise ValueError(msg)
return request.app.state.config

@router.get("/version")
Expand Down
16 changes: 13 additions & 3 deletions neo4j-app/neo4j_app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from neo4j_app.core.elasticsearch import ESClientABC
from neo4j_app.core.elasticsearch.client import ESClient, OSClient
from neo4j_app.core.neo4j.projects import is_enterprise
from neo4j_app.core.neo4j.projects import is_enterprise, supports_parallel_runtime
from neo4j_app.core.utils.logging import (
DATE_FMT,
STREAM_HANDLER_FMT,
Expand Down Expand Up @@ -75,6 +75,7 @@ class AppConfig(LowerCamelCaseModel, IgnoreExtraModel):
neo4j_app_gunicorn_workers: int = 1
neo4j_app_log_level: str = "INFO"
neo4j_app_log_in_json: bool = False
neo4j_app_max_dumped_documents: Optional[int] = None
neo4j_app_max_records_in_memory: int = int(1e6)
neo4j_app_migration_timeout_s: float = 60 * 5
neo4j_app_migration_throttle_s: float = 1
Expand All @@ -91,13 +92,15 @@ class AppConfig(LowerCamelCaseModel, IgnoreExtraModel):
neo4j_connection_timeout: float = 5.0
neo4j_host: str = "127.0.0.1"
neo4j_import_batch_size: int = int(5e5)
neo4j_export_batch_size: int = int(1e3)
neo4j_password: Optional[str] = None
neo4j_port: int = 7687
neo4j_transaction_batch_size = 50000
neo4j_user: Optional[str] = None
# Other supported schemes are neo4j+ssc, neo4j+s, bolt, bolt+ssc, bolt+s
neo4j_uri_scheme: str = "bolt"
supports_neo4j_enterprise: Optional[bool] = None
supports_neo4j_parallel_runtime: Optional[bool] = None
test: bool = False

# Ugly but hard to do differently if we want to avoid to retrieve the config on a
Expand Down Expand Up @@ -192,8 +195,15 @@ def to_worker_cls(self) -> Type["Worker"]:

async def with_neo4j_support(self) -> AppConfig:
async with self.to_neo4j_driver() as neo4j_driver: # pylint: disable=not-async-context-manager
support = await is_enterprise(neo4j_driver)
copied = safe_copy(self, update={"supports_neo4j_enterprise": support})
enterprise_support = await is_enterprise(neo4j_driver)
parallel_support = await supports_parallel_runtime(neo4j_driver)
copied = safe_copy(
self,
update={
"supports_neo4j_enterprise": enterprise_support,
"supports_neo4j_parallel_runtime": parallel_support,
},
)
return copied

def setup_loggers(self, worker_id: Optional[str] = None):
Expand Down
83 changes: 66 additions & 17 deletions neo4j-app/neo4j_app/core/neo4j/graphs.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,86 @@
import logging
from copy import deepcopy
from typing import AsyncGenerator, Optional

import neo4j

from neo4j_app.constants import (
DOC_NODE,
DOC_PATH,
EMAIL_RECEIVED_TYPE,
EMAIL_SENT_TYPE,
NE_APPEARS_IN_DOC,
NE_MENTION_COUNT,
NE_NODE,
)
from neo4j_app.core.neo4j.projects import project_db, supports_parallel_runtime
from neo4j_app.core.neo4j.projects import project_db
from neo4j_app.core.objects import DumpFormat, GraphCounts

logger = logging.getLogger(__name__)

_EXPORT_BATCH_SIZE = "batchSize"
_GRAPHML_DUMP_CONFIG = {
"format": "gephi",
"batchSize": 20000,
"stream": True,
"streamStatements": True,
"readLabels": False,
"storeNodeIds": False,
}

_CYPHER_DUMP_CONFIG = {
"stream": True,
"streamStatements": True,
"writeNodeProperties": True,
"format": "cypher-shell",
"cypherFormat": "create",
"useOptimizations": {"type": "UNWIND_BATCH", "unwindBatchSize": 100},
"useOptimizations": {"type": "UNWIND_BATCH", "unwindBatchSize": 1000},
}

_DEFAULT_DUMP_QUERY = """MATCH (node)
OPTIONAL MATCH (node)-[r]-(other)
RETURN *
"""

def _make_default_query(default_docs_limit: Optional[int] = None) -> str:
    """Build the default graph-dump Cypher query.

    Documents are ordered by path and, when ``default_docs_limit`` is an
    ``int``, capped at that many documents before collecting the related
    named entities and relationships.
    """
    pieces = [f"MATCH (doc:{DOC_NODE})\nWITH doc\nORDER BY doc.{DOC_PATH} ASC"]
    # Only a genuine int limit is interpolated into the query text
    if isinstance(default_docs_limit, int):
        pieces.append(f"\nLIMIT {default_docs_limit}")
    pieces.append(
        f"\nOPTIONAL MATCH (doc)-[rel:{NE_APPEARS_IN_DOC}|{EMAIL_SENT_TYPE}"
        f"|{EMAIL_RECEIVED_TYPE}]-(ne:{NE_NODE})\n"
        "RETURN apoc.coll.toSet(collect(doc) + collect(ne) + collect(rel))"
        " AS values"
    )
    return "".join(pieces)


async def dump_graph(
project: str,
dump_format: DumpFormat,
neo4j_driver: neo4j.AsyncDriver,
*,
query: Optional[str] = None,
default_docs_limit: Optional[int] = None,
parallel: bool = None,
export_batch_size: int,
) -> AsyncGenerator[str, None]:
# TODO: support batchsize ?
neo4j_db = await project_db(neo4j_driver, project)
if query is None:
query = _DEFAULT_DUMP_QUERY
query = _make_default_query(default_docs_limit)
if dump_format is DumpFormat.GRAPHML:
gen = _dump_subgraph_to_graphml(neo4j_driver, neo4j_db, query)
gen = _dump_subgraph_to_graphml(
neo4j_driver,
neo4j_db=neo4j_db,
query=query,
parallel=parallel,
export_batch_size=export_batch_size,
)
elif dump_format is DumpFormat.CYPHER_SHELL:
gen = _dump_subgraph_to_cypher(neo4j_driver, neo4j_db, query)
gen = _dump_subgraph_to_cypher(
neo4j_driver,
neo4j_db=neo4j_db,
query=query,
parallel=parallel,
export_batch_size=export_batch_size,
)
else:
raise ValueError(f'dump not supported for "{dump_format}" format')
async for record in gen:
Expand All @@ -55,14 +89,21 @@ async def dump_graph(

async def _dump_subgraph_to_graphml(
neo4j_driver: neo4j.AsyncDriver,
*,
neo4j_db: str,
query: str,
parallel: bool,
export_batch_size: int,
) -> AsyncGenerator[str, None]:
runtime = "CYPHER runtime=parallel" if parallel else ""
config = deepcopy(_GRAPHML_DUMP_CONFIG)
config[_EXPORT_BATCH_SIZE] = export_batch_size
async with neo4j_driver.session(database=neo4j_db) as sess:
neo4j_query = """CALL apoc.export.graphml.query($query_filter, null, $config)
YIELD data
neo4j_query = f"""{runtime}
CALL apoc.export.graphml.query($query_filter, null, $config) YIELD data
RETURN data;
"""
logger.debug("executing dump query: %s", query)
res = await sess.run(
neo4j_query, config=_GRAPHML_DUMP_CONFIG, query_filter=query
)
Expand All @@ -71,13 +112,22 @@ async def _dump_subgraph_to_graphml(


async def _dump_subgraph_to_cypher(
neo4j_driver, neo4j_db, query
neo4j_driver: neo4j.AsyncDriver,
*,
neo4j_db: str,
query: str,
parallel: bool,
export_batch_size: int,
) -> AsyncGenerator[str, None]:
runtime = "CYPHER runtime=parallel" if parallel else ""
async with neo4j_driver.session(database=neo4j_db) as sess:
neo4j_query = """CALL apoc.export.cypher.query($query_filter, null, $config)
YIELD cypherStatements
neo4j_query = f"""{runtime}
CALL apoc.export.cypher.query($query_filter, null, $config) YIELD cypherStatements
RETURN cypherStatements;
"""
config = deepcopy(_CYPHER_DUMP_CONFIG)
config[_EXPORT_BATCH_SIZE] = export_batch_size
logger.debug("executing dump query: %s", query)
res = await sess.run(
neo4j_query, config=_CYPHER_DUMP_CONFIG, query_filter=query
)
Expand All @@ -86,10 +136,9 @@ async def _dump_subgraph_to_cypher(


async def count_documents_and_named_entities(
neo4j_driver: neo4j.AsyncDriver, project: str
neo4j_driver: neo4j.AsyncDriver, project: str, parallel: bool
) -> GraphCounts:
neo4j_db = await project_db(neo4j_driver, project)
parallel = await supports_parallel_runtime(neo4j_driver)
async with neo4j_driver.session(database=neo4j_db) as sess:
count = await sess.execute_read(
_count_documents_and_named_entities_tx, parallel=parallel
Expand Down
Loading

0 comments on commit 9de3b20

Please sign in to comment.