Skip to content

Commit

Permalink
tests: add unit tests for NetworkXStorage (#24)
Browse files Browse the repository at this point in the history
* Added unit tests and fixed node/edge degree handling for non-existing nodes

* Added more test cases for community schema and stable largest connected component

---------

Co-authored-by: terence-gpt <[email protected]>
  • Loading branch information
NumberChiffre and NumberChiffre authored Sep 5, 2024
1 parent 65a162c commit e5ec32b
Show file tree
Hide file tree
Showing 2 changed files with 381 additions and 2 deletions.
6 changes: 4 additions & 2 deletions nano_graphrag/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,12 @@ async def get_node(self, node_id: str) -> Union[dict, None]:
return self._graph.nodes.get(node_id)

async def node_degree(self, node_id: str) -> int:
    """Return the degree of *node_id*, or 0 if the node is not in the graph.

    Guarding with ``has_node`` matters because networkx's ``degree`` does not
    return a plain int for an unknown node (per the original review note it
    yields a ``DegreeView({})``), which would break callers expecting an int.
    """
    return self._graph.degree(node_id) if self._graph.has_node(node_id) else 0

async def edge_degree(self, src_id: str, tgt_id: str) -> int:
    """Return the sum of both endpoint degrees for the (src, tgt) pair.

    Each endpoint that is absent from the graph contributes 0, mirroring the
    ``node_degree`` guard so a missing node never propagates networkx's
    non-int return value.
    """
    src_degree = self._graph.degree(src_id) if self._graph.has_node(src_id) else 0
    tgt_degree = self._graph.degree(tgt_id) if self._graph.has_node(tgt_id) else 0
    return src_degree + tgt_degree

async def get_edge(
self, source_node_id: str, target_node_id: str
Expand Down
377 changes: 377 additions & 0 deletions tests/test_networkx_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,377 @@
import os
import shutil
import pytest
import networkx as nx
import numpy as np
import asyncio
import json
from nano_graphrag import GraphRAG
from nano_graphrag._storage import NetworkXStorage
from nano_graphrag._utils import wrap_embedding_func_with_attrs

# Scratch directory used by every test in this module; the setup_teardown
# fixture creates it fresh per test and removes it afterwards.
WORKING_DIR = "./tests/nano_graphrag_cache_networkx_storage_test"


def _remove_if_present(path: str) -> None:
    # Best-effort wipe: a missing directory is not an error.
    if os.path.exists(path):
        shutil.rmtree(path)


@pytest.fixture(scope="function")
def setup_teardown():
    """Give each test a pristine WORKING_DIR and remove it afterwards."""
    _remove_if_present(WORKING_DIR)
    os.mkdir(WORKING_DIR)

    yield

    shutil.rmtree(WORKING_DIR)


@wrap_embedding_func_with_attrs(embedding_dim=384, max_token_size=8192)
async def mock_embedding(texts: list[str]) -> np.ndarray:
    """Stand-in embedder: one random 384-dim vector per input text."""
    batch_size = len(texts)
    return np.random.rand(batch_size, 384)


@pytest.fixture
def networkx_storage(setup_teardown):
    """NetworkXStorage backed by a throwaway GraphRAG config in WORKING_DIR."""
    config = GraphRAG(working_dir=WORKING_DIR, embedding_func=mock_embedding).__dict__
    return NetworkXStorage(namespace="test", global_config=config)


@pytest.mark.asyncio
async def test_upsert_and_get_node(networkx_storage):
    """Upserted node data round-trips through get_node and has_node."""
    payload = {"attr1": "value1", "attr2": "value2"}

    await networkx_storage.upsert_node("node1", payload)

    assert await networkx_storage.get_node("node1") == payload
    assert await networkx_storage.has_node("node1") is True


@pytest.mark.asyncio
async def test_upsert_and_get_edge(networkx_storage):
    """Upserted edge data round-trips through get_edge and has_edge."""
    src, tgt = "node1", "node2"
    payload = {"weight": 1.0, "type": "connection"}

    for node in (src, tgt):
        await networkx_storage.upsert_node(node, {})
    await networkx_storage.upsert_edge(src, tgt, payload)

    assert await networkx_storage.get_edge(src, tgt) == payload
    assert await networkx_storage.has_edge(src, tgt) is True


@pytest.mark.asyncio
async def test_node_degree(networkx_storage):
    """A hub linked to N neighbors reports degree N."""
    hub = "center"
    await networkx_storage.upsert_node(hub, {})

    neighbors = [f"neighbor{i}" for i in range(5)]
    for neighbor in neighbors:
        await networkx_storage.upsert_node(neighbor, {})
        await networkx_storage.upsert_edge(hub, neighbor, {})

    assert await networkx_storage.node_degree(hub) == len(neighbors)


@pytest.mark.asyncio
async def test_edge_degree(networkx_storage):
    """edge_degree sums both endpoint degrees; the shared edge counts twice."""
    src, tgt = "node1", "node2"
    await networkx_storage.upsert_node(src, {})
    await networkx_storage.upsert_node(tgt, {})
    await networkx_storage.upsert_edge(src, tgt, {})

    src_extra, tgt_extra = 3, 2
    for i in range(src_extra):
        neighbor = f"neighbor{i}"
        await networkx_storage.upsert_node(neighbor, {})
        await networkx_storage.upsert_edge(src, neighbor, {})
    for i in range(tgt_extra):
        neighbor = f"target_neighbor{i}"
        await networkx_storage.upsert_node(neighbor, {})
        await networkx_storage.upsert_edge(tgt, neighbor, {})

    # Each endpoint counts its extra neighbors plus the shared src-tgt edge.
    expected = (src_extra + 1) + (tgt_extra + 1)
    assert await networkx_storage.edge_degree(src, tgt) == expected


@pytest.mark.asyncio
async def test_get_node_edges(networkx_storage):
    """get_node_edges returns every (center, neighbor) pair that was added."""
    center = "center"
    await networkx_storage.upsert_node(center, {})

    expected = set()
    for i in range(3):
        neighbor = f"neighbor{i}"
        await networkx_storage.upsert_node(neighbor, {})
        await networkx_storage.upsert_edge(center, neighbor, {})
        expected.add((center, neighbor))

    assert set(await networkx_storage.get_node_edges(center)) == expected


@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_clustering(networkx_storage, algorithm):
    """Clustering a 10-node chain yields a schema exposing all expected fields."""
    # NOTE: node IDs are case-sensitive for clustering with leiden.
    for i in range(10):
        await networkx_storage.upsert_node(f"NODE{i}", {"source_id": f"chunk{i}"})
    for i in range(9):
        await networkx_storage.upsert_edge(f"NODE{i}", f"NODE{i + 1}", {})

    assert networkx_storage._graph.number_of_nodes() > 0
    assert networkx_storage._graph.number_of_edges() > 0
    await networkx_storage.clustering(algorithm=algorithm)

    schema = await networkx_storage.community_schema()
    assert len(schema) > 0

    required_keys = (
        "level", "title", "edges", "nodes",
        "chunk_ids", "occurrence", "sub_communities",
    )
    for community in schema.values():
        for key in required_keys:
            assert key in community


@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_consistency(networkx_storage, algorithm):
    """Re-running leiden on the same chain yields the same community count."""
    for i in range(10):
        await networkx_storage.upsert_node(f"NODE{i}", {"source_id": f"chunk{i}"})
    for i in range(9):
        await networkx_storage.upsert_edge(f"NODE{i}", f"NODE{i + 1}", {})

    community_counts = []
    for _ in range(3):
        await networkx_storage.clustering(algorithm=algorithm)
        community_counts.append(len(await networkx_storage.community_schema()))

    assert len(set(community_counts)) == 1, "Number of communities should be consistent"


@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_community_structure(networkx_storage, algorithm):
    """Two disjoint chains (A*, B*) must never end up in the same community."""
    for i in range(10):
        await networkx_storage.upsert_node(f"A{i}", {"source_id": f"chunkA{i}"})
        await networkx_storage.upsert_node(f"B{i}", {"source_id": f"chunkB{i}"})
    for i in range(9):
        await networkx_storage.upsert_edge(f"A{i}", f"A{i+1}", {})
        await networkx_storage.upsert_edge(f"B{i}", f"B{i+1}", {})

    await networkx_storage.clustering(algorithm=algorithm)
    community_schema = await networkx_storage.community_schema()

    assert len(community_schema) >= 2, "Should have at least two communities"

    # Fixed: the original only inspected communities[0], so a mixed community
    # anywhere else in the schema would slip through. Check every community.
    for community in community_schema.values():
        a_nodes = set(node for node in community['nodes'] if node.startswith('A'))
        b_nodes = set(node for node in community['nodes'] if node.startswith('B'))
        assert len(a_nodes) == 0 or len(b_nodes) == 0, "Nodes from different groups should be in different communities"


@pytest.mark.parametrize("algorithm", ["leiden"])
@pytest.mark.asyncio
async def test_leiden_clustering_hierarchical_structure(networkx_storage, algorithm):
    """Leiden output exposes at least one level, with the lowest level widest."""
    node1_clusters = json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}])
    node2_clusters = json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}])
    await networkx_storage.upsert_node("NODE1", {"source_id": "chunk1", "clusters": node1_clusters})
    await networkx_storage.upsert_node("NODE2", {"source_id": "chunk2", "clusters": node2_clusters})
    await networkx_storage.upsert_edge("NODE1", "NODE2", {})
    await networkx_storage.clustering(algorithm=algorithm)
    schema = await networkx_storage.community_schema()

    levels = {community['level'] for community in schema.values()}
    assert len(levels) >= 1, "Should have at least one level in the hierarchy"

    counts_by_level = {
        level: sum(1 for c in schema.values() if c['level'] == level)
        for level in levels
    }
    assert counts_by_level[0] >= counts_by_level.get(max(levels), 0), "Lower levels should have more or equal number of communities"


