diff --git a/neps/graphs/__init__.py b/neps/graphs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/neps/graphs/context_managers.py b/neps/graphs/context_managers.py
new file mode 100644
index 00000000..3a00b186
--- /dev/null
+++ b/neps/graphs/context_managers.py
@@ -0,0 +1,58 @@
+from __future__ import annotations
+
+from collections.abc import Iterator
+from contextlib import contextmanager
+from typing import TYPE_CHECKING
+
+from botorch.models import SingleTaskGP
+
+from neps.graphs.kernels import BoTorchWLKernel
+
+if TYPE_CHECKING:
+    import networkx as nx
+    from botorch.models.gp_regression_mixed import Kernel
+
+
+@contextmanager
+def set_graph_lookup(
+    kernel_or_gp: Kernel | SingleTaskGP,
+    new_graphs: list[nx.Graph],
+    *,
+    append: bool = True,
+) -> Iterator[None]:
+    """Context manager to temporarily set the graph lookup for a kernel or GP model.
+
+    Args:
+        kernel_or_gp (Kernel | SingleTaskGP): The kernel or GP model whose graph
+            lookup is to be set.
+        new_graphs (list[nx.Graph]): The new graphs to set in the graph lookup.
+        append (bool, optional): Whether to append the new graphs to the existing
+            graph lookup. Defaults to True.
+    """
+    kernel_prev_graphs: list[tuple[Kernel, list[nx.Graph]]] = []
+
+    # Determine the modules to update based on the input type
+    if isinstance(kernel_or_gp, SingleTaskGP):
+        modules = [
+            k
+            for k in kernel_or_gp.covar_module.sub_kernels()
+            if isinstance(k, BoTorchWLKernel)
+        ]
+    elif isinstance(kernel_or_gp, BoTorchWLKernel):
+        modules = [kernel_or_gp]
+    else:
+        assert hasattr(kernel_or_gp, "sub_kernels"), (
+            "Kernel module must have sub_kernels method."
+        )
+        modules = [
+            k for k in kernel_or_gp.sub_kernels() if isinstance(k, BoTorchWLKernel)
+        ]
+
+    # Save the current graph lookup and set the new graph lookup
+    for kern in modules:
+        kernel_prev_graphs.append((kern, kern.graph_lookup))
+        if append:
+            kern.set_graph_lookup([*kern.graph_lookup, *new_graphs])
+        else:
+            kern.set_graph_lookup(new_graphs)
+
+    yield
+
+    # Restore the original graph lookup after the context manager exits
+    for kern, prev_graphs in kernel_prev_graphs:
+        kern.set_graph_lookup(prev_graphs)
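
For illustration, a minimal usage sketch of `set_graph_lookup` (not part of the diff; the toy graphs and variable names are made up, and it assumes the `BoTorchWLKernel` added later in this PR):

import networkx as nx

from neps.graphs.context_managers import set_graph_lookup
from neps.graphs.kernels import BoTorchWLKernel

# Hypothetical setup: a WL kernel over three small random graphs.
train_graphs = [nx.erdos_renyi_graph(5, 0.5) for _ in range(3)]
kernel = BoTorchWLKernel(graph_lookup=train_graphs, n_iter=3, active_dims=(0,))

# Temporarily append a candidate graph; inside the block, index 3 (or -1)
# resolves to the appended graph, and on exit the original lookup is restored.
candidate_graphs = [nx.erdos_renyi_graph(5, 0.5)]
with set_graph_lookup(kernel, candidate_graphs, append=True):
    assert len(kernel.graph_lookup) == 4
assert len(kernel.graph_lookup) == 3
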
diff --git a/neps/graphs/examples/grakel_wl_usage_example.py b/neps/graphs/examples/grakel_wl_usage_example.py
new file mode 100644
index 00000000..c5e65187
--- /dev/null
+++ b/neps/graphs/examples/grakel_wl_usage_example.py
@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+import matplotlib.pyplot as plt
+import networkx as nx
+from grakel import WeisfeilerLehman, graph_from_networkx
+
+
+def visualize_graph(G):
+    """Visualize the NetworkX graph."""
+    pos = nx.spring_layout(G)
+    nx.draw(G, pos, with_labels=True, node_size=700, node_color="lightblue")
+    plt.show()
+
+
+def add_labels(G):
+    """Add labels to the nodes of the graph."""
+    for node in G.nodes():
+        G.nodes[node]["label"] = str(node)
+
+
+# Create graphs
+G1 = nx.Graph()
+G1.add_edges_from([(0, 1), (1, 2), (1, 3), (1, 4), (2, 3)])
+add_labels(G1)
+
+G2 = nx.Graph()
+G2.add_edges_from([(0, 1), (1, 2), (2, 3), (3, 4)])
+add_labels(G2)
+
+G3 = nx.Graph()
+G3.add_edges_from([(0, 1), (1, 3), (3, 2)])
+add_labels(G3)
+
+# Visualize the graphs
+visualize_graph(G1)
+visualize_graph(G2)
+visualize_graph(G3)
+
+# Convert NetworkX graphs to Grakel format using graph_from_networkx
+graph_list = list(
+    graph_from_networkx([G1, G2, G3], node_labels_tag="label", as_Graph=True)
+)
+
+# Initialize the Weisfeiler-Lehman kernel
+wl_kernel = WeisfeilerLehman(n_iter=5, normalize=False)
+
+# Compute the kernel matrix
+K = wl_kernel.fit_transform(graph_list)
+
+# Display the kernel matrix
+print("Kernel matrix (pairwise graph similarities):")
+print(K)
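
As a reference point for the `normalize` flag used throughout this PR: normalization here is cosine normalization of the raw kernel matrix, i.e. K_norm[i, j] = K[i, j] / sqrt(K[i, i] * K[j, j]). A quick sketch reusing `K` from the example above:

import numpy as np

# Cosine-normalize the raw WL kernel matrix; self-similarities become 1.0.
K_norm = K / np.sqrt(np.outer(np.diag(K), np.diag(K)))
assert np.allclose(np.diag(K_norm), 1.0)
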
diff --git a/neps/graphs/examples/single_task_gp_usage_example.py b/neps/graphs/examples/single_task_gp_usage_example.py
new file mode 100644
index 00000000..7314c88c
--- /dev/null
+++ b/neps/graphs/examples/single_task_gp_usage_example.py
@@ -0,0 +1,136 @@
+from __future__ import annotations
+
+from itertools import product
+from typing import TYPE_CHECKING
+
+import torch
+from botorch import fit_gpytorch_mll
+from botorch.acquisition import LinearMCObjective, qLogNoisyExpectedImprovement
+from botorch.models import SingleTaskGP
+from botorch.models.gp_regression_mixed import CategoricalKernel, ScaleKernel
+from botorch.optim import optimize_acqf_mixed
+from gpytorch import ExactMarginalLogLikelihood
+from gpytorch.kernels import AdditiveKernel, MaternKernel
+
+if TYPE_CHECKING:
+    from gpytorch.distributions.multivariate_normal import MultivariateNormal
+
+TRAIN_CONFIGS = 10
+TEST_CONFIGS = 10
+TOTAL_CONFIGS = TRAIN_CONFIGS + TEST_CONFIGS
+
+N_NUMERICAL = 2
+N_CATEGORICAL = 2
+N_CATEGORICAL_VALUES_PER_CATEGORY = 3
+
+kernels = []
+
+# Create some random encoded hyperparameter configurations
+X = torch.empty(size=(TOTAL_CONFIGS, N_NUMERICAL + N_CATEGORICAL), dtype=torch.float64)
+if N_NUMERICAL > 0:
+    X[:, :N_NUMERICAL] = torch.rand(
+        size=(TOTAL_CONFIGS, N_NUMERICAL),
+        dtype=torch.float64,
+    )
+
+if N_CATEGORICAL > 0:
+    X[:, N_NUMERICAL:] = torch.randint(
+        0,
+        N_CATEGORICAL_VALUES_PER_CATEGORY,
+        size=(TOTAL_CONFIGS, N_CATEGORICAL),
+        dtype=torch.float64,
+    )
+
+y = torch.rand(size=(TOTAL_CONFIGS,), dtype=torch.float64)
+
+if N_NUMERICAL > 0:
+    matern = ScaleKernel(
+        MaternKernel(
+            nu=2.5,
+            ard_num_dims=N_NUMERICAL,
+            active_dims=tuple(range(N_NUMERICAL)),
+        ),
+    )
+    kernels.append(matern)
+
+if N_CATEGORICAL > 0:
+    hamming = ScaleKernel(
+        CategoricalKernel(
+            ard_num_dims=N_CATEGORICAL,
+            active_dims=tuple(range(N_NUMERICAL, N_NUMERICAL + N_CATEGORICAL)),
+        ),
+    )
+    kernels.append(hamming)
+
+combined_num_cat_kernel = AdditiveKernel(*kernels)
+
+train_x = X[:TRAIN_CONFIGS]
+train_y = y[:TRAIN_CONFIGS]
+
+test_x = X[TRAIN_CONFIGS:]
+test_y = y[TRAIN_CONFIGS:]
+
+K_matrix = combined_num_cat_kernel.forward(train_x, train_x)
+
+train_y = train_y.unsqueeze(-1)
+test_y = test_y.unsqueeze(-1)
+
+gp = SingleTaskGP(
+    train_X=train_x,
+    train_Y=train_y,
+    covar_module=combined_num_cat_kernel,
+)
+
+multivariate_normal: MultivariateNormal = gp.forward(train_x)
+
+# =============== Fitting the GP using botorch ===============
+
+print("\nFitting the GP model using botorch...")
+
+mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
+fit_gpytorch_mll(mll)
+
+acq_function = qLogNoisyExpectedImprovement(
+    model=gp,
+    X_baseline=train_x,
+    objective=LinearMCObjective(weights=torch.tensor([-1.0])),
+    prune_baseline=True,
+)
+
+# Define bounds
+bounds = torch.tensor(
+    [
+        [0.0] * N_NUMERICAL + [0.0] * N_CATEGORICAL,
+        [1.0] * N_NUMERICAL
+        + [float(N_CATEGORICAL_VALUES_PER_CATEGORY - 1)] * N_CATEGORICAL,
+    ]
+)
+
+# Set up categorical feature optimization
+cats_per_column: dict[int, list[float]] = {
+    column_ix: [float(i) for i in range(N_CATEGORICAL_VALUES_PER_CATEGORY)]
+    for column_ix in range(N_NUMERICAL, N_NUMERICAL + N_CATEGORICAL)
+}
+
+# Generate fixed categorical features
+fixed_cats: list[dict[int, float]]
+if len(cats_per_column) == 1:
+    col, choice_indices = next(iter(cats_per_column.items()))
+    fixed_cats = [{col: i} for i in choice_indices]
+else:
+    fixed_cats = [
+        dict(zip(cats_per_column.keys(), combo, strict=False))
+        for combo in product(*cats_per_column.values())
+    ]
+
+best_candidate, best_score = optimize_acqf_mixed(
+    acq_function=acq_function,
+    bounds=bounds,
+    fixed_features_list=fixed_cats,
+    num_restarts=10,
+    raw_samples=10,
+    q=1,
+)
+
+print("Best candidate:", best_candidate)
+print("Acquisition score:", best_score)
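
A small follow-on sketch (mine, not part of the diff) showing how the flat candidate returned by `optimize_acqf_mixed` splits back into its typed parts, given the column layout used above:

# best_candidate has shape (q, d) with q=1; the first N_NUMERICAL columns are
# continuous values, the remaining columns are encoded categorical choices.
numerical = best_candidate[0, :N_NUMERICAL]
categorical = best_candidate[0, N_NUMERICAL:].to(torch.int64)
print("numerical:", numerical.tolist())
print("categorical choices:", categorical.tolist())
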
diff --git a/neps/graphs/graph_aware_gp_optimization_example.py b/neps/graphs/graph_aware_gp_optimization_example.py
new file mode 100644
index 00000000..e57819fb
--- /dev/null
+++ b/neps/graphs/graph_aware_gp_optimization_example.py
@@ -0,0 +1,130 @@
+from __future__ import annotations
+
+import time
+from itertools import product
+from typing import TYPE_CHECKING
+
+import networkx as nx
+import torch
+from botorch import fit_gpytorch_mll, settings
+from botorch.acquisition import LinearMCObjective, qLogNoisyExpectedImprovement
+from botorch.models import SingleTaskGP
+from botorch.models.gp_regression_mixed import CategoricalKernel, ScaleKernel
+from gpytorch import ExactMarginalLogLikelihood
+from gpytorch.kernels import AdditiveKernel, MaternKernel
+
+from neps.graphs.context_managers import set_graph_lookup
+from neps.graphs.kernels import BoTorchWLKernel, TorchWLKernel
+from neps.graphs.optimization import optimize_acqf_graph
+from neps.graphs.utils import min_max_scale, seed_all
+
+if TYPE_CHECKING:
+    from gpytorch.distributions.multivariate_normal import MultivariateNormal
+
+start_time = time.time()
+settings.debug._set_state(True)
+seed_all()
+
+TRAIN_CONFIGS = 50
+TEST_CONFIGS = 10
+TOTAL_CONFIGS = TRAIN_CONFIGS + TEST_CONFIGS
+
+N_NUMERICAL = 2
+N_CATEGORICAL = 1
+N_CATEGORICAL_VALUES_PER_CATEGORY = 2
+N_GRAPH = 1
+
+assert N_GRAPH == 1, "This example only supports a single graph feature"
+
+# Generate random data: numerical, categorical, and a graph-index column
+X = torch.cat(
+    [
+        torch.rand((TOTAL_CONFIGS, N_NUMERICAL), dtype=torch.float64),
+        torch.randint(
+            0,
+            N_CATEGORICAL_VALUES_PER_CATEGORY,
+            (TOTAL_CONFIGS, N_CATEGORICAL),
+            dtype=torch.float64,
+        ),
+        torch.arange(TOTAL_CONFIGS, dtype=torch.float64).unsqueeze(1),
+    ],
+    dim=1,
+)
+
+# Generate random graphs
+graphs = [nx.erdos_renyi_graph(5, 0.5) for _ in range(TOTAL_CONFIGS)]
+
+# Generate random target values
+y = torch.rand(TOTAL_CONFIGS, dtype=torch.float64) + 0.5
+
+# Split into train and test sets
+train_x, test_x = X[:TRAIN_CONFIGS], X[TRAIN_CONFIGS:]
+train_graphs, test_graphs = graphs[:TRAIN_CONFIGS], graphs[TRAIN_CONFIGS:]
+train_y, test_y = y[:TRAIN_CONFIGS].unsqueeze(-1), y[TRAIN_CONFIGS:].unsqueeze(-1)
+
+train_x, test_x = min_max_scale(train_x), min_max_scale(test_x)
+
+kernels = [
+    ScaleKernel(
+        MaternKernel(nu=2.5, ard_num_dims=N_NUMERICAL, active_dims=range(N_NUMERICAL))
+    ),
+    ScaleKernel(
+        CategoricalKernel(
+            ard_num_dims=N_CATEGORICAL,
+            active_dims=range(N_NUMERICAL, N_NUMERICAL + N_CATEGORICAL),
+        )
+    ),
+    ScaleKernel(
+        BoTorchWLKernel(
+            graph_lookup=train_graphs,
+            n_iter=5,
+            normalize=True,
+            active_dims=(X.shape[1] - 1,),
+        )
+    ),
+]
+
+# Create the Gaussian Process model
+gp = SingleTaskGP(train_X=train_x, train_Y=train_y, covar_module=AdditiveKernel(*kernels))
+
+# Compute the posterior distribution
+multivariate_normal: MultivariateNormal = gp.forward(train_x)
+
+# Make predictions on the test data
+with torch.no_grad(), set_graph_lookup(gp, train_graphs + test_graphs, append=False):
+    posterior = gp.forward(test_x)
+    predictions = posterior.mean
+    uncertainties = posterior.variance.sqrt()
+    covar = posterior.covariance_matrix
+
+# Fit the GP model
+mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
+fit_gpytorch_mll(mll)
+
+# Define the acquisition function
+acq_function = qLogNoisyExpectedImprovement(
+    model=gp,
+    X_baseline=train_x,
+    objective=LinearMCObjective(weights=torch.tensor([-1.0])),
+    prune_baseline=True,
+)
+
+# Define the bounds for optimization
+bounds = torch.tensor(
+    [
+        [0.0] * N_NUMERICAL + [0.0] * N_CATEGORICAL + [-1.0] * N_GRAPH,
+        [1.0] * N_NUMERICAL
+        + [float(N_CATEGORICAL_VALUES_PER_CATEGORY - 1)] * N_CATEGORICAL
+        + [len(X) - 1] * N_GRAPH,
+    ]
+)
+
+# Define fixed categorical features
+cats_per_column = {
+    i: list(range(N_CATEGORICAL_VALUES_PER_CATEGORY))
+    for i in range(N_NUMERICAL, N_NUMERICAL + N_CATEGORICAL)
+}
+fixed_cats = [
+    dict(zip(cats_per_column.keys(), combo, strict=False))
+    for combo in product(*cats_per_column.values())
+]
+
+# Optimize the acquisition function with graph sampling
+best_candidate, best_score = optimize_acqf_graph(
+    acq_function=acq_function,
+    bounds=bounds,
+    fixed_features_list=fixed_cats,
+    train_graphs=train_graphs,
+    num_graph_samples=2,
+    num_restarts=2,
+    raw_samples=16,
+    q=1,
+)
+
+# Print the results
+print(f"Best candidate: {best_candidate}")
+print(f"Best score: {best_score}")
+print(f"Elapsed time: {time.time() - start_time} seconds")
+
+# Clear the lru_caches after optimization to avoid memory leaks or stale results
+BoTorchWLKernel._compute_kernel.cache_clear()
+TorchWLKernel._get_node_neighbors.cache_clear()
+TorchWLKernel._wl_iteration.cache_clear()
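
The trailing `cache_clear()` calls are easy to skip if the script exits early. A hedged suggestion (not in the PR) is to guard them with `try`/`finally`:

try:
    best_candidate, best_score = optimize_acqf_graph(
        acq_function=acq_function,
        bounds=bounds,
        fixed_features_list=fixed_cats,
        train_graphs=train_graphs,
    )
finally:
    # Clear the WL caches even if the optimization raises.
    BoTorchWLKernel._compute_kernel.cache_clear()
    TorchWLKernel._get_node_neighbors.cache_clear()
    TorchWLKernel._wl_iteration.cache_clear()
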
+ """ + has_lengthscale = False + + def __init__( + self, + graph_lookup: list[nx.Graph], + n_iter: int = 5, + *, + normalize: bool = True, + active_dims: tuple[int, ...], + **kwargs: Any, + ) -> None: + super().__init__(active_dims=active_dims, **kwargs) + self.graph_lookup = graph_lookup + self.n_iter = n_iter + self.normalize = normalize + self._precompute_graph_data() + + def _precompute_graph_data(self) -> None: + """Precompute and cache adjacency matrices and initial node labels.""" + self.adjacency_cache, self.label_cache = graphs_to_tensors( + self.graph_lookup, device=self.device + ) + + def set_graph_lookup(self, graph_lookup: list[nx.Graph]) -> None: + """Update the graph lookup and refresh the cached data.""" + self.graph_lookup = graph_lookup + self._precompute_graph_data() + + def forward( + self, + x1: Tensor, + x2: Tensor, + *, + diag: bool = False, + last_dim_is_batch: bool = False, + **params: Any, + ) -> Tensor: + """Compute kernel matrix containing pairwise similarities between graphs.""" + if last_dim_is_batch: + raise NotImplementedError("Batch dimension handling is not implemented.") + + if x1.ndim == 3: + return self._handle_batched_input(x1, x2, diag) + + indices1, indices2 = self._prepare_indices(x1, x2) + + return self._compute_kernel(tuple(indices1), tuple(indices2), diag) + + def _handle_batched_input(self, x1: Tensor, x2: Tensor, diag: bool) -> Tensor: + """Handle computation for batched input tensors.""" + q_dim_size = x1.shape[0] + assert x2.shape[0] == q_dim_size + + out = torch.empty((q_dim_size, x1.shape[1], x2.shape[1]), device=x1.device) + for q in range(q_dim_size): + out[q] = self.forward(x1[q], x2[q], diag=diag) + return out + + def _prepare_indices(self, x1: Tensor, x2: Tensor) -> tuple[list[int], list[int]]: + """Convert tensor indices to integer lists.""" + indices1 = x1.flatten().to(torch.int64).tolist() + indices2 = x2.flatten().to(torch.int64).tolist() + + # Check for missing graph indices (-1) and handle them + # Explanation: The index `-1` is used as a placeholder for "missing" or "invalid" + # graphs. This can occur when a graph feature is missing or undefined, such as + # during the exploration of new candidates where no corresponding graph is + # available in the `graph_lookup`. The kernel expects non-negative indices, so we + # need to convert `-1` to the index of the last graph in the lookup. + if -1 in indices1 or -1 in indices2: + # Use the last graph in the lookup as a placeholder + last_graph_idx = len(self.graph_lookup) - 1 + # Replace any `-1` indices with the index of the last graph. + indices1 = [last_graph_idx if i == -1 else i for i in indices1] + indices2 = [last_graph_idx if i == -1 else i for i in indices2] + + return indices1, indices2 + + @lru_cache(maxsize=128) + def _compute_kernel( + self, + indices1: tuple[int, ...], + indices2: tuple[int, ...], + diag: bool, + ) -> Tensor: + """Compute the kernel matrix. + + Args: + indices1: Tuple of indices for the first set of graphs. + indices2: Tuple of indices for the second set of graphs. + diag: Whether to return only the diagonal of the kernel matrix. + + Returns: + A Tensor representing the kernel matrix. 
+ """ + all_graphs = list(set(indices1).union(indices2)) + adj_matrices = [self.adjacency_cache[i] for i in all_graphs] + label_tensors = [self.label_cache[i] for i in all_graphs] + + # Compute full kernel matrix + K_full = self._compute_base_kernel(adj_matrices, label_tensors) + + # Map indices to their positions in all_graphs + idx1 = [all_graphs.index(i) for i in indices1] + idx2 = [all_graphs.index(i) for i in indices2] + + # Extract the relevant submatrix + K = K_full[idx1][:, idx2] + + # Return the diagonal if requested + if diag: + return torch.diag(K) + + return K + + def _compute_base_kernel( + self, adj_matrices: list[Tensor], label_tensors: list[Tensor] + ) -> Tensor: + """Compute the base kernel matrix using WL algorithm.""" + _kernel = TorchWLKernel(n_iter=self.n_iter, normalize=self.normalize) + return _kernel(adj_matrices, label_tensors) + + +class TorchWLKernel(Module): + """A custom implementation of Weisfeiler-Lehman (WL) Kernel in PyTorch. + + The WL Kernel is a graph kernel that measures similarity between graphs based on + their structural properties. It works by iteratively updating node labels based on + their neighborhoods and computing feature vectors from label distributions. + + Args: + n_iter: Number of WL iterations to perform + normalize: bool, optional. Whether to normalize the kernel matrix + + Attributes: + device: torch.device for computation (CPU/GPU) + label_dict: Mapping from node labels to numerical indices + label_counter: Counter for generating new label indices + """ + + def __init__(self, n_iter: int = 5, *, normalize: bool = True) -> None: + super().__init__() + self.n_iter = n_iter + self.normalize = normalize + self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Keep track of labels across iterations + self.label_dict: dict[str, int] = {} + self.label_counter: int = 0 + + @lru_cache(maxsize=128) + def _get_node_neighbors(self, adj: Tensor) -> list[list[int]]: + """Extract neighborhood information from adjacency matrix.""" + if adj.layout == torch.sparse_csr: + adj = adj.to_sparse_coo() + + adj = adj.coalesce() + rows, cols = adj.indices() + num_nodes = adj.size(0) + + neighbors: list[list[int]] = [[] for _ in range(num_nodes)] + for row, col in zip(rows.tolist(), cols.tolist(), strict=False): + neighbors[row].append(col) + + return neighbors + + @lru_cache(maxsize=128) + def _wl_iteration(self, adj: Tensor, labels: Tensor) -> Tensor: + """Perform one WL iteration.""" + if not self.label_dict: + # Start new labels after initial ones + self.label_counter = int(labels.max().item()) + 1 + + num_nodes = labels.size(0) + new_labels: list[int] = [] + neighbors = self._get_node_neighbors(adj) + + for node_idx in range(num_nodes): + # Get current node label + node_label = int(labels[node_idx].item()) + neighbor_labels = sorted([int(labels[n].item()) for n in neighbors[node_idx]]) + + credential = f"{node_label},{neighbor_labels}" + + # Update label dictionary + new_labels.append( + self.label_dict.setdefault(credential, len(self.label_dict)) + ) + + return torch.tensor(new_labels, dtype=torch.int64, device=self.device) + + def _compute_feature_vector(self, all_labels: list[list[Tensor]]) -> Tensor: + """Compute the histogram feature vector for all graphs.""" + batch_size = len(all_labels[0]) + features: list[Tensor] = [] + + for iteration_labels in all_labels: + # Find maximum label value across all graphs in this iteration + max_label = int(max(label.max().item() for label in iteration_labels)) + 1 + + iter_features = 
diff --git a/neps/graphs/optimization.py b/neps/graphs/optimization.py
new file mode 100644
index 00000000..485bdcf6
--- /dev/null
+++ b/neps/graphs/optimization.py
@@ -0,0 +1,96 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import torch
+from botorch.optim import optimize_acqf_mixed
+
+from neps.graphs.context_managers import set_graph_lookup
+from neps.graphs.utils import sample_graphs
+
+if TYPE_CHECKING:
+    import networkx as nx
+    from botorch.acquisition import AcquisitionFunction
+
+
+def optimize_acqf_graph(
+    acq_function: AcquisitionFunction,
+    bounds: torch.Tensor,
+    fixed_features_list: list[dict[int, float]] | None = None,
+    num_graph_samples: int = 10,
+    train_graphs: list[nx.Graph] | None = None,
+    num_restarts: int = 10,
+    raw_samples: int = 1024,
+    q: int = 1,
+) -> tuple[torch.Tensor, float]:
+    """Optimize an acquisition function with graph sampling.
+
+    This function optimizes the acquisition function by sampling graphs from the
+    training set, temporarily updating the kernel's graph lookup, and evaluating the
+    acquisition function for each sampled graph. The best candidate and its
+    corresponding acquisition score are returned.
+
+    Args:
+        acq_function (AcquisitionFunction): The acquisition function to optimize.
+        bounds (torch.Tensor): A 2 x d tensor of bounds for numerical and categorical
+            features, where d is the number of features.
+        fixed_features_list (list[dict[int, float]] | None): A list of dictionaries
+            specifying fixed categorical feature configurations. Each dictionary maps
+            feature indices to their fixed values. Defaults to None.
+        num_graph_samples (int): The number of graphs to sample from the training set.
+            Defaults to 10.
+        train_graphs (list[nx.Graph] | None): The original training graphs. If None,
+            a ValueError is raised.
+        num_restarts (int): The number of optimization restarts. Defaults to 10.
+        raw_samples (int): The number of raw samples to generate for optimization.
+            Defaults to 1024.
+        q (int): The number of candidates to generate. Defaults to 1.
+
+    Returns:
+        tuple[torch.Tensor, float]: A tuple containing the best candidate (as a
+            tensor) and its corresponding acquisition score.
+
+    Raises:
+        ValueError: If `train_graphs` is None.
+    """
+    if train_graphs is None:
+        raise ValueError("train_graphs cannot be None.")
+
+    # Sample graphs from the training set
+    sampled_graphs = sample_graphs(train_graphs, num_samples=num_graph_samples)
+
+    # Initialize lists to store the candidates and their scores
+    best_candidates, best_scores = [], []
+
+    # The graph feature is assumed to occupy the last column of the bounds
+    graph_idx = bounds.shape[1] - 1
+
+    # Iterate through each sampled graph
+    for graph in sampled_graphs:
+        # Temporarily append the sampled graph to the kernel's graph lookup
+        with set_graph_lookup(acq_function.model.covar_module, [graph], append=True):
+            # Iterate through each fixed feature configuration (if provided)
+            for fixed_features in fixed_features_list or [{}]:
+                # Add the graph index to the fixed features, indicating that the
+                # last graph in the lookup should be used
+                updated_fixed_features = {**fixed_features, graph_idx: -1.0}
+
+                # Optimize the acquisition function with the updated fixed features
+                candidates, scores = optimize_acqf_mixed(
+                    acq_function=acq_function,
+                    bounds=bounds,
+                    fixed_features_list=[updated_fixed_features],
+                    num_restarts=num_restarts,
+                    raw_samples=raw_samples,
+                    q=q,
+                )
+
+                # Store the candidates and their scores
+                best_candidates.append(candidates)
+                best_scores.append(scores)
+
+    # Find the index of the best score
+    best_idx = torch.argmax(torch.tensor(best_scores))
+
+    # Return the best candidate and its score
+    return best_candidates[best_idx], best_scores[best_idx].item()
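
The `graph_idx: -1.0` trick above relies on the contract of `BoTorchWLKernel._prepare_indices`, which rewrites `-1` to the last index in the lookup. A small sketch of that contract with toy graphs (mine, not part of the diff):

import networkx as nx
import torch

from neps.graphs.kernels import BoTorchWLKernel

graphs = [nx.path_graph(4), nx.cycle_graph(4), nx.star_graph(3)]
for G in graphs:
    for node in G.nodes():
        G.nodes[node]["label"] = str(node)

kernel = BoTorchWLKernel(graph_lookup=graphs, n_iter=2, active_dims=(0,))

# Index -1 and the explicit last index (2) address the same graph, so the
# cross-similarities against graph 0 must match.
x0 = torch.tensor([[0.0]])
k_explicit = kernel.forward(torch.tensor([[2.0]]), x0)
k_placeholder = kernel.forward(torch.tensor([[-1.0]]), x0)
assert torch.allclose(k_explicit, k_placeholder)
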
diff --git a/neps/graphs/utils.py b/neps/graphs/utils.py
new file mode 100644
index 00000000..22e9d8a6
--- /dev/null
+++ b/neps/graphs/utils.py
@@ -0,0 +1,133 @@
+from __future__ import annotations
+
+import random
+
+import networkx as nx
+import numpy as np
+import torch
+
+
+def seed_all(seed: int = 100) -> None:
+    """Seed all random generators for reproducibility."""
+    random.seed(seed)
+    np.random.seed(seed)
+    torch.manual_seed(seed)
+    torch.cuda.manual_seed_all(seed)
+    # Ensure reproducibility with CuDNN (may reduce performance)
+    torch.backends.cudnn.deterministic = True
+    torch.backends.cudnn.benchmark = False
+
+
+def min_max_scale(tensor: torch.Tensor) -> torch.Tensor:
+    """Scale each column of the input tensor to the range [0, 1].
+
+    Note: a column with a constant value has a zero denominator and yields NaNs.
+    """
+    min_vals = tensor.min(dim=0, keepdim=True).values
+    max_vals = tensor.max(dim=0, keepdim=True).values
+    return (tensor - min_vals) / (max_vals - min_vals)
+
+
+def graphs_to_tensors(
+    graphs: list[nx.Graph],
+    device: torch.device | None = None,
+) -> tuple[list[torch.sparse.Tensor], list[torch.Tensor]]:
+    """Convert NetworkX graphs into sparse adjacency matrices and label tensors.
+
+    Args:
+        graphs (list[nx.Graph]): A list of NetworkX graphs.
+        device (torch.device | None): The device to place the tensors on.
+            Defaults to CPU.
+
+    Returns:
+        tuple[list[torch.sparse.Tensor], list[torch.Tensor]]: A tuple containing:
+            - A list of sparse adjacency matrices.
+            - A list of label tensors.
+    """
+    if device is None:
+        device = torch.device("cpu")
+
+    adjacency_matrices = []
+    label_tensors = []
+
+    # Create a consistent label mapping across all graphs
+    label_dict: dict[str, int] = {}
+    label_counter: int = 0
+
+    for graph in graphs:
+        # Create the adjacency matrix
+        edges = list(graph.edges())
+        num_nodes = graph.number_of_nodes()
+
+        if not edges:
+            adj = torch.sparse_coo_tensor(
+                indices=torch.empty((2, 0), dtype=torch.long),
+                values=torch.empty(0),
+                size=(num_nodes, num_nodes),
+                device=device,
+            ).to_sparse_csr()
+        else:
+            # Add both edge directions to obtain a symmetric adjacency matrix
+            edge_indices = edges + [(v, u) for u, v in edges]
+            rows, cols = zip(*edge_indices, strict=False)
+            indices = torch.tensor([rows, cols], dtype=torch.long, device=device)
+            values = torch.ones(len(edge_indices), dtype=torch.float, device=device)
+            adj = torch.sparse_coo_tensor(
+                indices, values, (num_nodes, num_nodes), device=device
+            ).to_sparse_csr()
+
+        adjacency_matrices.append(adj)
+
+        # Create the label tensor; fall back to the node index if no label is set
+        node_labels: list[int] = []
+        for node in range(graph.number_of_nodes()):
+            if "label" in graph.nodes[node]:
+                label = graph.nodes[node]["label"]
+                if label not in label_dict:
+                    label_dict[label] = label_counter
+                    label_counter += 1
+                node_labels.append(label_dict[label])
+            else:
+                node_labels.append(node)
+
+        label_tensors.append(torch.tensor(node_labels, dtype=torch.long, device=device))
+
+    return adjacency_matrices, label_tensors
+
+
+def sample_graphs(graphs: list[nx.Graph], num_samples: int) -> list[nx.Graph]:
+    """Sample new graphs by randomly perturbing the edges of training graphs.
+
+    Args:
+        graphs (list[nx.Graph]): Existing training graphs.
+        num_samples (int): Number of graph samples to generate.
+
+    Returns:
+        list[nx.Graph]: Sampled graphs.
+    """
+    sampled_graphs = []
+    for _ in range(num_samples):
+        base_graph = random.choice(graphs)
+        sampled_graph = base_graph.copy()
+
+        # Apply a handful of random edge modifications
+        num_modifications = random.randint(2, 5)
+        for _ in range(num_modifications):
+            if random.random() > 0.3:  # 70% chance to add an edge
+                nodes = list(sampled_graph.nodes)
+                if len(nodes) >= 2:
+                    u, v = random.sample(nodes, 2)
+                    if not sampled_graph.has_edge(u, v):
+                        sampled_graph.add_edge(u, v)
+            elif sampled_graph.edges:  # 30% chance to remove an edge
+                u, v = random.choice(list(sampled_graph.edges))
+                sampled_graph.remove_edge(u, v)
+
+        # Ensure the sampled graph stays connected
+        if not nx.is_connected(sampled_graph):
+            components = list(nx.connected_components(sampled_graph))
+            for i in range(len(components) - 1):
+                u = random.choice(list(components[i]))
+                v = random.choice(list(components[i + 1]))
+                sampled_graph.add_edge(u, v)
+
+        sampled_graphs.append(sampled_graph)
+
+    return sampled_graphs
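
A quick sketch (mine) of what `graphs_to_tensors` produces for a single toy graph:

import networkx as nx
import torch

from neps.graphs.utils import graphs_to_tensors

G = nx.Graph()
G.add_edges_from([(0, 1), (1, 2)])
for node in G.nodes():
    G.nodes[node]["label"] = str(node)

adjacency_matrices, label_tensors = graphs_to_tensors([G])

# One sparse CSR adjacency matrix per graph (symmetric: 4 stored entries for
# 2 undirected edges) and one integer label tensor per graph.
assert adjacency_matrices[0].layout == torch.sparse_csr
assert adjacency_matrices[0].shape == (3, 3)
assert label_tensors[0].tolist() == [0, 1, 2]
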
+ """ + if device is None: + device = torch.device("cpu") + + adjacency_matrices = [] + label_tensors = [] + + # Create a consistent label mapping across all graphs + label_dict: dict[str, int] = {} + label_counter: int = 0 + + for graph in graphs: + # Create adjacency matrix + edges = list(graph.edges()) + num_nodes = graph.number_of_nodes() + + if not edges: + adj = torch.sparse_coo_tensor( + indices=torch.empty((2, 0), dtype=torch.long), + values=torch.empty(0), + size=(num_nodes, num_nodes), + device=device, + ).to_sparse_csr() + else: + edge_indices = edges + [(v, u) for u, v in edges] + rows, cols = zip(*edge_indices, strict=False) + indices = torch.tensor([rows, cols], dtype=torch.long, device=device) + values = torch.ones(len(edge_indices), dtype=torch.float, device=device) + adj = torch.sparse_coo_tensor( + indices, values, (num_nodes, num_nodes), device=device + ).to_sparse_csr() + + adjacency_matrices.append(adj) + + # Create label tensor + node_labels: list[int] = [] + for node in range(graph.number_of_nodes()): + if "label" in graph.nodes[node]: + label = graph.nodes[node]["label"] + if label not in label_dict: + label_dict[label] = label_counter + label_counter += 1 + node_labels.append(label_dict[label]) + else: + node_labels.append(node) + + label_tensors.append(torch.tensor(node_labels, dtype=torch.long, device=device)) + + return adjacency_matrices, label_tensors + + +def sample_graphs(graphs: list[nx.Graph], num_samples: int) -> list[nx.Graph]: + """Sample graphs using random walks or edge modifications. + + Args: + graphs (list[nx.Graph]): Existing training graphs. + num_samples (int): Number of graph samples to generate. + + Returns: + list[nx.Graph]: Sampled graphs. + """ + sampled_graphs = [] + for _ in range(num_samples): + base_graph = random.choice(graphs) + sampled_graph = base_graph.copy() + + # More aggressive modifications + num_modifications = random.randint(2, 5) # Increase minimum modifications + for _ in range(num_modifications): + if random.random() > 0.3: # 70% chance to add edge + nodes = list(sampled_graph.nodes) + if len(nodes) >= 2: + u, v = random.sample(nodes, 2) + if not sampled_graph.has_edge(u, v): + sampled_graph.add_edge(u, v) + elif sampled_graph.edges: # 30% chance to remove edge + u, v = random.choice(list(sampled_graph.edges)) + sampled_graph.remove_edge(u, v) + + # Ensure the graph stays connected + if not nx.is_connected(sampled_graph): + components = list(nx.connected_components(sampled_graph)) + for i in range(len(components) - 1): + u = random.choice(list(components[i])) + v = random.choice(list(components[i + 1])) + sampled_graph.add_edge(u, v) + + sampled_graphs.append(sampled_graph) + + return sampled_graphs diff --git a/tests/test_graphs/test_botorch_wl_kernel.py b/tests/test_graphs/test_botorch_wl_kernel.py new file mode 100644 index 00000000..4c87fd67 --- /dev/null +++ b/tests/test_graphs/test_botorch_wl_kernel.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import networkx as nx +import pytest +import torch +from botorch.models.gp_regression_mixed import Kernel +from neps.graphs.kernels import BoTorchWLKernel + + +def create_simple_graphs(num_graphs: int) -> list[nx.Graph]: + """Helper function to create a list of graphs.""" + graphs = [] + for _i in range(num_graphs): + G = nx.Graph() + G.add_nodes_from([0, 1, 2]) + G.add_edges_from([(0, 1), (1, 2)]) + graphs.append(G) + return graphs + + +class TestBoTorchWLKernel: + @pytest.fixture() + def simple_graphs(self): + return create_simple_graphs(3) + + @pytest.fixture() 
diff --git a/tests/test_graphs/test_optimization_over_graphs.py b/tests/test_graphs/test_optimization_over_graphs.py
new file mode 100644
index 00000000..e4605bd1
--- /dev/null
+++ b/tests/test_graphs/test_optimization_over_graphs.py
@@ -0,0 +1,213 @@
+from __future__ import annotations
+
+from itertools import product
+
+import networkx as nx
+import pytest
+import torch
+from botorch import fit_gpytorch_mll
+from botorch.acquisition import LinearMCObjective, qLogNoisyExpectedImprovement
+from botorch.models import SingleTaskGP
+from botorch.models.kernels import CategoricalKernel
+from gpytorch import ExactMarginalLogLikelihood
+from gpytorch.kernels import AdditiveKernel, MaternKernel, ScaleKernel
+from neps.graphs.context_managers import set_graph_lookup
+from neps.graphs.kernels import BoTorchWLKernel
+from neps.graphs.optimization import optimize_acqf_graph
+from neps.graphs.utils import min_max_scale, sample_graphs
+
+
+class TestGraphOptimizationPipeline:
+    @pytest.fixture()
+    def setup_data(self):
+        """Fixture to set up common data for tests."""
+        TRAIN_CONFIGS = 50
+        TEST_CONFIGS = 10
+        TOTAL_CONFIGS = TRAIN_CONFIGS + TEST_CONFIGS
+
+        N_NUMERICAL = 2
+        N_CATEGORICAL = 1
+        N_CATEGORICAL_VALUES_PER_CATEGORY = 2
+        N_GRAPH = 1
+
+        # Generate random data
+        X = torch.cat(
+            [
+                torch.rand((TOTAL_CONFIGS, N_NUMERICAL), dtype=torch.float64),
+                torch.randint(
+                    0,
+                    N_CATEGORICAL_VALUES_PER_CATEGORY,
+                    (TOTAL_CONFIGS, N_CATEGORICAL),
+                    dtype=torch.float64,
+                ),
+                torch.arange(TOTAL_CONFIGS, dtype=torch.float64).unsqueeze(1),
+            ],
+            dim=1,
+        )
+
+        # Generate random graphs
+        graphs = [nx.erdos_renyi_graph(5, 0.5) for _ in range(TOTAL_CONFIGS)]
+
+        # Generate random target values
+        y = torch.rand(TOTAL_CONFIGS, dtype=torch.float64) + 0.5
+
+        # Split into train and test sets
+        train_x, test_x = X[:TRAIN_CONFIGS], X[TRAIN_CONFIGS:]
+        train_graphs, test_graphs = graphs[:TRAIN_CONFIGS], graphs[TRAIN_CONFIGS:]
+        train_y, test_y = (
+            y[:TRAIN_CONFIGS].unsqueeze(-1),
+            y[TRAIN_CONFIGS:].unsqueeze(-1),
+        )
+
+        # Scale the data
+        train_x, test_x = min_max_scale(train_x), min_max_scale(test_x)
+
+        return {
+            "train_x": train_x,
+            "test_x": test_x,
+            "train_graphs": train_graphs,
+            "test_graphs": test_graphs,
+            "train_y": train_y,
+            "test_y": test_y,
+            "N_NUMERICAL": N_NUMERICAL,
+            "N_CATEGORICAL": N_CATEGORICAL,
+            "N_CATEGORICAL_VALUES_PER_CATEGORY": N_CATEGORICAL_VALUES_PER_CATEGORY,
+            "N_GRAPH": N_GRAPH,
+        }
+
+    def test_gp_fit_and_predict(self, setup_data):
+        """Test fitting the GP and making predictions."""
+        train_x = setup_data["train_x"]
+        train_y = setup_data["train_y"]
+        test_x = setup_data["test_x"]
+        # Use both train and test graphs so that test indices resolve in the lookup
+        train_graphs = setup_data["train_graphs"] + setup_data["test_graphs"]
+
+        # Define the kernels
+        kernels = [
+            ScaleKernel(
+                MaternKernel(
+                    nu=2.5,
+                    ard_num_dims=setup_data["N_NUMERICAL"],
+                    active_dims=range(setup_data["N_NUMERICAL"]),
+                )
+            ),
+            ScaleKernel(
+                CategoricalKernel(
+                    ard_num_dims=setup_data["N_CATEGORICAL"],
+                    active_dims=range(
+                        setup_data["N_NUMERICAL"],
+                        setup_data["N_NUMERICAL"] + setup_data["N_CATEGORICAL"],
+                    ),
+                )
+            ),
+            ScaleKernel(
+                BoTorchWLKernel(
+                    graph_lookup=train_graphs,
+                    n_iter=5,
+                    normalize=True,
+                    active_dims=(train_x.shape[1] - 1,),
+                )
+            ),
+        ]
+
+        # Create the GP model
+        gp = SingleTaskGP(
+            train_X=train_x, train_Y=train_y, covar_module=AdditiveKernel(*kernels)
+        )
+
+        # Fit the GP
+        mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
+        fit_gpytorch_mll(mll)
+
+        # Make predictions on the test set
+        with torch.no_grad():
+            posterior = gp.forward(test_x)
+            predictions = posterior.mean
+            uncertainties = posterior.variance.sqrt()
+
+        # Ensure predictions are in the correct shape (10, 1)
+        predictions = predictions.unsqueeze(-1)
+
+        # Basic checks
+        assert predictions.shape == (setup_data["test_x"].shape[0], 1)
+        assert uncertainties.shape == (setup_data["test_x"].shape[0],)
+
+    def test_acquisition_function_optimization(self, setup_data):
+        """Test optimizing the acquisition function with graph sampling."""
+        train_x = setup_data["train_x"]
+        train_y = setup_data["train_y"]
+        train_graphs = setup_data["train_graphs"]
+
+        # Define the kernels
+        kernels = [
+            ScaleKernel(
+                MaternKernel(
+                    nu=2.5,
+                    ard_num_dims=setup_data["N_NUMERICAL"],
+                    active_dims=range(setup_data["N_NUMERICAL"]),
+                )
+            ),
+            ScaleKernel(
+                CategoricalKernel(
+                    ard_num_dims=setup_data["N_CATEGORICAL"],
+                    active_dims=range(
+                        setup_data["N_NUMERICAL"],
+                        setup_data["N_NUMERICAL"] + setup_data["N_CATEGORICAL"],
+                    ),
+                )
+            ),
+            ScaleKernel(
+                BoTorchWLKernel(
+                    graph_lookup=train_graphs,
+                    n_iter=5,
+                    normalize=True,
+                    active_dims=(train_x.shape[1] - 1,),
+                )
+            ),
+        ]
+
+        # Create the GP model
+        gp = SingleTaskGP(
+            train_X=train_x, train_Y=train_y, covar_module=AdditiveKernel(*kernels)
+        )
+
+        # Fit the GP
+        mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
+        fit_gpytorch_mll(mll)
+
+        # Define the acquisition function
+        acq_function = qLogNoisyExpectedImprovement(
+            model=gp,
+            X_baseline=train_x,
+            objective=LinearMCObjective(weights=torch.tensor([-1.0])),
+            prune_baseline=True,
+        )
setup_data["train_y"] + train_graphs = setup_data["train_graphs"] + + # Define the kernels + kernels = [ + ScaleKernel(MaternKernel(nu=2.5, ard_num_dims=setup_data["N_NUMERICAL"], + active_dims=range(setup_data["N_NUMERICAL"]))), + ScaleKernel( + CategoricalKernel( + ard_num_dims=setup_data["N_CATEGORICAL"], + active_dims=range(setup_data["N_NUMERICAL"], + setup_data["N_NUMERICAL"] + + setup_data["N_CATEGORICAL"]) + ) + ), + ScaleKernel( + BoTorchWLKernel(graph_lookup=train_graphs, n_iter=5, normalize=True, + active_dims=(train_x.shape[1] - 1,))) + ] + + # Create the GP model + gp = SingleTaskGP(train_X=train_x, train_Y=train_y, + covar_module=AdditiveKernel(*kernels)) + + # Fit the GP + mll = ExactMarginalLogLikelihood(gp.likelihood, gp) + fit_gpytorch_mll(mll) + + # Define the acquisition function + acq_function = qLogNoisyExpectedImprovement( + model=gp, + X_baseline=train_x, + objective=LinearMCObjective(weights=torch.tensor([-1.0])), + prune_baseline=True, + ) + + # Define bounds for optimization + bounds = torch.tensor([ + [0.0] * setup_data["N_NUMERICAL"] + [0.0] * setup_data["N_CATEGORICAL"] + [ + -1.0] * setup_data["N_GRAPH"], + [1.0] * setup_data["N_NUMERICAL"] + [ + float(setup_data["N_CATEGORICAL_VALUES_PER_CATEGORY"] - 1)] * setup_data[ + "N_CATEGORICAL"] + [len(train_x) - 1] * setup_data["N_GRAPH"], + ]) + + # Define fixed categorical features + cats_per_column = {i: list(range(setup_data["N_CATEGORICAL_VALUES_PER_CATEGORY"])) + for i in range(setup_data["N_NUMERICAL"], + setup_data["N_NUMERICAL"] + setup_data[ + "N_CATEGORICAL"])} + fixed_cats = [dict(zip(cats_per_column.keys(), combo, strict=False)) for combo in + product(*cats_per_column.values())] + + # Optimize the acquisition function + best_candidate, best_score = optimize_acqf_graph( + acq_function=acq_function, + bounds=bounds, + fixed_features_list=fixed_cats, + train_graphs=train_graphs, + num_graph_samples=2, + num_restarts=2, + raw_samples=16, + q=1, + ) + + # Basic checks + assert best_candidate.shape == (1, train_x.shape[1]) + assert isinstance(best_score, float) + + def test_graph_sampling(self, setup_data): + """Test the graph sampling functionality.""" + train_graphs = setup_data["train_graphs"] + num_samples = 5 + + # Sample graphs + sampled_graphs = sample_graphs(train_graphs, num_samples=num_samples) + + # Basic checks + assert len(sampled_graphs) == num_samples + for graph in sampled_graphs: + assert isinstance(graph, nx.Graph) + assert nx.is_connected(graph) + + def test_set_graph_lookup(self, setup_data): + """Test the set_graph_lookup context manager.""" + train_graphs = setup_data["train_graphs"] + test_graphs = setup_data["test_graphs"] + + # Define the kernel + kernel = BoTorchWLKernel(graph_lookup=train_graphs, n_iter=5, normalize=True, + active_dims=(0,)) + + # Use the context manager to temporarily set the graph lookup + with set_graph_lookup(kernel, test_graphs, append=True): + assert len(kernel.graph_lookup) == len(train_graphs) + len(test_graphs) + + # Check that the original graph lookup is restored + assert len(kernel.graph_lookup) == len(train_graphs) diff --git a/tests/test_graphs/test_torch_wl_kernel.py b/tests/test_graphs/test_torch_wl_kernel.py new file mode 100644 index 00000000..26311ab6 --- /dev/null +++ b/tests/test_graphs/test_torch_wl_kernel.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import networkx as nx +import numpy as np +import pytest +import torch +from grakel import WeisfeilerLehman, graph_from_networkx +from neps.graphs.kernels import TorchWLKernel +from 
diff --git a/tests/test_graphs/test_torch_wl_kernel.py b/tests/test_graphs/test_torch_wl_kernel.py
new file mode 100644
index 00000000..26311ab6
--- /dev/null
+++ b/tests/test_graphs/test_torch_wl_kernel.py
@@ -0,0 +1,235 @@
+from __future__ import annotations
+
+import networkx as nx
+import numpy as np
+import pytest
+import torch
+from grakel import WeisfeilerLehman, graph_from_networkx
+from neps.graphs.kernels import TorchWLKernel
+from neps.graphs.utils import graphs_to_tensors
+
+
+class TestTorchWLKernel:
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+    @pytest.fixture()
+    def example_graphs_set(self):
+        # Create example graphs for testing
+        G1 = nx.Graph()
+        G1.add_edges_from([(0, 1), (1, 2), (1, 3), (2, 3), (3, 4)])
+        for node in G1.nodes():
+            G1.nodes[node]["label"] = str(node)
+
+        G2 = nx.Graph()
+        G2.add_edges_from([(0, 1), (1, 2), (3, 4), (4, 0)])
+        for node in G2.nodes():
+            G2.nodes[node]["label"] = str(node)
+
+        G3 = nx.Graph()
+        G3.add_edges_from([(0, 1), (1, 3), (3, 2), (2, 4), (4, 0), (1, 2)])
+        for node in G3.nodes():
+            G3.nodes[node]["label"] = str(node)
+
+        return [G1, G2, G3]
+
+    @pytest.fixture()
+    def random_graphs_sets(self):
+        # Set a seed for reproducibility
+        seed = 100
+        np.random.seed(seed)
+        torch.manual_seed(seed)
+        random_graph_sets = []
+
+        # Generate 10 random sets of graphs
+        for _ in range(10):
+            # Number of graphs in the set (2 to 10)
+            num_graphs = np.random.randint(2, 11)
+            graph_set = []
+
+            for _ in range(num_graphs):
+                # Number of nodes in the graph (3 to 50)
+                num_nodes = np.random.randint(3, 51)
+                G = nx.Graph()
+
+                # Add nodes with labels
+                for node in range(num_nodes):
+                    G.add_node(node, label=str(node))
+
+                # Add random edges
+                for u in range(num_nodes):
+                    for v in range(u + 1, num_nodes):
+                        if np.random.rand() > 0.5:  # 50% chance to add an edge
+                            G.add_edge(u, v)
+
+                graph_set.append(G)
+
+            random_graph_sets.append(graph_set)
+
+        return random_graph_sets
+
+    @pytest.mark.parametrize("n_iter", [1, 2, 3, 5, 10])
+    @pytest.mark.parametrize("normalize", [True, False])
+    def test_wl_kernel_against_grakel(self, n_iter, normalize, random_graphs_sets):
+        for graph_set in random_graphs_sets:
+            adjacency_matrices, label_tensors = graphs_to_tensors(
+                graph_set, device=self.device
+            )
+
+            # Initialize the torch WL kernel
+            torch_kernel = TorchWLKernel(n_iter=n_iter, normalize=normalize)
+            torch_kernel_matrix = torch_kernel(
+                adjacency_matrices, label_tensors
+            ).cpu().numpy()
+
+            # Initialize the GraKeL WL kernel
+            grakel_graphs = list(
+                graph_from_networkx(graph_set, node_labels_tag="label", as_Graph=True)
+            )
+            grakel_kernel = WeisfeilerLehman(n_iter=n_iter, normalize=normalize)
+            grakel_kernel_matrix = grakel_kernel.fit_transform(grakel_graphs)
+
+            # Compare the kernel matrices
+            np.testing.assert_allclose(
+                torch_kernel_matrix,
+                grakel_kernel_matrix,
+                rtol=1e-5,
+                atol=1e-8,
+                err_msg=f"Kernel matrices differ for graph={graph_set}, n_iter={n_iter}",
+            )
+
+    def test_empty_graph(self):
+        G_empty = nx.Graph()
+        G_empty.add_node(0)
+        G_empty.nodes[0]["label"] = "0"
+
+        adjacency_matrices, label_tensors = graphs_to_tensors(
+            [G_empty], device=self.device
+        )
+
+        # Initialize the kernel and compute the matrix
+        kernel = TorchWLKernel(n_iter=3, normalize=True)
+        kernel_matrix = kernel(adjacency_matrices, label_tensors)
+
+        # For a single graph, should get a 1x1 matrix with value 1.0
+        expected = torch.ones(1, 1, device=self.device)
+        torch.testing.assert_close(kernel_matrix, expected)
+
+    def test_invalid_input(self):
+        wl_kernel = TorchWLKernel(n_iter=3, normalize=True)
+
+        with pytest.raises(
+            ValueError, match="Mismatch between adjacency matrices and label tensors"
+        ):
+            wl_kernel([], [torch.tensor([0])])
+
+    def test_kernel_on_single_node_graph(self):
+        G_single = nx.Graph()
+        G_single.add_node(0)
+        G_single.nodes[0]["label"] = "0"
+
+        adjacency_matrices, label_tensors = graphs_to_tensors(
+            [G_single], device=self.device
+        )
+
+        wl_kernel = TorchWLKernel(n_iter=3, normalize=True)
+        K = wl_kernel(adjacency_matrices, label_tensors)
+
+        expected = torch.ones(1, 1, device=self.device)
+        torch.testing.assert_close(K, expected)
+
+    def test_wl_kernel_with_empty_graph_and_reordered_edges(self, random_graphs_sets):
+        """Test the TorchWLKernel with an empty graph and a graph with reordered edges."""
+        for graph_set in random_graphs_sets:
+            # Create an empty graph
+            G_empty = nx.Graph()
+            G_empty.add_node(0)
+            G_empty.nodes[0]["label"] = "0"
+
+            # Select the first graph from the set to reorder its edges
+            G = graph_set[0]
+            G_reordered = nx.Graph()
+
+            # Add all nodes from the original graph to G_reordered
+            for node in G.nodes():
+                G_reordered.add_node(node, label=G.nodes[node]["label"])
+
+            # Reorder the edges randomly
+            edges = list(G.edges())
+            np.random.shuffle(edges)
+            G_reordered.add_edges_from(edges)
+
+            # Combine the empty graph, the original graph, and the reordered graph
+            graphs = [G_empty, G, G_reordered]
+            adjacency_matrices, label_tensors = graphs_to_tensors(
+                graphs, device=self.device
+            )
+
+            # Initialize and compute the kernel
+            wl_kernel = TorchWLKernel(n_iter=3, normalize=True)
+            K = wl_kernel(adjacency_matrices, label_tensors)
+
+            assert K.shape == (3, 3), "Kernel matrix shape is incorrect"
+            assert torch.allclose(K[1, 1], K[2, 2]), (
+                "Kernel value for original and reordered graphs should be the same"
+            )
+
+    @pytest.mark.parametrize("n_iter", [1, 2, 3, 4, 5, 6, 7])
+    @pytest.mark.parametrize("normalize", [True, False])
+    def test_wl_kernel_with_different_node_labels(
+        self, n_iter, normalize, example_graphs_set
+    ):
+        graphs = []
+        for i, G in enumerate(example_graphs_set):
+            G_copy = G.copy()
+            prefix = ["node_", "vertex_", "n"][i]
+            for node in G_copy.nodes():
+                G_copy.nodes[node]["label"] = f"{prefix}{node}"
+            graphs.append(G_copy)
+
+        adjacency_matrices, label_tensors = graphs_to_tensors(graphs, device=self.device)
+
+        wl_kernel = TorchWLKernel(n_iter=n_iter, normalize=normalize)
+        torch_kernel_matrix = wl_kernel(adjacency_matrices, label_tensors).cpu().numpy()
+
+        grakel_graphs = graph_from_networkx(graphs, node_labels_tag="label")
+        grakel_wl = WeisfeilerLehman(n_iter=n_iter, normalize=normalize)
+        grakel_kernel_matrix = grakel_wl.fit_transform(grakel_graphs)
+
+        np.testing.assert_allclose(
+            torch_kernel_matrix,
+            grakel_kernel_matrix,
+            rtol=1e-5,
+            atol=1e-8,
+            err_msg=f"Kernel matrices differ for n_iter={n_iter}, normalize={normalize}",
+        )
+
+    def test_wl_kernel_with_same_node_labels(self, example_graphs_set):
+        """Test WL kernel behavior with same node labels but different structures.
+
+        Even when all nodes have the same label, the WL kernel should:
+        1. Produce a symmetric matrix
+        2. Have 1.0 on the diagonal (self-similarity)
+        3. Have off-diagonal values less than 1.0 (different structures)
+        4. Maintain non-negative values (it's a valid kernel)
+        """
+        graphs = []
+        for G in example_graphs_set:
+            G_copy = G.copy()
+            for node in G_copy.nodes():
+                G_copy.nodes[node]["label"] = "A"
+            graphs.append(G_copy)
+
+        adjacency_matrices, label_tensors = graphs_to_tensors(graphs, device=self.device)
+
+        wl_kernel = TorchWLKernel(n_iter=3, normalize=True)
+        K = wl_kernel(adjacency_matrices, label_tensors)
+
+        # Check basic properties
+        assert K.shape == (3, 3), "Kernel matrix shape is incorrect"
+        assert torch.allclose(K, K.T, atol=1e-4), "Kernel matrix is not symmetric"
+
+        # Check diagonal elements are 1 (normalized self-similarity)
+        assert torch.allclose(torch.diag(K), torch.ones_like(torch.diag(K)), atol=1e-4), (
+            "Diagonal elements should be 1.0"
+        )
+
+        # Check off-diagonal elements are less than 1 (different structures)
+        off_diag_mask = ~torch.eye(K.shape[0], dtype=torch.bool, device=self.device)
+        assert torch.all(K[off_diag_mask] < 1.0), (
+            "Off-diagonal elements should be less than 1.0 for different structures"
+        )
+
+        # Check all elements are non-negative (valid kernel)
+        assert torch.all(K >= 0), "Kernel values should be non-negative"
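
Assuming the test layout in this PR, the whole suite should run with the standard pytest invocation, e.g. `pytest tests/test_graphs/ -v`; the GraKeL comparison tests additionally require `grakel` to be installed.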