-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspoke_loader.py
345 lines (322 loc) · 13.7 KB
/
spoke_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Import spoke matrix from a neo4j csv dump.
import csv
import gzip
import json
import os
import numpy as np
from scipy import sparse, io
# TODO: multiple edges between two nodes?
def import_csv(csv_filename, edges_to_include=None, remove_unused_nodes=False):
    """
    Imports a SPOKE graph from a neo4j csv dump.

    Args:
        csv_filename: name of csv file (may be gzip-compressed, detected by a '.gz' suffix)
        edges_to_include: set of edge types to keep; None keeps every edge type
        remove_unused_nodes: True if nodes with no in- or out-edges are to be removed.
            When True, edge keys are re-mapped to positions in the filtered node list;
            when False, edge keys remain the raw neo4j _start/_end ids.
    Returns:
        nodes: list of (_id, _name, _labels_id) where _labels_id corresponds to a key in node_types
        edges: dict of (node1, node2): _type_id where _type_id corresponds to a key in edge_types
        node_types: dict of int: str (_labels)
        edge_types: dict of int: str (_type)
    """
    nodes = []
    n_nodes = 0
    # mapping of _id to index in nodes
    node_index = {}
    # _labels string -> integer type id (inverted before returning)
    node_types = {}
    edges = {}
    # _type string -> integer type id (inverted before returning)
    edge_types = {}
    # set of node _ids that appear in at least one kept edge
    # (used when deciding whether to remove nodes)
    node_has_edge = set()
    # some SPOKE property fields are very large; raise the csv module's field limit
    csv.field_size_limit(99999999)
    # transparently handle gzip-compressed dumps; the with-block closes the file
    # (the original leaked the handle and printed row['license'] as leftover debug
    # output, which raised KeyError on dumps without that column)
    f = gzip.open(csv_filename, 'rt') if csv_filename.endswith('.gz') else open(csv_filename)
    with f:
        dr = csv.DictReader(f, dialect='unix')
        for i, row in enumerate(dr):
            if i % 10000 == 0:
                print(i, 'nodes: ', len(node_index), 'edges: ', len(edges))
            # rows with a non-empty _id are nodes; rows without one are edges
            if row['_id']:
                # prefer 'name' as the display name, falling back to 'pref_name'
                row_name = row['name'] if row['name'] else row['pref_name']
                if row['_labels'] not in node_types:
                    node_types[row['_labels']] = len(node_types) + 1
                nodes.append((int(row['_id']), row_name, node_types[row['_labels']]))
                node_index[int(row['_id'])] = n_nodes
                n_nodes += 1
            else:
                edge_type = row['_type']
                if edges_to_include is None or edge_type in edges_to_include:
                    node1 = int(row['_start'])
                    node2 = int(row['_end'])
                    node_has_edge.add(node1)
                    node_has_edge.add(node2)
                    if edge_type not in edge_types:
                        edge_types[edge_type] = len(edge_types) + 1
                    edges[(node1, node2)] = edge_types[edge_type]
    if remove_unused_nodes:
        # remove all nodes that don't have edges
        to_remove = set(node_index.keys()).difference(node_has_edge)
        nodes = [n for n in nodes if n[0] not in to_remove]
        # rebuild node_index, then re-key edges by position in the filtered list
        node_index = {n[0]: i for i, n in enumerate(nodes)}
        edges = {(node_index[n1], node_index[n2]): e for (n1, n2), e in edges.items()}
    # invert the type maps so they go from integer id to string name
    node_types = {v: k for k, v in node_types.items()}
    edge_types = {v: k for k, v in edge_types.items()}
    return nodes, edges, node_types, edge_types