@pytest.mark.asyncio
async def test_persistence(setup_teardown):
    """Graph contents written before index_done_callback reload into a new storage."""
    rag = GraphRAG(working_dir=WORKING_DIR, embedding_func=mock_embedding)

    writer = NetworkXStorage(
        namespace="test_persistence",
        global_config=rag.__dict__,
    )
    await writer.upsert_node("node1", {"attr": "value"})
    await writer.upsert_node("node2", {"attr": "value"})
    await writer.upsert_edge("node1", "node2", {"weight": 1.0})
    await writer.index_done_callback()

    # A second storage over the same namespace must see the persisted graph.
    reader = NetworkXStorage(
        namespace="test_persistence",
        global_config=rag.__dict__,
    )
    assert await reader.has_node("node1")
    assert await reader.has_node("node2")
    assert await reader.has_edge("node1", "node2")
    assert await reader.get_node("node1") == {"attr": "value"}
    assert await reader.get_edge("node1", "node2") == {"weight": 1.0}


@pytest.mark.asyncio
async def test_embed_nodes(networkx_storage):
    """node2vec embedding returns one vector per node at the configured width."""
    node_count = 5
    for i in range(node_count):
        await networkx_storage.upsert_node(f"node{i}", {"id": f"node{i}"})
    for i in range(node_count - 1):
        await networkx_storage.upsert_edge(f"node{i}", f"node{i + 1}", {})

    embeddings, node_ids = await networkx_storage.embed_nodes("node2vec")

    dims = networkx_storage.global_config['node2vec_params']['dimensions']
    assert embeddings.shape == (node_count, dims)
    assert len(node_ids) == node_count
    assert all(f"node{i}" in node_ids for i in range(node_count))


