feature: display export limits and optimize export query #155

Merged: 8 commits, Jan 25, 2024
5 changes: 3 additions & 2 deletions .data/datashare.properties.template

@@ -28,7 +28,7 @@ batchDownloadMaxSize=100M
 parallelism=8
 batchQueueType=MEMORY
 scrollSlices=1
-neo4jHost=neo4j
+neo4jHost=127.0.0.1
 nlpParallelism=1
 dataSourceUrl=jdbc\:sqlite\:file\:memorydb.db?mode\=memory&cache\=shared
 batchDownloadMaxNbFiles=10000
@@ -49,4 +49,5 @@ neo4jAppStartTimeoutS=80
 neo4jUser=neo4j
 neo4jUriScheme=bolt
 neo4jPassword=changeme
-neo4jCliTaskPollIntervalS=2
+neo4jCliTaskPollIntervalS=2
+neo4jAppMaxDumpedDocuments=10000
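The new neo4jAppMaxDumpedDocuments entry feeds the AppConfig field introduced further down in this PR. A minimal sketch of the name mapping, assuming LowerCamelCaseModel uses a pydantic v1-style alias generator of roughly this shape (the helper below is illustrative, not the project's code):

from typing import Optional

from pydantic import BaseModel


def to_lower_camel(name: str) -> str:
    # "neo4j_app_max_dumped_documents" -> "neo4jAppMaxDumpedDocuments"
    head, *tail = name.split("_")
    return head + "".join(part.capitalize() for part in tail)


class AppConfig(BaseModel):
    neo4j_app_max_dumped_documents: Optional[int] = None

    class Config:  # pydantic v1-style configuration (assumed)
        alias_generator = to_lower_camel
        allow_population_by_field_name = True


conf = AppConfig.parse_obj({"neo4jAppMaxDumpedDocuments": "10000"})
assert conf.neo4j_app_max_dumped_documents == 10000

The string value read from the properties file is coerced to an int by pydantic.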
6 changes: 2 additions & 4 deletions neo4j

@@ -14,7 +14,7 @@ function _export_global_variables() {
     DOCKER_GID="$(id -g)"
     export DOCKER_GID
     export ELASTICSEARCH_PORT=9201
-    export NEO4J_VERSION=4.4.17
+    export NEO4J_VERSION=4.4.29
     export TEST_APP_PORT=8002
     BIN_ARCHI="$(arch)"
     if [ "$BIN_ARCHI" == "x86_64" ] || [ "$BIN_ARCHI" == "i386" ] || [ "$BIN_ARCHI" == "amd64" ]; then
@@ -361,7 +361,7 @@ function _helpers() {

    local transform
    if [ "$PLATFORM" == "Darwin" ]; then
-        transform="-s/package/$package_name/g"
+        transform="-s,package/,$package_name/,g"
    elif [ "$PLATFORM" == "Linux" ]; then
        transform="--transform=s,package/,$package_name/,"
    else
@@ -702,7 +702,6 @@ function _main() {
    PRINT_VERSION=
    PROJECT=
    PROJECTS=
-    PORT=8081

    # Define sub projects here
    PROJECTS=(
@@ -762,7 +761,6 @@
        elif [[ $TYPE_ARG = "--sha" ]]; then
            PRINT_SHA=1
        elif [[ $TYPE_ARG = "--port" ]]; then
-            PORT=$VALUE_ARG
            SKIP_NEXT_TYPE_ARG=true
        fi
    else
1 change: 1 addition & 0 deletions neo4j-app/neo4j_app/app/dependencies.py

@@ -307,6 +307,7 @@ def _log_exceptions():
 @asynccontextmanager
 async def run_app_deps(app: FastAPI, dependencies: List) -> AsyncGenerator[None, None]:
     async with run_deps(dependencies, config=app.state.config):
+        app.state.config = await app.state.config.with_neo4j_support()
         yield
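This moves neo4j capability detection to application startup rather than the first request. A condensed sketch of how a lifespan hook of this shape behaves, simplified and without the project's full dependency machinery (StubConfig is a stand-in, not the project's AppConfig):

from contextlib import asynccontextmanager

from fastapi import FastAPI


class StubConfig:
    """Stand-in for AppConfig, just to make the flow runnable."""

    supports_neo4j_parallel_runtime = None

    async def with_neo4j_support(self) -> "StubConfig":
        copied = StubConfig()
        copied.supports_neo4j_parallel_runtime = True  # would be probed
        return copied


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once before the app serves requests: probe neo4j capabilities and
    # replace the config with an enriched copy, so handlers can fail fast if
    # the supports_* flags were never set.
    app.state.config = await app.state.config.with_neo4j_support()
    yield


app = FastAPI(lifespan=lifespan)
app.state.config = StubConfig()  # set when the app is built, before startup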
49 changes: 35 additions & 14 deletions neo4j-app/neo4j_app/app/graphs.py

@@ -1,10 +1,11 @@
 import logging

-from fastapi import APIRouter
+from fastapi import APIRouter, Request
 from starlette.responses import StreamingResponse

 from neo4j_app.app.dependencies import lifespan_neo4j_driver
 from neo4j_app.app.doc import DOC_GRAPH_DUMP, DOC_GRAPH_DUMP_DESC, GRAPH_TAG
+from neo4j_app.core import AppConfig
 from neo4j_app.core.neo4j.graphs import count_documents_and_named_entities, dump_graph
 from neo4j_app.core.objects import DumpRequest, GraphCounts
 from neo4j_app.core.utils.logging import log_elapsed_time_cm
@@ -24,30 +25,50 @@ def graphs_router() -> APIRouter:
     async def _graph_dump(
         project: str,
         payload: DumpRequest,
+        request: Request,
     ) -> StreamingResponse:
-        with log_elapsed_time_cm(
-            logger, logging.INFO, "Dumped graph in {elapsed_time} !"
-        ):
-            res = StreamingResponse(
-                dump_graph(
-                    project=project,
-                    dump_format=payload.format,
-                    neo4j_driver=lifespan_neo4j_driver(),
-                    query=payload.query,
-                ),
-                media_type="binary/octet-stream",
-            )
+        config: AppConfig = request.app.state.config
+        if config.supports_neo4j_parallel_runtime is None:
+            msg = (
+                "parallel support has not been set, config has not been properly"
+                " initialized using AppConfig.with_neo4j_support"
+            )
+            raise ValueError(msg)
+        parallel = False  # Parallel seems to slow down the export, let's deactivate it
+        res = StreamingResponse(
+            dump_graph(
+                project=project,
+                dump_format=payload.format,
+                neo4j_driver=lifespan_neo4j_driver(),
+                query=payload.query,
+                default_docs_limit=config.neo4j_app_max_dumped_documents,
+                parallel=parallel,
+                export_batch_size=config.neo4j_export_batch_size,
+            ),
+            media_type="binary/octet-stream",
+        )
         return res

Review comment from the collaborator author, on the removed log_elapsed_time_cm block: the log was removed on purpose; the elapsed time can't be correctly computed here because we're streaming, so this returns right away.
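The comment above is easy to reproduce: a StreamingResponse body is consumed by the ASGI server after the handler returns, so a timing context manager wrapped around its construction measures close to zero. A minimal, self-contained sketch (the context manager is a simplified stand-in for the project's log_elapsed_time_cm, and the sleeps stand in for the export):

import asyncio
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@contextmanager
def log_elapsed_time_cm(logger_, level, template):
    # Simplified stand-in for neo4j_app.core.utils.logging.log_elapsed_time_cm
    start = time.monotonic()
    yield
    elapsed = f"{time.monotonic() - start:.2f}s"
    logger_.log(level, template.format(elapsed_time=elapsed))


async def slow_rows():
    for i in range(3):
        await asyncio.sleep(1)  # stands in for the neo4j export producing data
        yield f"row {i}\n"


async def main():
    with log_elapsed_time_cm(logger, logging.INFO, "Dumped graph in {elapsed_time} !"):
        body = slow_rows()  # like StreamingResponse(...): nothing is consumed yet
    # The line above logs ~0.00s: the generator has not run at all. The real
    # work happens below, where the ASGI server would iterate the body.
    async for _ in body:
        pass


asyncio.run(main())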

     @router.get("/counts", response_model=GraphCounts)
-    async def _count_documents_and_named_entities(project: str) -> GraphCounts:
+    async def _count_documents_and_named_entities(
+        project: str, request: Request
+    ) -> GraphCounts:
+        config: AppConfig = request.app.state.config
+        if config.supports_neo4j_parallel_runtime is None:
+            msg = (
+                "parallel support has not been set, config has not been properly"
+                " initialized using AppConfig.with_neo4j_support"
+            )
+            raise ValueError(msg)
         with log_elapsed_time_cm(
             logger,
             logging.INFO,
             "Counted documents and named entities in {elapsed_time} !",
         ):
             count = await count_documents_and_named_entities(
-                project=project, neo4j_driver=lifespan_neo4j_driver()
+                project=project,
+                neo4j_driver=lifespan_neo4j_driver(),
+                parallel=config.supports_neo4j_parallel_runtime,
             )
         return count
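For context, a hypothetical client for the dump endpoint. The route path, port, and payload values are assumptions (the route decorator and the DumpRequest definition sit outside this diff); httpx is used to stream the response to disk:

import asyncio

import httpx


async def download_dump(project: str, out_path: str) -> None:
    payload = {"format": "graphml", "query": None}  # assumed DumpRequest shape
    async with httpx.AsyncClient(base_url="http://localhost:8080") as client:
        async with client.stream(
            "POST", "/graphs/dump", params={"project": project}, json=payload
        ) as resp:
            resp.raise_for_status()
            with open(out_path, "wb") as f:
                # Chunks arrive as the export proceeds; nothing is buffered
                # server-side beyond the export batch size.
                async for chunk in resp.aiter_bytes():
                    f.write(chunk)


asyncio.run(download_dump("local-datashare", "graph.graphml"))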
13 changes: 9 additions & 4 deletions neo4j-app/neo4j_app/app/main.py

@@ -32,10 +32,15 @@ async def ping() -> str:

     @router.get("/config", response_model=AppConfig, response_model_exclude_unset=True)
     async def config(request: Request) -> AppConfig:
-        if request.app.state.config.supports_neo4j_enterprise is None:
-            conf = request.app.state.config
-            with_support = await conf.with_neo4j_support()
-            request.app.state.config = with_support
+        if (
+            request.app.state.config.supports_neo4j_enterprise is None
+            or request.app.state.config.supports_neo4j_parallel_runtime is None
+        ):
+            msg = (
+                "neo4j support has not been set, config has not been properly"
+                " initialized using AppConfig.with_neo4j_support"
+            )
+            raise ValueError(msg)
         return request.app.state.config

     @router.get("/version")
16 changes: 13 additions & 3 deletions neo4j-app/neo4j_app/core/config.py

@@ -15,7 +15,7 @@

 from neo4j_app.core.elasticsearch import ESClientABC
 from neo4j_app.core.elasticsearch.client import ESClient, OSClient
-from neo4j_app.core.neo4j.projects import is_enterprise
+from neo4j_app.core.neo4j.projects import is_enterprise, supports_parallel_runtime
 from neo4j_app.core.utils.logging import (
     DATE_FMT,
     STREAM_HANDLER_FMT,
@@ -75,6 +75,7 @@ class AppConfig(LowerCamelCaseModel, IgnoreExtraModel):
     neo4j_app_gunicorn_workers: int = 1
     neo4j_app_log_level: str = "INFO"
     neo4j_app_log_in_json: bool = False
+    neo4j_app_max_dumped_documents: Optional[int] = None
     neo4j_app_max_records_in_memory: int = int(1e6)
     neo4j_app_migration_timeout_s: float = 60 * 5
     neo4j_app_migration_throttle_s: float = 1
@@ -91,13 +92,15 @@ class AppConfig(LowerCamelCaseModel, IgnoreExtraModel):
     neo4j_connection_timeout: float = 5.0
     neo4j_host: str = "127.0.0.1"
     neo4j_import_batch_size: int = int(5e5)
+    neo4j_export_batch_size: int = int(1e3)
     neo4j_password: Optional[str] = None
     neo4j_port: int = 7687
     neo4j_transaction_batch_size = 50000
     neo4j_user: Optional[str] = None
     # Other supported schemes are neo4j+ssc, neo4j+s, bolt, bolt+ssc, bolt+s
     neo4j_uri_scheme: str = "bolt"
     supports_neo4j_enterprise: Optional[bool] = None
+    supports_neo4j_parallel_runtime: Optional[bool] = None
     test: bool = False

     # Ugly but hard to do differently if we want to avoid to retrieve the config on a
@@ -192,8 +195,15 @@ def to_worker_cls(self) -> Type["Worker"]:

     async def with_neo4j_support(self) -> AppConfig:
         async with self.to_neo4j_driver() as neo4j_driver:  # pylint: disable=not-async-context-manager
-            support = await is_enterprise(neo4j_driver)
-            copied = safe_copy(self, update={"supports_neo4j_enterprise": support})
+            enterprise_support = await is_enterprise(neo4j_driver)
+            parallel_support = await supports_parallel_runtime(neo4j_driver)
+            copied = safe_copy(
+                self,
+                update={
+                    "supports_neo4j_enterprise": enterprise_support,
+                    "supports_neo4j_parallel_runtime": parallel_support,
+                },
+            )
         return copied
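supports_parallel_runtime is imported from neo4j_app.core.neo4j.projects, whose body is outside this diff. A plausible probe (an assumption for illustration, not the project's actual implementation) attempts a trivial query under the parallel runtime, which only Neo4j versions shipping that runtime accept:

import neo4j
from neo4j.exceptions import Neo4jError


async def supports_parallel_runtime(neo4j_driver: neo4j.AsyncDriver) -> bool:
    # Hypothetical probe: the parallel runtime rejects the query outright on
    # versions/editions that do not ship it, so an error means "unsupported".
    try:
        async with neo4j_driver.session() as sess:
            res = await sess.run("CYPHER runtime=parallel RETURN 1")
            await res.consume()
        return True
    except Neo4jError:
        return False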
83 changes: 66 additions & 17 deletions neo4j-app/neo4j_app/core/neo4j/graphs.py

@@ -1,52 +1,86 @@
 import logging
+from copy import deepcopy
 from typing import AsyncGenerator, Optional

 import neo4j

+from neo4j_app.constants import (
+    DOC_NODE,
+    DOC_PATH,
+    EMAIL_RECEIVED_TYPE,
+    EMAIL_SENT_TYPE,
+    NE_APPEARS_IN_DOC,
+    NE_MENTION_COUNT,
+    NE_NODE,
+)
-from neo4j_app.core.neo4j.projects import project_db, supports_parallel_runtime
+from neo4j_app.core.neo4j.projects import project_db
 from neo4j_app.core.objects import DumpFormat, GraphCounts

 logger = logging.getLogger(__name__)

+_EXPORT_BATCH_SIZE = "batchSize"
 _GRAPHML_DUMP_CONFIG = {
     "format": "gephi",
-    "batchSize": 20000,
-    "stream": True,
+    "streamStatements": True,
     "readLabels": False,
     "storeNodeIds": False,
 }

 _CYPHER_DUMP_CONFIG = {
-    "stream": True,
+    "streamStatements": True,
     "writeNodeProperties": True,
     "format": "cypher-shell",
     "cypherFormat": "create",
-    "useOptimizations": {"type": "UNWIND_BATCH", "unwindBatchSize": 100},
+    "useOptimizations": {"type": "UNWIND_BATCH", "unwindBatchSize": 1000},
 }

-_DEFAULT_DUMP_QUERY = """MATCH (node)
-OPTIONAL MATCH (node)-[r]-(other)
-RETURN *
-"""
+
+def _make_default_query(default_docs_limit: Optional[int] = None) -> str:
+    query = f"""MATCH (doc:{DOC_NODE})
+WITH doc
+ORDER BY doc.{DOC_PATH} ASC"""
+    if isinstance(default_docs_limit, int):
+        query += f"""
+LIMIT {default_docs_limit}"""
+    query += f"""
+OPTIONAL MATCH (doc)-[\
+rel:{NE_APPEARS_IN_DOC}|{EMAIL_SENT_TYPE}|{EMAIL_RECEIVED_TYPE}]-(ne:{NE_NODE})
+RETURN apoc.coll.toSet(collect(doc) + collect(ne) + collect(rel)) AS values"""
+    return query


 async def dump_graph(
     project: str,
     dump_format: DumpFormat,
     neo4j_driver: neo4j.AsyncDriver,
     *,
     query: Optional[str] = None,
+    default_docs_limit: Optional[int] = None,
+    parallel: bool = None,
+    export_batch_size: int,
 ) -> AsyncGenerator[str, None]:
-    # TODO: support batchsize ?
     neo4j_db = await project_db(neo4j_driver, project)
     if query is None:
-        query = _DEFAULT_DUMP_QUERY
+        query = _make_default_query(default_docs_limit)
     if dump_format is DumpFormat.GRAPHML:
-        gen = _dump_subgraph_to_graphml(neo4j_driver, neo4j_db, query)
+        gen = _dump_subgraph_to_graphml(
+            neo4j_driver,
+            neo4j_db=neo4j_db,
+            query=query,
+            parallel=parallel,
+            export_batch_size=export_batch_size,
+        )
     elif dump_format is DumpFormat.CYPHER_SHELL:
-        gen = _dump_subgraph_to_cypher(neo4j_driver, neo4j_db, query)
+        gen = _dump_subgraph_to_cypher(
+            neo4j_driver,
+            neo4j_db=neo4j_db,
+            query=query,
+            parallel=parallel,
+            export_batch_size=export_batch_size,
+        )
     else:
         raise ValueError(f'dump not supported for "{dump_format}" format')
     async for record in gen:
@@ -55,14 +89,21 @@ async def dump_graph(

 async def _dump_subgraph_to_graphml(
     neo4j_driver: neo4j.AsyncDriver,
+    *,
     neo4j_db: str,
     query: str,
+    parallel: bool,
+    export_batch_size: int,
 ) -> AsyncGenerator[str, None]:
+    runtime = "CYPHER runtime=parallel" if parallel else ""
+    config = deepcopy(_GRAPHML_DUMP_CONFIG)
+    config[_EXPORT_BATCH_SIZE] = export_batch_size
     async with neo4j_driver.session(database=neo4j_db) as sess:
-        neo4j_query = """CALL apoc.export.graphml.query($query_filter, null, $config)
-YIELD data
+        neo4j_query = f"""{runtime}
+CALL apoc.export.graphml.query($query_filter, null, $config) YIELD data
 RETURN data;
 """
+        logger.debug("executing dump query: %s", query)
         res = await sess.run(
             neo4j_query, config=_GRAPHML_DUMP_CONFIG, query_filter=query
         )
@@ -71,13 +112,22 @@ async def _dump_subgraph_to_graphml(


 async def _dump_subgraph_to_cypher(
-    neo4j_driver, neo4j_db, query
+    neo4j_driver: neo4j.AsyncDriver,
+    *,
+    neo4j_db: str,
+    query: str,
+    parallel: bool,
+    export_batch_size: int,
 ) -> AsyncGenerator[str, None]:
+    runtime = "CYPHER runtime=parallel" if parallel else ""
     async with neo4j_driver.session(database=neo4j_db) as sess:
-        neo4j_query = """CALL apoc.export.cypher.query($query_filter, null, $config)
-YIELD cypherStatements
+        neo4j_query = f"""{runtime}
+CALL apoc.export.cypher.query($query_filter, null, $config) YIELD cypherStatements
 RETURN cypherStatements;
 """
+        config = deepcopy(_CYPHER_DUMP_CONFIG)
+        config[_EXPORT_BATCH_SIZE] = export_batch_size
+        logger.debug("executing dump query: %s", query)
         res = await sess.run(
             neo4j_query, config=_CYPHER_DUMP_CONFIG, query_filter=query
         )
@@ -86,10 +136,9 @@ async def _dump_subgraph_to_cypher(


 async def count_documents_and_named_entities(
-    neo4j_driver: neo4j.AsyncDriver, project: str
+    neo4j_driver: neo4j.AsyncDriver, project: str, parallel: bool
 ) -> GraphCounts:
     neo4j_db = await project_db(neo4j_driver, project)
-    parallel = await supports_parallel_runtime(neo4j_driver)
     async with neo4j_driver.session(database=neo4j_db) as sess:
         count = await sess.execute_read(
             _count_documents_and_named_entities_tx, parallel=parallel
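To make the new default export query concrete, here is what _make_default_query(10000) renders to, with assumed values for the neo4j_app.constants labels (the real values live outside this diff):

# Assumed constant values, for illustration only:
DOC_NODE = "Document"
DOC_PATH = "path"
NE_NODE = "NamedEntity"
NE_APPEARS_IN_DOC = "APPEARS_IN"
EMAIL_SENT_TYPE = "SENT"
EMAIL_RECEIVED_TYPE = "RECEIVED"

# With default_docs_limit=10000, the builder would yield:
rendered = """MATCH (doc:Document)
WITH doc
ORDER BY doc.path ASC
LIMIT 10000
OPTIONAL MATCH (doc)-[rel:APPEARS_IN|SENT|RECEIVED]-(ne:NamedEntity)
RETURN apoc.coll.toSet(collect(doc) + collect(ne) + collect(rel)) AS values"""

Ordering by document path before applying the limit keeps the exported subset stable across runs, and apoc.coll.toSet deduplicates nodes and relationships collected more than once.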