# TODO: multiple edges between two nodes?
def import_jsonl(filename, edges_to_include=None, remove_unused_nodes=True, use_edge_types=True, use_node_types=True):
    """
    Imports a jsonl file (neo4j export: one JSON object per line, node rows
    followed by relationship rows).

    Args:
        filename: name of jsonl file (may be gzip-compressed, detected by a '.gz' suffix)
        edges_to_include: set of edge types to keep; None keeps every edge type
        remove_unused_nodes: True if nodes with no in- or out-edges are to be removed.
            When True, edge keys are re-mapped to positions in the filtered node list;
            when False, edge keys remain the raw neo4j node ids.
        use_edge_types: if False, store True instead of an integer edge type id
        use_node_types: if False, store True instead of an integer node type id
    Returns:
        nodes: list of (_id, _name, _labels_id) where _labels_id corresponds to a key in node_types
        edges: dict of (node1, node2): _type_id where _type_id corresponds to a key in edge_types
        node_types: dict of int: str (_labels)
        edge_types: dict of int: str (_type)
    """
    nodes = []
    n_nodes = 0
    # mapping of _id to index in nodes
    node_index = {}
    # label string -> integer type id (inverted before returning)
    node_types = {}
    edges = {}
    # type string -> integer type id (inverted before returning)
    edge_types = {}
    # set of node ids that appear in at least one kept edge
    node_has_edge = set()
    # transparently handle gzip-compressed dumps; the with-block closes the file
    # (the original manual readline() loop leaked the handle)
    f = gzip.open(filename, 'rt') if filename.endswith('.gz') else open(filename)
    with f:
        for i, line in enumerate(f):
            row = json.loads(line)
            if i % 10000 == 0:
                print(i, 'nodes: ', len(node_index), 'edges: ', len(edges))
            # if this is a node
            if row['type'] == 'node':
                props = row['properties']
                # pick a display name from the first non-empty candidate field
                if 'name' in props and props['name'] != '':
                    row_name = props['name']
                elif 'pref_name' in props and props['pref_name'] != '':
                    row_name = props['pref_name']
                elif 'identifier' in props and props['identifier'] != '':
                    row_name = props['identifier']
                elif 'id' in props and props['id']:
                    row_name = props['id']
                else:
                    row_name = ''
                # only the first label is used as the node type
                row_label = row['labels'][0]
                if use_node_types:
                    if row_label not in node_types:
                        node_types[row_label] = len(node_types) + 1
                    nodes.append((int(row['id']), row_name, node_types[row_label]))
                else:
                    nodes.append((int(row['id']), row_name, True))
                node_index[int(row['id'])] = n_nodes
                n_nodes += 1
            # otherwise this row is an edge
            else:
                edge_type = row['label']
                if edges_to_include is None or edge_type in edges_to_include:
                    node1 = int(row['start']['id'])
                    node2 = int(row['end']['id'])
                    node_has_edge.add(node1)
                    node_has_edge.add(node2)
                    if use_edge_types:
                        if edge_type not in edge_types:
                            edge_types[edge_type] = len(edge_types) + 1
                        edges[(node1, node2)] = edge_types[edge_type]
                    else:
                        edges[(node1, node2)] = True
    if remove_unused_nodes:
        # remove all nodes that don't have edges
        to_remove = set(node_index.keys()).difference(node_has_edge)
        nodes = [n for n in nodes if n[0] not in to_remove]
        # rebuild node_index, then re-key edges by position in the filtered list
        node_index = {n[0]: i for i, n in enumerate(nodes)}
        edges = {(node_index[n1], node_index[n2]): e for (n1, n2), e in edges.items()}
    # invert the type maps so they go from integer id to string name
    node_types = {v: k for k, v in node_types.items()}
    edge_types = {v: k for k, v in edge_types.items()}
    return nodes, edges, node_types, edge_types