def test_stable_largest_connected_component_equal_components():
    """With three equal-size components, the same one is deterministically kept.

    Fixed: this test awaits nothing, so the needless ``async def`` and
    ``@pytest.mark.asyncio`` marker are dropped in favor of a plain sync test.
    """
    G = nx.Graph()
    G.add_edges_from([("A", "B"), ("C", "D"), ("E", "F")])
    result = NetworkXStorage.stable_largest_connected_component(G)
    assert sorted(result.nodes()) == ["A", "B"]
    assert list(result.edges()) == [("A", "B")]


def test_stable_largest_connected_component_stability():
    """Repeated calls return identical node and edge orderings.

    Fixed: no awaits occur here, so the ``async``/asyncio marker is removed.
    """
    G = nx.Graph()
    G.add_edges_from([("A", "B"), ("B", "C"), ("C", "D"), ("E", "F")])
    first = NetworkXStorage.stable_largest_connected_component(G)
    second = NetworkXStorage.stable_largest_connected_component(G)
    assert nx.is_isomorphic(first, second)
    assert list(first.nodes()) == list(second.nodes())
    assert list(first.edges()) == list(second.edges())


def test_stable_largest_connected_component_directed_graph():
    """A DiGraph input yields its largest component with all chain edges.

    Fixed: no awaits occur here, so the ``async``/asyncio marker is removed.
    """
    G = nx.DiGraph()
    G.add_edges_from([("A", "B"), ("B", "C"), ("C", "D"), ("E", "F")])
    result = NetworkXStorage.stable_largest_connected_component(G)
    assert sorted(result.nodes()) == ["A", "B", "C", "D"]
    assert sorted(result.edges()) == [("A", "B"), ("B", "C"), ("C", "D")]


def test_stable_largest_connected_component_self_loops_and_parallel_edges():
    """Self-loops survive; the duplicate A-B edge collapses in an nx.Graph.

    Fixed: no awaits occur here, so the ``async``/asyncio marker is removed.
    """
    G = nx.Graph()
    G.add_edges_from([("A", "B"), ("B", "C"), ("C", "A"), ("A", "A"), ("B", "B"), ("A", "B")])
    result = NetworkXStorage.stable_largest_connected_component(G)
    assert sorted(result.nodes()) == ["A", "B", "C"]
    assert sorted(result.edges()) == [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C')]


@pytest.mark.asyncio
async def test_community_schema_with_no_clusters(networkx_storage):
    """Nodes carrying no cluster metadata produce an empty community schema."""
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1"})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk2"})
    await networkx_storage.upsert_edge("node1", "node2", {})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 0


