diff --git a/neps/optimizers/models/graphs/__init__.py b/neps/optimizers/models/graphs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/neps/optimizers/models/graphs/context_managers.py b/neps/optimizers/models/graphs/context_managers.py new file mode 100644 index 00000000..56b64359 --- /dev/null +++ b/neps/optimizers/models/graphs/context_managers.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager +from typing import TYPE_CHECKING + +from botorch.models import SingleTaskGP + +from neps.optimizers.models.graphs.kernels import BoTorchWLKernel, compute_kernel + +if TYPE_CHECKING: + import networkx as nx + from botorch.models.gp_regression_mixed import Kernel + + +@contextmanager +def set_graph_lookup( + kernel_or_gp: Kernel | SingleTaskGP, + new_graphs: list[nx.Graph], + *, + append: bool = True, +) -> Iterator[None]: + """Context manager to temporarily set the graph lookup for a kernel or GP model. + + Args: + kernel_or_gp (Kernel | SingleTaskGP): The kernel or GP model whose graph lookup is + to be set. + new_graphs (list[nx.Graph]): The new graphs to set in the graph lookup. + append (bool, optional): Whether to append the new graphs to the existing graph + lookup. Defaults to True. + """ + kernel_prev_graphs: list[tuple[Kernel, list[nx.Graph]]] = [] + + # Determine the modules to update based on the input type + if isinstance(kernel_or_gp, SingleTaskGP): + modules = [ + k + for k in kernel_or_gp.covar_module.sub_kernels() + if isinstance(k, BoTorchWLKernel) + ] + elif isinstance(kernel_or_gp, BoTorchWLKernel): + modules = [kernel_or_gp] + else: + assert hasattr(kernel_or_gp, "sub_kernels"), ( + "Kernel module must have sub_kernels method." + ) + modules = [ + k for k in kernel_or_gp.sub_kernels() if isinstance(k, BoTorchWLKernel) + ] + + # Save the current graph lookup and set the new graph lookup + for kern in modules: + compute_kernel.cache_clear() + + kernel_prev_graphs.append((kern, kern.graph_lookup)) + if append: + kern.set_graph_lookup([*kern.graph_lookup, *new_graphs]) + else: + kern.set_graph_lookup(new_graphs) + + yield + + # Restore the original graph lookup after the context manager exits + for kern, prev_graphs in kernel_prev_graphs: + kern.set_graph_lookup(prev_graphs) diff --git a/neps/optimizers/models/graphs/kernels.py b/neps/optimizers/models/graphs/kernels.py new file mode 100644 index 00000000..64f4a7f7 --- /dev/null +++ b/neps/optimizers/models/graphs/kernels.py @@ -0,0 +1,304 @@ +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING, Any + +import torch +from botorch.models.gp_regression_mixed import Kernel +from torch import Tensor +from torch.nn import Module + +from neps.optimizers.models.graphs.utils import graphs_to_tensors + +if TYPE_CHECKING: + import networkx as nx + + +@lru_cache(maxsize=128) +def compute_kernel( + adjacency_cache: tuple[Tensor, ...], + label_cache: tuple[Tensor, ...], + indices1: tuple[int, ...], + indices2: tuple[int, ...], + n_iter: int, + *, + diag: bool, + normalize: bool, +) -> Tensor: + """Compute the kernel matrix. + + This function is defined outside the class to leverage the `lru_cache` decorator, + which caches the results of expensive function calls and reuses them when the same + inputs occur again. + + Args: + adjacency_cache: Tuple of adjacency matrices for the graphs. + label_cache: Tuple of initial node labels for the graphs. + indices1: Tuple of indices for the first set of graphs. + indices2: Tuple of indices for the second set of graphs. + n_iter: Number of WL iterations. + diag: Whether to return only the diagonal of the kernel matrix. + normalize: Whether to normalize the kernel matrix. + + Returns: + A Tensor representing the kernel matrix. + """ + all_graphs = list(set(indices1).union(indices2)) + adj_matrices = [adjacency_cache[i] for i in all_graphs] + label_tensors = [label_cache[i] for i in all_graphs] + + # Compute full kernel matrix + _kernel = TorchWLKernel(n_iter=n_iter, normalize=normalize) + K_full = _kernel(adj_matrices, label_tensors) + + # Map indices to their positions in all_graphs + idx1 = [all_graphs.index(i) for i in indices1] + idx2 = [all_graphs.index(i) for i in indices2] + + # Extract the relevant submatrix + K = K_full[idx1][:, idx2] + + # Return the diagonal if requested + if diag: + return torch.diag(K) + + return K + + +class BoTorchWLKernel(Kernel): + """A custom kernel for Gaussian Processes using the Weisfeiler-Lehman (WL) algorithm. + + This kernel computes similarities between graphs based on their structural properties + using the WL algorithm. It is designed to be used with BoTorch and GPyTorch for + Gaussian Process regression. + + Args: + graph_lookup (list[nx.Graph]): List of NetworkX graphs. + n_iter (int, optional): Number of WL iterations to perform. Default is 5. + normalize (bool, optional): Whether to normalize the kernel matrix. + Default is True. + active_dims (tuple[int, ...]): Dimensions of the input to consider. + Not used in this kernel but included for compatibility with the base Kernel class. + **kwargs (Any): Additional arguments for the base Kernel class. + + Attributes: + graph_lookup (list[nx.Graph]): List of graphs used for kernel computation. + n_iter (int): Number of WL iterations. + normalize (bool): Whether to normalize the kernel matrix. + adjacency_cache (list[Tensor]): Cached adjacency matrices of the graphs. + label_cache (list[Tensor]): Cached initial node labels of the graphs. + """ + + has_lengthscale = False + + def __init__( + self, + graph_lookup: list[nx.Graph], + n_iter: int = 5, + *, + normalize: bool = True, + active_dims: tuple[int, ...], + **kwargs: Any, + ) -> None: + super().__init__(active_dims=active_dims, **kwargs) + self.graph_lookup = graph_lookup + self.n_iter = n_iter + self.normalize = normalize + self._precompute_graph_data() + + def _precompute_graph_data(self) -> None: + """Precompute and cache adjacency matrices and initial node labels.""" + self.adjacency_cache, self.label_cache = graphs_to_tensors( + self.graph_lookup, device=self.device + ) + + def set_graph_lookup(self, graph_lookup: list[nx.Graph]) -> None: + """Update the graph lookup and refresh the cached data.""" + self.graph_lookup = graph_lookup + self._precompute_graph_data() + + def forward( + self, + x1: Tensor, + x2: Tensor, + *, + diag: bool = False, + last_dim_is_batch: bool = False, + **params: Any, + ) -> Tensor: + """Compute kernel matrix containing pairwise similarities between graphs.""" + if last_dim_is_batch: + raise NotImplementedError("Batch dimension handling is not implemented.") + + if x1.ndim == 3: + return self._handle_batched_input(x1=x1, x2=x2, diag=diag) + + indices1, indices2 = self._prepare_indices(x1, x2) + + return compute_kernel( + adjacency_cache=tuple(self.adjacency_cache), + label_cache=tuple(self.label_cache), + indices1=tuple(indices1), + indices2=tuple(indices2), + n_iter=self.n_iter, + diag=diag, + normalize=self.normalize, + ) + + def _handle_batched_input(self, x1: Tensor, x2: Tensor, *, diag: bool) -> Tensor: + """Handle computation for batched input tensors.""" + q_dim_size = x1.shape[0] + assert x2.shape[0] == q_dim_size + + out = torch.empty((q_dim_size, x1.shape[1], x2.shape[1]), device=x1.device) + for q in range(q_dim_size): + out[q] = self.forward(x1[q], x2[q], diag=diag) + return out + + def _prepare_indices(self, x1: Tensor, x2: Tensor) -> tuple[list[int], list[int]]: + """Convert tensor indices to integer lists.""" + indices1 = x1.flatten().to(torch.int64).tolist() + indices2 = x2.flatten().to(torch.int64).tolist() + + # Check for missing graph indices (-1) and handle them + # Explanation: The index `-1` is used as a placeholder for "missing" or "invalid" + # graphs. This can occur when a graph feature is missing or undefined, such as + # during the exploration of new candidates where no corresponding graph is + # available in the `graph_lookup`. The kernel expects non-negative indices, so we + # need to convert `-1` to the index of the last graph in the lookup. + + # Use the last graph in the lookup as a placeholder + last_graph_idx = len(self.graph_lookup) - 1 + + if -1 in indices1: + # Replace any `-1` indices with the index of the last graph. + indices1 = [last_graph_idx if i == -1 else i for i in indices1] + + if -1 in indices2: + # Replace any `-1` indices with the index of the last graph. + indices2 = [last_graph_idx if i == -1 else i for i in indices2] + + return indices1, indices2 + + +class TorchWLKernel(Module): + """A custom implementation of Weisfeiler-Lehman (WL) Kernel in PyTorch. + + The WL Kernel is a graph kernel that measures similarity between graphs based on + their structural properties. It works by iteratively updating node labels based on + their neighborhoods and computing feature vectors from label distributions. + + Args: + n_iter: Number of WL iterations to perform + normalize: bool, optional. Whether to normalize the kernel matrix + + Attributes: + device: torch.device for computation (CPU/GPU) + label_dict: Mapping from node labels to numerical indices + label_counter: Counter for generating new label indices + """ + + def __init__(self, n_iter: int = 5, *, normalize: bool = True) -> None: + super().__init__() + self.n_iter = n_iter + self.normalize = normalize + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Keep track of labels across iterations + self.label_dict: dict[str, int] = {} + self.label_counter: int = 0 + + def _get_node_neighbors(self, adj: Tensor) -> list[list[int]]: + """Extract neighborhood information from adjacency matrix.""" + if adj.layout == torch.sparse_csr: + adj = adj.to_sparse_coo() + + adj = adj.coalesce() + rows, cols = adj.indices() + num_nodes = adj.size(0) + + neighbors: list[list[int]] = [[] for _ in range(num_nodes)] + for row, col in zip(rows.tolist(), cols.tolist(), strict=False): + neighbors[row].append(col) + + return neighbors + + def _wl_iteration(self, adj: Tensor, labels: Tensor) -> Tensor: + """Perform one WL iteration.""" + if not self.label_dict: + # Start new labels after initial ones + self.label_counter = int(labels.max().item()) + 1 + + num_nodes = labels.size(0) + new_labels: list[int] = [] + neighbors = self._get_node_neighbors(adj) + + for node_idx in range(num_nodes): + # Get current node label + node_label = int(labels[node_idx].item()) + neighbor_labels = sorted([int(labels[n].item()) for n in neighbors[node_idx]]) + + credential = f"{node_label},{neighbor_labels}" + + # Update label dictionary + new_labels.append( + self.label_dict.setdefault(credential, len(self.label_dict)) + ) + + return torch.tensor(new_labels, dtype=torch.int64, device=self.device) + + def _compute_feature_vector(self, all_labels: list[list[Tensor]]) -> Tensor: + """Compute the histogram feature vector for all graphs.""" + batch_size = len(all_labels[0]) + features: list[Tensor] = [] + + for iteration_labels in all_labels: + # Find maximum label value across all graphs in this iteration + max_label = int(max(label.max().item() for label in iteration_labels)) + 1 + + iter_features = torch.zeros((batch_size, max_label), device=self.device) + + # Compute label frequencies + for graph_idx, labels in enumerate(iteration_labels): + counts = torch.bincount(labels, minlength=max_label) + iter_features[graph_idx] = counts + + features.append(iter_features) + + return torch.cat(features, dim=1) + + def forward(self, adj_matrices: list[Tensor], label_tensors: list[Tensor]) -> Tensor: + """Compute WL kernel matrix for a list of graphs. + + Args: + adj_matrices: Precomputed sparse adjacency matrices for graphs. + label_tensors: Precomputed node label tensors for graphs. + + Returns: + Kernel matrix containing pairwise graph similarities. + """ + if len(adj_matrices) != len(label_tensors): + raise ValueError("Mismatch between adjacency matrices and label tensors.") + + # Reset label dictionary for new computation + self.label_dict = {} + # Store all label iterations + all_labels: list[list[Tensor]] = [label_tensors] + + # Perform WL iterations + for _ in range(self.n_iter): + new_labels = [ + self._wl_iteration(adj, labels) + for adj, labels in zip(adj_matrices, all_labels[-1], strict=False) + ] + all_labels.append(new_labels) + + # Compute feature vectors and kernel matrix (similarity matrix) + final_features = self._compute_feature_vector(all_labels) + kernel_matrix = torch.mm(final_features, final_features.t()) + + if self.normalize: + diag = torch.sqrt(torch.diag(kernel_matrix)) + kernel_matrix /= torch.outer(diag, diag) + + return kernel_matrix diff --git a/neps/optimizers/models/graphs/optimization.py b/neps/optimizers/models/graphs/optimization.py new file mode 100644 index 00000000..c036f581 --- /dev/null +++ b/neps/optimizers/models/graphs/optimization.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import torch +from botorch.optim import optimize_acqf_mixed + +from neps.optimizers.models.graphs.context_managers import set_graph_lookup +from neps.optimizers.models.graphs.utils import sample_graphs + +if TYPE_CHECKING: + import networkx as nx + from botorch.acquisition import AcquisitionFunction + + +def optimize_acqf_graph( + acq_function: AcquisitionFunction, + bounds: torch.Tensor, + fixed_features_list: list[dict[int, int]] | None = None, + num_graph_samples: int = 10, + train_graphs: list[nx.Graph] | None = None, + num_restarts: int = 10, + raw_samples: int = 1024, + q: int = 1, +) -> tuple[torch.Tensor, nx.Graph, float]: + """Optimize an acquisition function with graph sampling. + + This function optimizes the acquisition function by sampling graphs from the training + set, temporarily updating the kernel's graph lookup, and evaluating the acquisition + function for each sampled graph. The best candidate, the best graph, and its + corresponding acquisition score are returned. + + Args: + acq_function (AcquisitionFunction): The acquisition function to optimize. + bounds (torch.Tensor): A 2 x d tensor of bounds for numerical and categorical + features, where d is the number of features. + fixed_features_list (list[dict[int, float]] | None): A list of dictionaries + specifying fixed categorical feature configurations. Each dictionary maps + feature indices to their fixed values. Defaults to None. + num_graph_samples (int): The number of graphs to sample from the training set. + Defaults to 10. + train_graphs (list[nx.Graph] | None): The original training graphs. If None, a + ValueError is raised. + num_restarts (int): The number of optimization restarts. Defaults to 10. + raw_samples (int): The number of raw samples to generate for optimization. + Defaults to 1024. + q (int): The number of candidates to generate. Defaults to 1. + + Returns: + tuple[torch.Tensor, nx.Graph, float]: A tuple containing the best candidate + (as a tensor), the best graph, and its corresponding acquisition score. + + Raises: + ValueError: If `train_graphs` is None. + """ + if train_graphs is None: + raise ValueError("train_graphs cannot be None.") + + sampled_graphs = sample_graphs(train_graphs, num_samples=num_graph_samples) + + best_candidates, best_graphs, best_scores = [], [], [] + + # Get the index of the graph feature in the bounds + graph_idx = bounds.shape[1] - 1 + + # Todo: Instead of iterating over the graphs, optimize by putting all + # sampled graphs into the kernel and compute the scores in a single batch. + # Update the caching logic accordingly. + for graph in sampled_graphs: + with set_graph_lookup(acq_function.model.covar_module, [graph], append=True): + # Iterate through each fixed feature configuration (if provided) + for fixed_features in fixed_features_list or [{}]: + # Add the graph index to the fixed features, indicating that the last + # graph in the lookup should be used + updated_fixed_features = {**fixed_features, graph_idx: -1.0} + + # Optimize the acquisition function with the updated fixed features + candidates, scores = optimize_acqf_mixed( + acq_function=acq_function, + bounds=bounds, + fixed_features_list=[updated_fixed_features], + num_restarts=num_restarts, + raw_samples=raw_samples, + q=q, + ) + + # Store the candidates, graphs, and their scores + best_candidates.append(candidates) + best_graphs.append(graph) + best_scores.append(scores) + + # Find the index of the best score + best_idx = torch.argmax(torch.tensor(best_scores)) + + # Return the best candidate (without the graph index), the best graph, and its score + return ( + best_candidates[best_idx][:, :-1], + best_graphs[best_idx], + best_scores[best_idx].item(), + ) diff --git a/neps/optimizers/models/graphs/utils.py b/neps/optimizers/models/graphs/utils.py new file mode 100644 index 00000000..05692155 --- /dev/null +++ b/neps/optimizers/models/graphs/utils.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import random + +import networkx as nx +import numpy as np +import torch + + +def seed_all(seed: int = 100) -> None: + """Seed all random generators for reproducibility.""" + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + torch.cuda.manual_seed_all(seed) + # Ensure reproducibility with CuDNN (may reduce performance) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False + + +def min_max_scale(tensor: torch.Tensor) -> torch.Tensor: + """Scale the input tensor to the range [0, 1].""" + min_vals = tensor.min(dim=0, keepdim=True).values + max_vals = tensor.max(dim=0, keepdim=True).values + return (tensor - min_vals) / (max_vals - min_vals) + + +def graphs_to_tensors( + graphs: list[nx.Graph], device: torch.device | None = None +) -> tuple[list[torch.sparse.Tensor], list[torch.Tensor]]: + """Convert a list of NetworkX graphs into sparse adjacency matrices and label tensors. + + Args: + graphs (List[nx.Graph]): A list of NetworkX graphs. + device (torch.device | None): The device to place the tensors on. + Default is CPU. + + Returns: + Tuple[List[torch.sparse.Tensor], List[torch.Tensor]]: + A tuple containing: + - A list of sparse adjacency matrices. + - A list of label tensors. + """ + if device is None: + device = torch.device("cpu") + + adjacency_matrices = [] + label_tensors = [] + + # Create a consistent label mapping across all graphs + label_dict: dict[str, int] = {} + label_counter: int = 0 + + for graph in graphs: + # Create adjacency matrix + edges = list(graph.edges()) + num_nodes = graph.number_of_nodes() + + if not edges: + adj = torch.sparse_coo_tensor( + indices=torch.empty((2, 0), dtype=torch.long), + values=torch.empty(0), + size=(num_nodes, num_nodes), + device=device, + ).to_sparse_csr() + else: + edge_indices = edges + [(v, u) for u, v in edges] + rows, cols = zip(*edge_indices, strict=False) + indices = torch.tensor([rows, cols], dtype=torch.long, device=device) + values = torch.ones(len(edge_indices), dtype=torch.float, device=device) + adj = torch.sparse_coo_tensor( + indices, values, (num_nodes, num_nodes), device=device + ).to_sparse_csr() + + adjacency_matrices.append(adj) + + # Create label tensor + node_labels: list[int] = [] + for node in range(graph.number_of_nodes()): + if "label" in graph.nodes[node]: + label = graph.nodes[node]["label"] + if label not in label_dict: + label_dict[label] = label_counter + label_counter += 1 + node_labels.append(label_dict[label]) + else: + node_labels.append(node) + + label_tensors.append(torch.tensor(node_labels, dtype=torch.long, device=device)) + + return adjacency_matrices, label_tensors + + +def sample_graphs(graphs: list[nx.Graph], num_samples: int) -> list[nx.Graph]: + """Sample graphs using random walks or edge modifications. + + Args: + graphs (list[nx.Graph]): Existing training graphs. + num_samples (int): Number of graph samples to generate. + + Returns: + list[nx.Graph]: Sampled graphs. + """ + sampled_graphs = [] + for _ in range(num_samples): + base_graph = random.choice(graphs) + sampled_graph = base_graph.copy() + + # More aggressive modifications + num_modifications = random.randint(2, 5) # Increase minimum modifications + for _ in range(num_modifications): + if random.random() > 0.3: # 70% chance to add edge + nodes = list(sampled_graph.nodes) + if len(nodes) >= 2: + u, v = random.sample(nodes, 2) + if not sampled_graph.has_edge(u, v): + sampled_graph.add_edge(u, v) + elif sampled_graph.edges: # 30% chance to remove edge + u, v = random.choice(list(sampled_graph.edges)) + sampled_graph.remove_edge(u, v) + + # Ensure the graph stays connected + if not nx.is_connected(sampled_graph): + components = list(nx.connected_components(sampled_graph)) + for i in range(len(components) - 1): + u = random.choice(list(components[i])) + v = random.choice(list(components[i + 1])) + sampled_graph.add_edge(u, v) + + sampled_graphs.append(sampled_graph) + + return sampled_graphs diff --git a/pyproject.toml b/pyproject.toml index a1ce332e..3824542a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,7 @@ dev = [ "mkdocs-literate-nav", "mike", "black", # This allows mkdocstrings to format signatures in the docs + "grakel==0.1.10", ] [tool.setuptools.packages.find] diff --git a/tests/test_graphs/__init__.py b/tests/test_graphs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_graphs/test_botorch_wl_kernel.py b/tests/test_graphs/test_botorch_wl_kernel.py new file mode 100644 index 00000000..2ded6237 --- /dev/null +++ b/tests/test_graphs/test_botorch_wl_kernel.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import networkx as nx +import pytest +import torch +from botorch.models.gp_regression_mixed import Kernel + +from neps.optimizers.models.graphs.kernels import BoTorchWLKernel + + +def create_simple_graphs(num_graphs: int) -> list[nx.Graph]: + """Helper function to create a list of graphs.""" + graphs = [] + for _i in range(num_graphs): + G = nx.Graph() + G.add_nodes_from([0, 1, 2]) + G.add_edges_from([(0, 1), (1, 2)]) + graphs.append(G) + return graphs + + +class TestBoTorchWLKernel: + @pytest.fixture + def simple_graphs(self) -> list[nx.Graph]: + return create_simple_graphs(3) + + @pytest.fixture + def wl_kernel(self, simple_graphs: list[nx.Graph]) -> BoTorchWLKernel: + return BoTorchWLKernel( + graph_lookup=simple_graphs, + n_iter=2, + normalize=True, + active_dims=(0,), + ) + + def test_initialization( + self, wl_kernel: BoTorchWLKernel, simple_graphs: list[nx.Graph] + ) -> None: + """Test that the kernel is initialized correctly.""" + assert isinstance(wl_kernel, Kernel) + assert len(wl_kernel.graph_lookup) == len(simple_graphs) + assert wl_kernel.n_iter == 2 + assert wl_kernel.normalize is True + assert torch.equal(wl_kernel.active_dims, torch.tensor([0])) + + def test_precompute_graph_data(self, wl_kernel: BoTorchWLKernel) -> None: + """Test that graph data is precomputed correctly.""" + assert hasattr(wl_kernel, "adjacency_cache") + assert hasattr(wl_kernel, "label_cache") + assert len(wl_kernel.adjacency_cache) == len(wl_kernel.graph_lookup) + assert len(wl_kernel.label_cache) == len(wl_kernel.graph_lookup) + + def test_set_graph_lookup(self, wl_kernel: BoTorchWLKernel) -> None: + """Test that the graph lookup can be updated.""" + new_graphs = create_simple_graphs(2) + wl_kernel.set_graph_lookup(new_graphs) + assert len(wl_kernel.graph_lookup) == 2 + assert len(wl_kernel.adjacency_cache) == 2 + assert len(wl_kernel.label_cache) == 2 + + def test_forward_self_kernel(self, wl_kernel: BoTorchWLKernel) -> None: + """Test the kernel computation for self-similarity.""" + x = torch.tensor([[0], [1], [2]], dtype=torch.float64) + K = wl_kernel.forward(x, x) + assert K.shape == (3, 3) # Kernel matrix should be 3x3 + assert torch.allclose(K, K.T) # Kernel matrix should be symmetric + + def test_forward_cross_kernel(self, wl_kernel: BoTorchWLKernel) -> None: + """Test the kernel computation for cross-similarity.""" + x1 = torch.tensor([[0], [1]], dtype=torch.float64) + x2 = torch.tensor([[1], [2]], dtype=torch.float64) + K = wl_kernel.forward(x1, x2) + assert K.shape == (2, 2) # Kernel matrix should be 2x2 + + def test_forward_diagonal(self, wl_kernel: BoTorchWLKernel) -> None: + """Test the kernel computation for diagonal only.""" + x = torch.tensor([[0], [1], [2]], dtype=torch.float64) + K = wl_kernel.forward(x, x, diag=True) + assert K.shape == (3,) # Diagonal should be a vector of length 3 + + def test_handle_negative_one_index(self, wl_kernel: BoTorchWLKernel) -> None: + """Test the handling of the -1 index.""" + x = torch.tensor([[-1], [0], [1]], dtype=torch.float64) + K = wl_kernel.forward(x, x) + assert K.shape == (3, 3) # Kernel matrix should be 3x3 + # Ensure that -1 refers to the last graph + last_graph_idx = len(wl_kernel.graph_lookup) - 1 + assert torch.allclose(K[0, 0], K[last_graph_idx, last_graph_idx]) + + def test_forward_batched_input(self, wl_kernel: BoTorchWLKernel) -> None: + """Test the kernel computation for batched input.""" + x1 = torch.tensor([[[0], [1]], [[1], [2]]], dtype=torch.float64) + x2 = torch.tensor([[[1], [2]], [[0], [1]]], dtype=torch.float64) + K = wl_kernel.forward(x1, x2) + assert K.shape == (2, 2, 2) # Batched kernel matrix should be 2x2x2 + + def test_forward_invalid_input(self, wl_kernel: BoTorchWLKernel) -> None: + """Test that invalid input raises an error.""" + x1 = torch.tensor([[0], [1], [2]], dtype=torch.float64) + x2 = torch.tensor([[0], [1]], dtype=torch.float64) + with pytest.raises(NotImplementedError): + wl_kernel.forward(x1, x2, last_dim_is_batch=True) diff --git a/tests/test_graphs/test_optimization_over_graphs.py b/tests/test_graphs/test_optimization_over_graphs.py new file mode 100644 index 00000000..958031af --- /dev/null +++ b/tests/test_graphs/test_optimization_over_graphs.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +from itertools import product + +import networkx as nx +import pytest +import torch +from botorch import fit_gpytorch_mll +from botorch.acquisition import LinearMCObjective, qLogNoisyExpectedImprovement +from botorch.models import SingleTaskGP +from botorch.models.kernels import CategoricalKernel +from gpytorch import ExactMarginalLogLikelihood +from gpytorch.kernels import AdditiveKernel, MaternKernel, ScaleKernel + +from neps.optimizers.models.graphs.context_managers import set_graph_lookup +from neps.optimizers.models.graphs.kernels import BoTorchWLKernel +from neps.optimizers.models.graphs.optimization import optimize_acqf_graph, sample_graphs +from neps.optimizers.models.graphs.utils import min_max_scale + + +class TestGraphOptimizationPipeline: + @pytest.fixture + def setup_data(self) -> dict: + """Fixture to set up common data for tests.""" + TRAIN_CONFIGS = 50 + TEST_CONFIGS = 10 + TOTAL_CONFIGS = TRAIN_CONFIGS + TEST_CONFIGS + + N_NUMERICAL = 2 + N_CATEGORICAL = 1 + N_CATEGORICAL_VALUES_PER_CATEGORY = 2 + N_GRAPH = 1 + + # Generate random data + X = torch.cat( + [ + torch.rand((TOTAL_CONFIGS, N_NUMERICAL), dtype=torch.float64), + torch.randint( + 0, + N_CATEGORICAL_VALUES_PER_CATEGORY, + (TOTAL_CONFIGS, N_CATEGORICAL), + dtype=torch.float64, + ), + torch.arange(TOTAL_CONFIGS, dtype=torch.float64).unsqueeze(1), + ], + dim=1, + ) + + # Generate random graphs + graphs = [nx.erdos_renyi_graph(5, 0.5) for _ in range(TOTAL_CONFIGS)] + + # Generate random target values + y = torch.rand(TOTAL_CONFIGS, dtype=torch.float64) + 0.5 + + # Split into train and test sets + train_x, test_x = X[:TRAIN_CONFIGS], X[TRAIN_CONFIGS:] + train_graphs, test_graphs = graphs[:TRAIN_CONFIGS], graphs[TRAIN_CONFIGS:] + train_y, test_y = y[:TRAIN_CONFIGS].unsqueeze(-1), y[TRAIN_CONFIGS:].unsqueeze(-1) + + # Scale the data + train_x, test_x = min_max_scale(train_x), min_max_scale(test_x) + + return { + "train_x": train_x, + "test_x": test_x, + "train_graphs": train_graphs, + "test_graphs": test_graphs, + "train_y": train_y, + "test_y": test_y, + "N_NUMERICAL": N_NUMERICAL, + "N_CATEGORICAL": N_CATEGORICAL, + "N_CATEGORICAL_VALUES_PER_CATEGORY": N_CATEGORICAL_VALUES_PER_CATEGORY, + "N_GRAPH": N_GRAPH, + } + + def test_gp_fit_and_predict(self, setup_data: dict) -> None: + """Test fitting the GP and making predictions.""" + train_x = setup_data["train_x"] + train_y = setup_data["train_y"] + test_x = setup_data["test_x"] + train_graphs = setup_data["train_graphs"] + setup_data["test_graphs"] + + # Define the kernels + kernels = [ + ScaleKernel( + MaternKernel( + nu=2.5, + ard_num_dims=setup_data["N_NUMERICAL"], + active_dims=range(setup_data["N_NUMERICAL"]), + ) + ), + ScaleKernel( + CategoricalKernel( + ard_num_dims=setup_data["N_CATEGORICAL"], + active_dims=range( + setup_data["N_NUMERICAL"], + setup_data["N_NUMERICAL"] + setup_data["N_CATEGORICAL"], + ), + ) + ), + ScaleKernel( + BoTorchWLKernel( + graph_lookup=train_graphs, + n_iter=5, + normalize=True, + active_dims=(train_x.shape[1] - 1,), + ) + ), + ] + + # Create the GP model + gp = SingleTaskGP( + train_X=train_x, train_Y=train_y, covar_module=AdditiveKernel(*kernels) + ) + + # Fit the GP + mll = ExactMarginalLogLikelihood(gp.likelihood, gp) + fit_gpytorch_mll(mll) + + # Make predictions on the test set + with torch.no_grad(): + posterior = gp.forward(test_x) + predictions = posterior.mean + uncertainties = posterior.variance.sqrt() + + # Ensure predictions are in the correct shape (10, 1) + predictions = predictions.unsqueeze(-1) # Reshape to (10, 1) + + # Basic checks + assert predictions.shape == (setup_data["test_x"].shape[0], 1) + assert uncertainties.shape == (setup_data["test_x"].shape[0],) + + def test_acquisition_function_optimization(self, setup_data: dict) -> None: + """Test optimizing the acquisition function with graph sampling.""" + train_x = setup_data["train_x"] + train_y = setup_data["train_y"] + train_graphs = setup_data["train_graphs"] + + # Define the kernels + kernels = [ + ScaleKernel( + MaternKernel( + nu=2.5, + ard_num_dims=setup_data["N_NUMERICAL"], + active_dims=range(setup_data["N_NUMERICAL"]), + ) + ), + ScaleKernel( + CategoricalKernel( + ard_num_dims=setup_data["N_CATEGORICAL"], + active_dims=range( + setup_data["N_NUMERICAL"], + setup_data["N_NUMERICAL"] + setup_data["N_CATEGORICAL"], + ), + ) + ), + ScaleKernel( + BoTorchWLKernel( + graph_lookup=train_graphs, + n_iter=5, + normalize=True, + active_dims=(train_x.shape[1] - 1,), + ) + ), + ] + + # Create the GP model + gp = SingleTaskGP( + train_X=train_x, train_Y=train_y, covar_module=AdditiveKernel(*kernels) + ) + + # Fit the GP + mll = ExactMarginalLogLikelihood(gp.likelihood, gp) + fit_gpytorch_mll(mll) + + # Define the acquisition function + acq_function = qLogNoisyExpectedImprovement( + model=gp, + X_baseline=train_x, + objective=LinearMCObjective(weights=torch.tensor([-1.0])), + prune_baseline=True, + ) + + # Define bounds for optimization + bounds = torch.tensor( + [ + [0.0] * setup_data["N_NUMERICAL"] + + [0.0] * setup_data["N_CATEGORICAL"] + + [-1.0] * setup_data["N_GRAPH"], + [1.0] * setup_data["N_NUMERICAL"] + + [float(setup_data["N_CATEGORICAL_VALUES_PER_CATEGORY"] - 1)] + * setup_data["N_CATEGORICAL"] + + [len(train_x) - 1] * setup_data["N_GRAPH"], + ] + ) + + # Define fixed categorical features + cats_per_column = { + i: list(range(setup_data["N_CATEGORICAL_VALUES_PER_CATEGORY"])) + for i in range( + setup_data["N_NUMERICAL"], + setup_data["N_NUMERICAL"] + setup_data["N_CATEGORICAL"], + ) + } + fixed_cats = [ + dict(zip(cats_per_column.keys(), combo, strict=False)) + for combo in product(*cats_per_column.values()) + ] + + # Optimize the acquisition function + best_candidate, best_graph, best_score = optimize_acqf_graph( + acq_function=acq_function, + bounds=bounds, + fixed_features_list=fixed_cats, + train_graphs=train_graphs, + num_graph_samples=2, + num_restarts=2, + raw_samples=16, + q=1, + ) + + # Assertions for the acquisition function optimization + assert isinstance(best_candidate, torch.Tensor), ( + "Best candidate should be a tensor" + ) + assert best_candidate.shape == (1, train_x.shape[1] - 1), ( + "Best candidate should have the correct shape (excluding the graph index)" + ) + assert isinstance(best_graph, nx.Graph), "Best graph should be a NetworkX graph" + assert isinstance(best_score, float), "Best score should be a float" + + # Ensure the best candidate does not contain the graph index column + assert best_candidate.shape[1] == train_x.shape[1] - 1, ( + "Best candidate should not include the graph index column" + ) + + def test_graph_sampling(self, setup_data: dict) -> None: + """Test the graph sampling functionality.""" + train_graphs = setup_data["train_graphs"] + num_samples = 5 + + # Sample graphs + sampled_graphs = sample_graphs(train_graphs, num_samples=num_samples) + + # Basic checks + assert len(sampled_graphs) == num_samples, ( + f"Expected {num_samples} sampled graphs, got {len(sampled_graphs)}" + ) + assert all(isinstance(graph, nx.Graph) for graph in sampled_graphs), ( + "All sampled graphs should be NetworkX graphs" + ) + assert all(nx.is_connected(graph) for graph in sampled_graphs), ( + "All sampled graphs should be connected" + ) + + def test_min_max_scaling(self, setup_data: dict) -> None: + """Test the min-max scaling utility.""" + train_x = setup_data["train_x"] + + # Apply min-max scaling + scaled_train_x = min_max_scale(train_x) + + # Assertions for min-max scaling + assert torch.all(scaled_train_x >= 0), "Scaled values should be >= 0" + assert torch.all(scaled_train_x <= 1), "Scaled values should be <= 1" + assert scaled_train_x.shape == train_x.shape, ( + "Scaled data should have the same shape as the input data" + ) + + # Check that the scaling is correct + for i in range(train_x.shape[1]): + col_min = torch.min(train_x[:, i]) + col_max = torch.max(train_x[:, i]) + if col_min != col_max: # Avoid division by zero + expected_scaled_col = (train_x[:, i] - col_min) / (col_max - col_min) + assert torch.allclose(scaled_train_x[:, i], expected_scaled_col), ( + f"Scaling is incorrect for column {i}" + ) + + def test_set_graph_lookup(self, setup_data: dict) -> None: + """Test the set_graph_lookup context manager.""" + train_graphs = setup_data["train_graphs"] + test_graphs = setup_data["test_graphs"] + + # Define the kernel + kernel = BoTorchWLKernel( + graph_lookup=train_graphs, n_iter=5, normalize=True, active_dims=(0,) + ) + + # Use the context manager to temporarily set the graph lookup + with set_graph_lookup(kernel, test_graphs, append=True): + assert len(kernel.graph_lookup) == len(train_graphs) + len(test_graphs) + + # Check that the original graph lookup is restored + assert len(kernel.graph_lookup) == len(train_graphs) diff --git a/tests/test_graphs/test_torch_wl_kernel.py b/tests/test_graphs/test_torch_wl_kernel.py new file mode 100644 index 00000000..3e2f3c1f --- /dev/null +++ b/tests/test_graphs/test_torch_wl_kernel.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import networkx as nx +import numpy as np +import pytest +import torch +from grakel import WeisfeilerLehman, graph_from_networkx + +from neps.optimizers.models.graphs.kernels import TorchWLKernel +from neps.optimizers.models.graphs.utils import graphs_to_tensors + + +class TestTorchWLKernel: + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + @pytest.fixture + def example_graphs_set(self) -> list[nx.Graph]: + # Create example graphs for testing + G1 = nx.Graph() + G1.add_edges_from([(0, 1), (1, 2), (1, 3), (2, 3), (3, 4)]) + for node in G1.nodes(): + G1.nodes[node]["label"] = str(node) + + G2 = nx.Graph() + G2.add_edges_from([(0, 1), (1, 2), (3, 4), (4, 0)]) + for node in G2.nodes(): + G2.nodes[node]["label"] = str(node) + + G3 = nx.Graph() + G3.add_edges_from([(0, 1), (1, 3), (3, 2), (2, 4), (4, 0), (1, 2)]) + for node in G3.nodes(): + G3.nodes[node]["label"] = str(node) + + return [G1, G2, G3] + + @pytest.fixture + def random_graphs_sets(self) -> list[list[nx.Graph]]: + # Set a seed for reproducibility + seed = 100 + np.random.seed(seed) + torch.manual_seed(seed) + random_graph_sets = [] + + # Generate 10 random sets of graphs + for _ in range(10): + # Number of graphs in the set (2 to 10) + num_graphs = np.random.randint(2, 11) + graph_set = [] + + for _ in range(num_graphs): + # Number of nodes in the graph (3 to 50) + num_nodes = np.random.randint(3, 51) + G = nx.Graph() + + # Add nodes with labels + for node in range(num_nodes): + G.add_node(node, label=str(node)) + + # Add random edges + for u in range(num_nodes): + for v in range(u + 1, num_nodes): + if np.random.rand() > 0.5: # 50% chance to add an edge + G.add_edge(u, v) + + graph_set.append(G) + + random_graph_sets.append(graph_set) + + return random_graph_sets + + @pytest.mark.parametrize("n_iter", [1, 2, 3, 5, 10]) + @pytest.mark.parametrize("normalize", [True, False]) + def test_wl_kernel_against_grakel( + self, n_iter: int, normalize: bool, random_graphs_sets: list[list[nx.Graph]] + ) -> None: + for graph_set in random_graphs_sets: + adjacency_matrices, label_tensors = graphs_to_tensors( + graph_set, device=self.device + ) + + # Initialize Torch WL Kernel + torch_kernel = TorchWLKernel(n_iter=n_iter, normalize=normalize) + torch_kernel_matrix = ( + torch_kernel(adjacency_matrices, label_tensors).cpu().numpy() + ) + + # Initialize GraKel WL Kernel + grakel_graphs = list( + graph_from_networkx(graph_set, node_labels_tag="label", as_Graph=True) + ) + grakel_kernel = WeisfeilerLehman(n_iter=n_iter, normalize=normalize) + grakel_kernel_matrix = grakel_kernel.fit_transform(grakel_graphs) + + # Compare the kernel matrices + np.testing.assert_allclose( + torch_kernel_matrix, + grakel_kernel_matrix, + rtol=1e-5, + atol=1e-8, + err_msg=f"Kernel matrices differ for graph={graph_set}, n_iter={n_iter}", + ) + + def test_empty_graph(self) -> None: + G_empty = nx.Graph() + G_empty.add_node(0) + G_empty.nodes[0]["label"] = "0" + + adjacency_matrices, label_tensors = graphs_to_tensors( + [G_empty], device=self.device + ) + + # Initialize kernel and compute + kernel = TorchWLKernel(n_iter=3, normalize=True) + kernel_matrix = kernel(adjacency_matrices, label_tensors) + + # For a single graph, should get a 1x1 matrix with value 1.0 + expected = torch.ones(1, 1, device=self.device) + torch.testing.assert_close(kernel_matrix, expected) + + def test_invalid_input(self) -> None: + wl_kernel = TorchWLKernel(n_iter=3, normalize=True) + + with pytest.raises( + ValueError, match="Mismatch between adjacency matrices and label tensors" + ): + wl_kernel([], [torch.tensor([0])]) + + def test_kernel_on_single_node_graph(self) -> None: + G_single = nx.Graph() + G_single.add_node(0) + G_single.nodes[0]["label"] = "0" + + adjacency_matrices, label_tensors = graphs_to_tensors( + [G_single], device=self.device + ) + + wl_kernel = TorchWLKernel(n_iter=3, normalize=True) + K = wl_kernel(adjacency_matrices, label_tensors) + + expected = torch.ones(1, 1, device=self.device) + torch.testing.assert_close(K, expected) + + def test_wl_kernel_with_empty_graph_and_reordered_edges( + self, random_graphs_sets: list[list[nx.Graph]] + ) -> None: + """Test the TorchWLKernel with an empty graph and a graph with reordered edges.""" + for graph_set in random_graphs_sets: + # Create an empty graph + G_empty = nx.Graph() + G_empty.add_node(0) + G_empty.nodes[0]["label"] = "0" + + # Select the first graph from the set to reorder its edges + G = graph_set[0] + G_reordered = nx.Graph() + + # Add all nodes from the original graph to G_reordered + for node in G.nodes(): + G_reordered.add_node(node, label=G.nodes[node]["label"]) + + # Reorder edges randomly + edges = list(G.edges()) + np.random.shuffle(edges) # Randomly shuffle the edges + G_reordered.add_edges_from(edges) + + # Combine the empty graph, original graph, and reordered graph + graphs = [G_empty, G, G_reordered] + adjacency_matrices, label_tensors = graphs_to_tensors( + graphs, device=self.device + ) + + # Initialize and compute the kernel + wl_kernel = TorchWLKernel(n_iter=3, normalize=True) + K = wl_kernel(adjacency_matrices, label_tensors) + + assert K.shape == (3, 3), "Kernel matrix shape is incorrect" + assert torch.allclose(K[1, 1], K[2, 2]), ( + "Kernel value for original and reordered graphs should be the same" + ) + + @pytest.mark.parametrize("n_iter", [1, 2, 3, 4, 5, 6, 7]) + @pytest.mark.parametrize("normalize", [True, False]) + def test_wl_kernel_with_different_node_labels( + self, n_iter: int, normalize: bool, example_graphs_set: list[nx.Graph] + ) -> None: + graphs = [] + for i, G in enumerate(example_graphs_set): + G_copy = G.copy() + prefix = ["node_", "vertex_", "n"][i] + for node in G_copy.nodes(): + G_copy.nodes[node]["label"] = f"{prefix}{node}" + graphs.append(G_copy) + + adjacency_matrices, label_tensors = graphs_to_tensors(graphs, device=self.device) + + wl_kernel = TorchWLKernel(n_iter=n_iter, normalize=normalize) + torch_kernel_matrix = wl_kernel(adjacency_matrices, label_tensors).cpu().numpy() + + grakel_graphs = graph_from_networkx(graphs, node_labels_tag="label") + grakel_wl = WeisfeilerLehman(n_iter=n_iter, normalize=normalize) + grakel_kernel_matrix = grakel_wl.fit_transform(grakel_graphs) + + np.testing.assert_allclose( + torch_kernel_matrix, + grakel_kernel_matrix, + rtol=1e-5, + atol=1e-8, + err_msg=f"Kernel matrices differ for n_iter={n_iter}, normalize={normalize}", + ) + + def test_wl_kernel_with_same_node_labels( + self, example_graphs_set: list[nx.Graph] + ) -> None: + """Test WL kernel behavior with same node labels but different structures. + + Even when all nodes have the same label, the WL kernel should: + 1. Produce a symmetric matrix + 2. Have 1.0 on the diagonal (self-similarity) + 3. Have off-diagonal values less than 1.0 (different structures) + 4. Maintain non-negative values (it's a valid kernel) + """ + graphs = [] + for G in example_graphs_set: + G_copy = G.copy() + for node in G_copy.nodes(): + G_copy.nodes[node]["label"] = "A" + graphs.append(G_copy) + + adjacency_matrices, label_tensors = graphs_to_tensors(graphs, device=self.device) + + wl_kernel = TorchWLKernel(n_iter=3, normalize=True) + K = wl_kernel(adjacency_matrices, label_tensors) + + # Check basic properties + assert K.shape == (3, 3), "Kernel matrix shape is incorrect" + assert torch.allclose(K, K.T, atol=1e-4), "Kernel matrix is not symmetric" + + # Check diagonal elements are 1 (normalized self-similarity) + assert torch.allclose(torch.diag(K), torch.ones_like(torch.diag(K)), atol=1e-4), ( + "Diagonal elements should be 1.0" + ) + + # Check off-diagonal elements are less than 1 (different structures) + off_diag_mask = ~torch.eye(K.shape[0], dtype=torch.bool, device=self.device) + assert torch.all(K[off_diag_mask] < 1.0), ( + "Off-diagonal elements should be less than 1.0 for different structures" + ) + + # Check all elements are non-negative (valid kernel) + assert torch.all(K >= 0), "Kernel values should be non-negative"