Skip to content

Commit

Permalink
Merge pull request #276 from biolink/fix-rdfsource-caching
Browse files Browse the repository at this point in the history
Fix caching in RdfSource
  • Loading branch information
deepakunni3 authored Mar 16, 2021
2 parents 3254a32 + f59e107 commit a17f06e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 24 deletions.
60 changes: 39 additions & 21 deletions kgx/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ def cli():
@click.argument('inputs', required=True, type=click.Path(exists=True), nargs=-1)
@click.option(
'--input-format',
'-i',
required=True,
help=f'The input format. Can be one of {get_input_file_types()}',
)
@click.option('--input-compression', required=False, help='The input compression type')
@click.option('--output', required=True, type=click.Path(exists=False))
@click.option('--input-compression', '-c', required=False, help='The input compression type')
@click.option('--output', '-o', required=True, type=click.Path(exists=False))
@click.option(
'--report-type',
'-r',
required=False,
type=str,
help=f'The summary report type. Can be one of {summary_report_types.keys()}',
help=f'The summary report type. Must be one of {tuple(summary_report_types.keys())}',
default='kgx-map',
)
@click.option('--stream', is_flag=True, help='Parse input as a stream')
@click.option('--stream', '-s', is_flag=True, help='Parse input as a stream')
@click.option(
'--node-facet-properties',
required=False,
Expand Down Expand Up @@ -116,17 +118,19 @@ def graph_summary_wrapper(
@click.argument('inputs', required=True, type=click.Path(exists=True), nargs=-1)
@click.option(
'--input-format',
'-i',
required=True,
help=f'The input format. Can be one of {get_input_file_types()}',
)
@click.option('--input-compression', required=False, help='The input compression type')
@click.option('--input-compression', '-c', required=False, help='The input compression type')
@click.option(
'--output',
'-o',
required=False,
type=click.Path(exists=False),
help='File to write validation reports to',
)
@click.option('--stream', is_flag=True, help='Parse input as a stream')
@click.option('--stream', '-s', is_flag=True, help='Parse input as a stream')
def validate_wrapper(
inputs: List[str], input_format: str, input_compression: str, output: str, stream: bool
):
Expand Down Expand Up @@ -154,29 +158,33 @@ def validate_wrapper(
@cli.command(name='neo4j-download')
@click.option(
'--uri',
'-l',
required=True,
type=str,
help='Neo4j URI to download from. For example, https://localhost:7474',
)
@click.option('--username', required=True, type=str, help='Neo4j username')
@click.option('--password', required=True, type=str, help='Neo4j password')
@click.option('--output', required=True, type=click.Path(exists=False), help='Output')
@click.option('--username', '-u', required=True, type=str, help='Neo4j username')
@click.option('--password', '-p', required=True, type=str, help='Neo4j password')
@click.option('--output', '-o', required=True, type=click.Path(exists=False), help='Output')
@click.option(
'--output-format',
'-f',
required=True,
help=f'The output format. Can be one of {get_input_file_types()}',
)
@click.option('--output-compression', required=False, help='The output compression type')
@click.option('--stream', is_flag=True, help='Parse input as a stream')
@click.option('--output-compression', '-d', required=False, help='The output compression type')
@click.option('--stream', '-s', is_flag=True, help='Parse input as a stream')
@click.option(
'--node-filters',
'-n',
required=False,
type=click.Tuple([str, str]),
multiple=True,
help=f'Filters for filtering nodes from the input graph',
)
@click.option(
'--edge-filters',
'-e',
required=False,
type=click.Tuple([str, str]),
multiple=True,
Expand Down Expand Up @@ -236,28 +244,32 @@ def neo4j_download_wrapper(
@click.argument('inputs', required=True, type=click.Path(exists=True), nargs=-1)
@click.option(
'--input-format',
'-i',
required=True,
help=f'The input format. Can be one of {get_input_file_types()}',
)
@click.option('--input-compression', required=False, help='The input compression type')
@click.option('--input-compression', '-c', required=False, help='The input compression type')
@click.option(
'--uri',
'-l',
required=True,
type=str,
help='Neo4j URI to upload to. For example, https://localhost:7474',
)
@click.option('--username', required=True, type=str, help='Neo4j username')
@click.option('--password', required=True, type=str, help='Neo4j password')
@click.option('--stream', is_flag=True, help='Parse input as a stream')
@click.option('--username', '-u', required=True, type=str, help='Neo4j username')
@click.option('--password', '-p', required=True, type=str, help='Neo4j password')
@click.option('--stream', '-s', is_flag=True, help='Parse input as a stream')
@click.option(
'--node-filters',
'-n',
required=False,
type=click.Tuple([str, str]),
multiple=True,
help=f'Filters for filtering nodes from the input graph',
)
@click.option(
'--edge-filters',
'-e',
required=False,
type=click.Tuple([str, str]),
multiple=True,
Expand Down Expand Up @@ -317,27 +329,31 @@ def neo4j_upload_wrapper(
@click.argument('inputs', required=False, type=click.Path(exists=True), nargs=-1)
@click.option(
'--input-format',
'-i',
required=False,
help=f'The input format. Can be one of {get_input_file_types()}',
)
@click.option('--input-compression', required=False, help='The input compression type')
@click.option('--output', required=False, type=click.Path(exists=False), help='Output')
@click.option('--input-compression', '-c', required=False, help='The input compression type')
@click.option('--output', '-o', required=False, type=click.Path(exists=False), help='Output')
@click.option(
'--output-format',
'-f',
required=False,
help=f'The output format. Can be one of {get_input_file_types()}',
)
@click.option('--output-compression', required=False, help='The output compression type')
@click.option('--output-compression', '-d', required=False, help='The output compression type')
@click.option('--stream', is_flag=True, help='Parse input as a stream')
@click.option(
'--node-filters',
'-n',
required=False,
type=click.Tuple([str, str]),
multiple=True,
help=f'Filters for filtering nodes from the input graph',
)
@click.option(
'--edge-filters',
'-e',
required=False,
type=click.Tuple([str, str]),
multiple=True,
Expand All @@ -347,7 +363,7 @@ def neo4j_upload_wrapper(
@click.option(
'--source', required=False, type=str, multiple=True, help='Source(s) from the YAML to process'
)
@click.option('--processes', required=False, type=int, default=1, help='Number of processes to use')
@click.option('--processes', '-p', required=False, type=int, default=1, help='Number of processes to use')
def transform_wrapper(
inputs: List[str],
input_format: str,
Expand Down Expand Up @@ -379,7 +395,9 @@ def transform_wrapper(
output_format: str
The output format
output_compression: str
The output compression type
The output compression type
stream: bool
Whether or not to stream
node_filters: Tuple[str, str]
Node filters
edge_filters: Tuple[str, str]
Expand Down Expand Up @@ -420,7 +438,7 @@ def transform_wrapper(
multiple=True,
help='Destination(s) from the YAML to process',
)
@click.option('--processes', required=False, type=int, default=1, help='Number of processes to use')
@click.option('--processes', '-p', required=False, type=int, default=1, help='Number of processes to use')
def merge_wrapper(merge_config: str, source: List, destination: List, processes: int):
"""
Load nodes and edges from files and KGs, as defined in a config YAML, and merge them into a single graph.
Expand Down
3 changes: 3 additions & 0 deletions kgx/parsers/ntriples_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def parse(self, filename: str) -> Generator:
self.line = self.readline()
if self.line is None:
break
if self.line == '':
break
try:
yield from self.parseline()
except ParseError:
Expand All @@ -53,6 +55,7 @@ def parseline(self) -> Generator:
A generator
"""
print(self.line)
self.eat(r_wspace)
if self.line or not self.line.startswith('#'):
subject = self.subject()
Expand Down
16 changes: 13 additions & 3 deletions kgx/source/rdf_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ def __init__(self):
self.reified_nodes: Set = set()
self.start: int = 0
self.count: int = 0
self.CACHE_SIZE = 100000
self.CACHE_SIZE = 10000
self.node_record = {}
self.edge_record = {}
self.node_cache = {}
self.edge_cache = {}
self._incomplete_nodes = {}

def set_predicate_mapping(self, m: Dict) -> None:
"""
Expand Down Expand Up @@ -252,7 +253,16 @@ def triple(self, s: URIRef, p: URIRef, o: URIRef) -> None:
while self.reified_nodes:
n = self.reified_nodes.pop()
data = self.node_cache.pop(n)
self.dereify(n, data)
try:
self.dereify(n, data)
except ValueError as e:
log.info(e)
self._incomplete_nodes[n] = data

for n in self._incomplete_nodes.keys():
self.node_cache[n] = self._incomplete_nodes[n]
self.reified_nodes.add(n)
self._incomplete_nodes.clear()

for k in self.edge_cache.keys():
if 'id' not in self.edge_cache[k] and 'association_id' not in self.edge_cache[k]:
Expand Down Expand Up @@ -294,7 +304,7 @@ def dereify(self, n: str, node: Dict) -> None:
if 'subject' in node and 'object' in node:
self.add_edge(node['subject'], node['object'], node['predicate'], node)
else:
log.warning(f"Cannot dereify node {n} {node}")
raise ValueError(f"Incomplete node {n} {node}")

def add_node_attribute(
self, iri: Union[URIRef, str], key: str, value: Union[str, List]
Expand Down

0 comments on commit a17f06e

Please sign in to comment.