@pytest.mark.asyncio
async def test_community_schema_multiple_levels(networkx_storage):
    """Two-level cluster metadata yields a parent ("0") with two children."""
    node1_clusters = json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}])
    node2_clusters = json.dumps([{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}])
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1", "clusters": node1_clusters})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk2", "clusters": node2_clusters})
    await networkx_storage.upsert_edge("node1", "node2", {})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 3
    assert set(schema.keys()) == {"0", "1", "2"}
    assert schema["0"]["level"] == 0
    assert schema["1"]["level"] == 1
    assert schema["2"]["level"] == 1
    assert set(schema["0"]["sub_communities"]) == {"1", "2"}


@pytest.mark.asyncio
async def test_community_schema_occurrence(networkx_storage):
    """Occurrence is 1 for the largest community and 0.5 for the half-size one."""
    cluster_zero = json.dumps([{"level": 0, "cluster": "0"}])
    cluster_one = json.dumps([{"level": 0, "cluster": "1"}])
    await networkx_storage.upsert_node("node1", {"source_id": "chunk1,chunk2", "clusters": cluster_zero})
    await networkx_storage.upsert_node("node2", {"source_id": "chunk3", "clusters": cluster_zero})
    await networkx_storage.upsert_node("node3", {"source_id": "chunk4", "clusters": cluster_one})

    schema = await networkx_storage.community_schema()
    assert len(schema) == 2
    assert schema["0"]["occurrence"] == 1
    assert schema["1"]["occurrence"] == 0.5


@pytest.mark.asyncio
async def test_community_schema_sub_communities(networkx_storage):
    """Level-0 communities list their level-1 children; leaf communities list none."""
    memberships = [
        ("node1", "chunk1", [{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "1"}]),
        ("node2", "chunk2", [{"level": 0, "cluster": "0"}, {"level": 1, "cluster": "2"}]),
        ("node3", "chunk3", [{"level": 0, "cluster": "3"}, {"level": 1, "cluster": "4"}]),
    ]
    for node_id, chunk, clusters in memberships:
        await networkx_storage.upsert_node(
            node_id, {"source_id": chunk, "clusters": json.dumps(clusters)}
        )

    schema = await networkx_storage.community_schema()
    assert len(schema) == 5
    assert set(schema["0"]["sub_communities"]) == {"1", "2"}
    assert schema["3"]["sub_communities"] == ["4"]
    for leaf in ("1", "2", "4"):
        assert schema[leaf]["sub_communities"] == []


@pytest.mark.asyncio
async def test_concurrent_operations(networkx_storage):
    """Interleaved upserts from two coroutines all land in the graph."""
    async def insert_range(lo, hi):
        for i in range(lo, hi):
            await networkx_storage.upsert_node(f"node{i}", {"value": i})

    await asyncio.gather(
        insert_range(0, 500),
        insert_range(500, 1000),
    )

    # node0 was inserted without edges, so its degree is 0.
    assert await networkx_storage.node_degree("node0") == 0
    assert len(networkx_storage._graph.nodes) == 1000


@pytest.mark.asyncio
async def test_nonexistent_node_and_edge(networkx_storage):
    """Queries against absent nodes and edges return falsy values, never raise."""
    missing = "nonexistent"
    assert await networkx_storage.has_node(missing) is False
    assert await networkx_storage.has_edge("node1", "node2") is False
    assert await networkx_storage.get_node(missing) is None
    assert await networkx_storage.get_edge("node1", "node2") is None
    assert await networkx_storage.get_node_edges(missing) is None
    # Degree helpers fall back to 0 instead of networkx's non-int return.
    assert await networkx_storage.node_degree(missing) == 0
    assert await networkx_storage.edge_degree("node1", "node2") == 0


@pytest.mark.asyncio
async def test_error_handling(networkx_storage):
    """Unknown algorithm names raise ValueError for clustering and embedding."""
    bad_algorithm = "invalid_algo"
    with pytest.raises(ValueError, match="Clustering algorithm invalid_algo not supported"):
        await networkx_storage.clustering(bad_algorithm)

    with pytest.raises(ValueError, match="Node embedding algorithm invalid_algo not supported"):
        await networkx_storage.embed_nodes(bad_algorithm)

0 comments on commit e5ec32b

Please sign in to comment.