def import_ckg(filename, edges_to_include=None, remove_unused_nodes=False, use_edge_types=True, use_node_types=True, n_edges=300000000, n_nodes=20000000):
    """
    Imports a jsonl file into a sparse COO adjacency matrix.

    This tries to be less memory-intensive than import_jsonl by accumulating
    edges into pre-allocated numpy arrays instead of a dict.

    Args:
        filename: name of jsonl file (may be gzip-compressed, detected by a '.gz' suffix)
        edges_to_include: set of edge types to keep; None keeps every edge type
        remove_unused_nodes: accepted for interface parity with the other
            importers, but currently not used by this function.
        use_edge_types: if False, store 1 instead of an integer edge type id
        use_node_types: if False, store True instead of an integer node type id
        n_edges: an upper bound on the number of edges (does not have to be
            exact, but must be >= the actual number of edges)
        n_nodes: accepted for interface parity; immediately overwritten by the
            running node count.
    Returns:
        nodes: list of (_id, _name, _labels_id) where _labels_id corresponds to a key in node_types
        edges: COO array of shape (len(nodes), len(nodes))
        node_types: dict of int: str (_labels)
        edge_types: dict of int: str (_type)
    """
    nodes = []
    n_nodes = 0
    # mapping of _id to index in nodes
    node_index = {}
    # label string -> integer type id (inverted before returning)
    node_types = {}
    # pre-allocated edge storage, trimmed to the actual count after the scan
    edges_start = np.zeros(n_edges, dtype=np.int64)
    edges_end = np.zeros(n_edges, dtype=np.int64)
    # uint8 values: assumes there are fewer than 256 edge types
    edges_values = np.zeros(n_edges, dtype=np.uint8)
    # type string -> integer type id (inverted before returning)
    edge_types = {}
    # transparently handle gzip-compressed dumps; the with-block closes the file
    # (the original manual readline() loop leaked the handle)
    f = gzip.open(filename, 'rt') if filename.endswith('.gz') else open(filename)
    # ne is the number of edges stored so far
    ne = 0
    with f:
        for i, line in enumerate(f):
            row = json.loads(line)
            if i % 10000 == 0:
                print(i, 'nodes: ', len(node_index), 'edges: ', ne)
            # if this is a node
            if row['type'] == 'node':
                props = row['properties']
                # pick a display name from the first non-empty candidate field
                if 'name' in props and props['name'] != '':
                    row_name = props['name']
                elif 'pref_name' in props and props['pref_name'] != '':
                    row_name = props['pref_name']
                elif 'identifier' in props and props['identifier'] != '':
                    row_name = props['identifier']
                elif 'id' in props and props['id']:
                    row_name = props['id']
                else:
                    row_name = ''
                # only the first label is used as the node type
                row_label = row['labels'][0]
                if use_node_types:
                    if row_label not in node_types:
                        node_types[row_label] = len(node_types) + 1
                    nodes.append((int(row['id']), row_name, node_types[row_label]))
                else:
                    nodes.append((int(row['id']), row_name, True))
                node_index[int(row['id'])] = n_nodes
                n_nodes += 1
            # otherwise this row is an edge
            # in neo4j exports, edges always come after nodes, so node_index is
            # complete by the time edges are seen
            else:
                edge_type = row['label']
                if edges_to_include is None or edge_type in edges_to_include:
                    edges_start[ne] = node_index[int(row['start']['id'])]
                    edges_end[ne] = node_index[int(row['end']['id'])]
                    if use_edge_types:
                        if edge_type not in edge_types:
                            edge_types[edge_type] = len(edge_types) + 1
                        edges_values[ne] = edge_types[edge_type]
                    else:
                        edges_values[ne] = 1
                    ne += 1
    # trim the pre-allocated arrays down to the edges actually seen
    edges_values = edges_values[:ne]
    edges_start = edges_start[:ne]
    edges_end = edges_end[:ne]
    edges = sparse.coo_array((edges_values, (edges_start, edges_end)), shape=(len(nodes), len(nodes)))
    # invert the type maps so they go from integer id to string name
    node_types = {v: k for k, v in node_types.items()}
    edge_types = {v: k for k, v in edge_types.items()}
    return nodes, edges, node_types, edge_types
def import_nodes_edges(node_file, edges_file):
    """
    Imports nodes and edges from two separate files.

    Not implemented yet: the body is a stub and the function implicitly
    returns None.
    """
    # TODO: make this work
def to_sparse(nodes, edges):
    """
    Builds a DOK sparse adjacency matrix from an edge dict.

    Args:
        nodes: node list; only its length is used, to size the matrix
        edges: dict of (row, col): value

    Returns:
        a (len(nodes), len(nodes)) scipy DOK array with one entry per edge
    """
    size = len(nodes)
    matrix = sparse.dok_array((size, size), dtype=int)
    # insert in sorted key order for deterministic construction
    for (src, dst), value in sorted(edges.items()):
        matrix[src, dst] = value
    return matrix
def load_spoke(filename='spoke.csv', edges_to_include=None, remove_unused_nodes=False, mtx_filename='spoke.mtx', **kwargs):
    """
    Loads a SPOKE graph from a csv or jsonl dump, caching the adjacency matrix
    as a MatrixMarket file.

    Args:
        filename: input dump; the importer is chosen by extension
            (.csv/.csv.gz vs .json/.json.gz/.jsonl/.jsonl.gz)
        edges_to_include: set of edge type names to keep; None keeps all
        remove_unused_nodes: passed through to the importer
        mtx_filename: cache path for the sparse adjacency matrix; written on
            the first run, read back on subsequent runs
        **kwargs: forwarded to the format-specific importer

    Returns:
        nodes, edges, node_types, edge_types, edge_matrix

    Raises:
        ValueError: if the filename extension is not a supported format
            (previously this fell through both branches and crashed with a
            confusing NameError on 'nodes').
    """
    if filename.endswith(('.csv', '.csv.gz')):
        nodes, edges, node_types, edge_types = import_csv(filename, edges_to_include, remove_unused_nodes, **kwargs)
    elif filename.endswith(('.json', '.json.gz', '.jsonl', '.jsonl.gz')):
        nodes, edges, node_types, edge_types = import_jsonl(filename, edges_to_include, remove_unused_nodes, **kwargs)
    else:
        raise ValueError('unsupported file type: %s' % filename)
    if not os.path.exists(mtx_filename):
        edge_matrix = to_sparse(nodes, edges)
        io.mmwrite(mtx_filename, edge_matrix)
    else:
        edge_matrix = io.mmread(mtx_filename)
    return nodes, edges, node_types, edge_types, edge_matrix
def symmetrize_matrix(matrix):
    """
    Symmetrizes an adjacency matrix by mirroring each triangle across the
    diagonal and summing the results.

    Warning: this completely destroys any meaning applied to node values.
    Nonzero = edge exists, zero = edge doesn't exist.
    """
    tri_lower = sparse.tril(matrix)
    tri_upper = sparse.triu(matrix)
    mirrored_lower = tri_lower + tri_lower.T
    mirrored_upper = tri_upper + tri_upper.T
    return mirrored_lower + mirrored_upper
if __name__ == '__main__':
    #nodes, edges, node_types, edge_types, edge_matrix = load_spoke('spoke_2021.csv', remove_unused_nodes=True, mtx_filename='spoke_2021.mtx.gz')
    nodes, edges, node_types, edge_types = import_jsonl('spoke_2021.jsonl.gz', remove_unused_nodes=True)
    # TODO: compute some graph statistics?
    from collections import Counter
    # Tally how many nodes carry each label and how many edges carry each
    # type, then translate the integer type ids back into string names.
    node_type_counts = Counter(node[2] for node in nodes)
    node_type_counts = {node_types[t]: count for t, count in node_type_counts.items()}
    edge_type_counts = Counter(edges.values())
    edge_type_counts = {edge_types[t]: count for t, count in edge_type_counts.items()}
    # dump the per-type counts as pretty-printed JSON summaries
    with open('spoke_2021_node_types.json', 'w') as f:
        json.dump(node_type_counts, f, indent=2)
    with open('spoke_2021_edge_types.json', 'w') as f:
        json.dump(edge_type_counts, f, indent=2)