diff --git a/.flake8 b/.flake8
index c4e987c..5c06765 100644
--- a/.flake8
+++ b/.flake8
@@ -9,7 +9,7 @@ per-file-ignores =
     examples/*/*: D103, D205, D301, D400
     # - docstrings rules that should not be applied to doc
     doc/*: D100, D103, F401
-ignore = D105, D107, E402, W503, W504, W605, BLK100
+ignore = D105, D107, E402, W503, W504, W605, BLK100, E501
 # for compatibility with black
 # https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#flake8
 extend-ignore = E203
\ No newline at end of file
diff --git a/examples/plot_alignment_methods_benchmark.py b/examples/plot_alignment_methods_benchmark.py
index 246a27b..e343639 100644
--- a/examples/plot_alignment_methods_benchmark.py
+++ b/examples/plot_alignment_methods_benchmark.py
@@ -147,7 +147,11 @@
 aligned_score = roi_masker.inverse_transform(method_error)
 title = f"Correlation of prediction after {method} alignment"
 display = plotting.plot_stat_map(
-    aligned_score, display_mode="z", cut_coords=[-15, -5], vmax=1, title=title
+    aligned_score,
+    display_mode="z",
+    cut_coords=[-15, -5],
+    vmax=1,
+    title=title,
 )
 
 ###############################################################################
diff --git a/examples/plot_alignment_simulated_2D_data.py b/examples/plot_alignment_simulated_2D_data.py
index ca18ec6..393bc43 100644
--- a/examples/plot_alignment_simulated_2D_data.py
+++ b/examples/plot_alignment_simulated_2D_data.py
@@ -161,7 +161,9 @@ def _plot_distributions_and_alignment(
 Y = np.roll(Y, 6, axis=0)
 # We plot them and observe that their initial matching is wrong
 R_identity = np.eye(n_points, dtype=np.float64)
-_plot_distributions_and_alignment(X, Y, R=R_identity, title="Initial Matching", thr=0.1)
+_plot_distributions_and_alignment(
+    X, Y, R=R_identity, title="Initial Matching", thr=0.1
+)
 
 ###############################################################################
 # Alignment : finding the right transform
@@ -193,7 +195,9 @@ def _plot_distributions_and_alignment(
     title="Procrustes between distributions",
     thr=0.1,
 )
-_plot_mixing_matrix(R=scaled_orthogonal_alignment.R.T, title="Orthogonal mixing matrix")
+_plot_mixing_matrix(
+    R=scaled_orthogonal_alignment.R.T, title="Orthogonal mixing matrix"
+)
 
 ###############################################################################
 # Ridge alignment
@@ -206,7 +210,11 @@ def _plot_distributions_and_alignment(
 ridge_alignment = RidgeAlignment(alphas=[0.01, 0.1], cv=2).fit(X.T, Y.T)
 
 _plot_distributions_and_alignment(
-    X, Y, R=ridge_alignment.R.coef_, title="Ridge between distributions", thr=0.1
+    X,
+    Y,
+    R=ridge_alignment.R.coef_,
+    title="Ridge between distributions",
+    thr=0.1,
 )
 _plot_mixing_matrix(R=ridge_alignment.R.coef_, title="Ridge coefficients")
diff --git a/examples/plot_int_alignment.py b/examples/plot_int_alignment.py
new file mode 100644
index 0000000..2c92de3
--- /dev/null
+++ b/examples/plot_int_alignment.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+
+"""
+Co-smoothing prediction using the IndividualizedNeuralTuning model.
+See article: https://doi.org/10.1162/imag_a_00032
+
+==========================
+
+In this tutorial, we show how to better predict new contrasts for a target
+subject using many source subjects' corresponding contrasts. For this purpose,
+we create a template to which we align the target subject, using shared information.
+We then predict new images for the target and compare them to a baseline.
+
+We mostly rely on common Python packages and on nilearn to handle
+functional data in a clean fashion.
+
+
+To run this example, you must launch IPython via ``ipython
+--matplotlib`` in a terminal, or use ``jupyter-notebook``.
+
+.. contents:: **Contents**
+    :local:
+    :depth: 1
+
+"""
+# %%
+import warnings
+
+warnings.filterwarnings("ignore")
+###############################################################################
+# Retrieve the data
+# -----------------
+# In this example we use the IBC dataset, which includes a large number of
+# different contrast maps for 12 subjects.
+# We download the images for subjects sub-01, sub-02, sub-04, sub-05, sub-06
+# and sub-07 (or retrieve them if they were already downloaded).
+# imgs is the list of paths to available statistical images for each subject.
+# df is a dataframe with metadata about each of them.
+# mask is a binary image used to extract grey matter regions.
+#
+
+from fmralign.fetch_example_data import fetch_ibc_subjects_contrasts
+
+sub_list = ["sub-01", "sub-02", "sub-04", "sub-05", "sub-06", "sub-07"]
+imgs, df, mask_img = fetch_ibc_subjects_contrasts(sub_list)
+
+###############################################################################
+# Define a masker
+# -----------------
+# We define a nilearn masker that will be used to handle relevant data.
+# For more information, visit:
+# https://nilearn.github.io/stable/manipulating_images/masker_objects.html
+#
+
+from nilearn.maskers import NiftiMasker
+
+masker = NiftiMasker(mask_img=mask_img).fit()
+
+###############################################################################
+# Prepare the data
+# ----------------
+# For each subject, we will use two series of contrasts acquired during
+# two independent sessions with a different phase encoding:
+# antero-posterior (AP) or postero-anterior (PA).
+#
+
+
+# To infer a template and denoise the data for both AP and PA acquisitions,
+# we make a list of 4D niimgs from our list of lists of files containing 3D images
+
+from nilearn.image import concat_imgs
+
+template_train = []
+for i in range(6):
+    template_train.append(concat_imgs(imgs[i]))
+
+
+# For subject sub-07, we split the data into two folds:
+# - target train: sub-07 AP contrasts, used to learn alignment to the template
+# - target test: sub-07 PA contrasts, used as a ground truth to score predictions
+# We make a single 4D Niimg from our list of 3D filenames
+target_train = df[df.subject == "sub-07"][df.acquisition == "ap"].path.values
+target_train = concat_imgs(target_train)
+target_train_data = masker.transform(target_train)
+target_test = df[df.subject == "sub-07"][df.acquisition == "pa"].path.values
+
+
+###############################################################################
+# Compute a baseline (average of subjects)
+# ----------------------------------------
+# We create an image with as many contrasts as any subject, where each
+# contrast is the average of the corresponding maps across training subjects.
+#
+
+import numpy as np
+
+masked_imgs = [masker.transform(img) for img in template_train]
+average_img = np.mean(masked_imgs[:-1], axis=0)
+average_subject = masker.inverse_transform(average_img)
+
+###############################################################################
+# Create a template from the training subjects.
+# ---------------------------------------------
+# We define an estimator using the classes PiecewiseAlignment and
+# IndividualizedNeuralTuning:
+# * We align the whole brain through multiple local alignments.
+# * These alignments are computed on a parcellation of the brain into 100 pieces;
+#   this parcellation creates groups of functionally similar voxels.
+# * The template is created iteratively, aligning all subjects' data into a
+#   common space, from which the template is inferred, and aligning again to
+#   this new template space.
+#
+
+from nilearn.image import index_img
+from fmralign.alignment_methods import IndividualizedNeuralTuning
+from fmralign.hyperalignment.piecewise_alignment import PiecewiseAlignment
+from fmralign.hyperalignment.regions import compute_parcels
+
+###############################################################################
+# Predict new data for left-out subject
+# -------------------------------------
+# We use target_train data to fit the transform, indicating that it corresponds
+# to the contrasts indexed by train_index, and predict from this learnt
+# alignment the contrasts corresponding to the template test_index numbers.
+# For each train subject and for the template, the AP contrasts are indexed
+# from 0 to 52, and the PA contrasts from 53 to 105.
+#
+
+train_index = range(53)
+test_index = range(53, 106)
+
+denoising_data = np.array(masked_imgs)[:, train_index, :]
+training_data = np.array(masked_imgs)[:-1]
+target_test_masked = np.array(masked_imgs)[:, test_index, :]
+
+
+parcels = compute_parcels(niimg=template_train[0], mask=masker, n_parcels=100, n_jobs=5)
+denoiser = PiecewiseAlignment(n_jobs=5)
+denoised_signal = denoiser.fit_transform(X=denoising_data, regions=parcels)
+target_denoised_data = denoised_signal[-1]
+model = IndividualizedNeuralTuning(
+    parcels=parcels,
+)
+model.fit(training_data, verbose=False)
+stimulus_ = np.copy(model.shared_response)
+
+# From the denoised data and the stimulus, we can now extract the tuning
+# matrix from sub-07 AP contrasts, and use it to predict the PA contrasts.
+target_tuning = model._tuning_estimator(
+    shared_response=stimulus_[train_index], target=target_denoised_data
+)
+# %%
+# We input the mapping image target_train in a list; we could have input more
+# than one subject for which we'd want to predict: [train_1, train_2 ...]
+
+
+pred = model._reconstruct_signal(
+    shared_response=stimulus_[test_index], individual_tuning=target_tuning
+)
+prediction_from_template = masker.inverse_transform(pred)
+
+
+# As a baseline prediction, let's just take the average of activations across subjects.
+
+prediction_from_average = index_img(average_subject, test_index)
+
+###############################################################################
+# Score the baseline and the prediction
+# -------------------------------------
+# We use a utility scoring function to measure the voxelwise correlation
+# between the prediction and the ground truth. That is, for each voxel, we
+# measure the correlation between its profile of activation with and without
+# alignment, to see if alignment yields a signal closer to the ground truth.
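+# A minimal sketch of what this computes (assuming masked arrays of shape
+# (n_contrasts, n_voxels); score_voxelwise below is the actual utility used):
+#
+#   corr_v = np.corrcoef(ground_truth[:, v], prediction[:, v])[0, 1]
+#
+# i.e., for each voxel v, the Pearson correlation between the two activation
+# profiles across contrasts.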
+#
+# %%
+from fmralign.metrics import score_voxelwise
+
+# Now we use this scoring function to compare the correlation of predictions
+# made from the group average and from the template with the real PA
+# contrasts of sub-07
+
+
+average_score = masker.inverse_transform(
+    score_voxelwise(target_test, prediction_from_average, masker, loss="corr")
+)
+
+template_score = masker.inverse_transform(
+    score_voxelwise(target_test, prediction_from_template, masker, loss="corr")
+)
+
+
+###############################################################################
+# Plotting the measures
+# ---------------------
+# Finally we plot both scores
+#
+
+# %%
+from nilearn import plotting
+
+baseline_display = plotting.plot_stat_map(
+    average_score, display_mode="z", vmax=1, cut_coords=[-15, -5]
+)
+baseline_display.title("Group average correlation wrt ground truth")
+display = plotting.plot_stat_map(
+    template_score, display_mode="z", cut_coords=[-15, -5], vmax=1
+)
+display.title("INT prediction correlation wrt ground truth")
+
+###############################################################################
+# We observe that creating a template and aligning a new subject to it yields
+# a prediction that is better correlated with the ground truth than just using
+# the average activations of subjects.
+#
+
+plotting.show()
+
+# %%
diff --git a/examples/plot_toy_int_experiment.py b/examples/plot_toy_int_experiment.py
new file mode 100644
index 0000000..ad06d57
--- /dev/null
+++ b/examples/plot_toy_int_experiment.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+
+"""
+Co-smoothing prediction using the IndividualizedNeuralTuning model.
+See article: https://doi.org/10.1162/imag_a_00032
+
+==========================
+
+This is a toy experiment to test the Individual Tuning Model (INT) on two
+parts of the data (or different runs) to assess the validity of the tuning
+computation. This code is not meant as an explanatory example, but rather
+as a check of the validity of the INT model.
+
+
+To run this example, you must launch IPython via ``ipython
+--matplotlib`` in a terminal, or use ``jupyter-notebook``.
+
+.. contents:: **Contents**
+    :local:
+    :depth: 1
+
+"""
+# %%
+import numpy as np
+import matplotlib.pyplot as plt
+from fmralign.alignment_methods import IndividualizedNeuralTuning as INT
+from fmralign.fetch_example_data import generate_dummy_signal
+from fmralign.hyperalignment.correlation import (
+    tuning_correlation,
+    stimulus_correlation,
+    compute_pearson_corr,
+    matrix_MDS,
+)
+
+# %%
+###############################################################################
+# Generate the data
+# -----------------
+# In this example we use toy data to test the INT model. We generate two runs
+# of the experiment and use the INT model to align them. We then compare the
+# estimated tuning matrices, the estimated shared response, and the
+# reconstructed images to the ground truth, to assess the validity of the model.
+# The toy generation function gives us access to the ground-truth stimulus and
+# tuning matrices used to generate the data, and also lets us control the level
+# of noise in the data.
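+# A minimal sketch of the assumed generative model (see generate_dummy_signal
+# below): for each subject i, with a shared stimulus S of shape
+# (n_timepoints, latent_dim) and an individual tuning T_i of shape
+# (latent_dim, n_voxels),
+#
+#   data_i = S @ T_i + noise_i,
+#
+# where the noise norm is scaled so that ||S @ T_i|| / ||noise_i|| ~ SNR.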
+
+n_subjects = 10
+n_timepoints = 200
+n_voxels = 500
+S_std = 5  # Standard deviation of the source components
+T_std = 1
+SNR = 100  # Signal-to-noise ratio
+latent_dim = 15  # if None, latent_dim = n_t
+decomposition_method = "pca"  # if None, SVD is used
+
+
+(
+    data_run_1,
+    data_run_2,
+    stimulus_run_1,
+    stimulus_run_2,
+    data_tuning,
+) = generate_dummy_signal(
+    n_subjects=n_subjects,
+    n_timepoints=n_timepoints,
+    n_voxels=n_voxels,
+    S_std=S_std,
+    T_std=T_std,
+    latent_dim=latent_dim,
+    SNR=SNR,
+    seed=42,
+)
+
+parcels = [range(n_voxels)]
+
+# %%
+#############################################################################
+# Create two independent instances of the model
+# ---------------------------------------------
+# We create two instances of the INT model to align the two runs of
+# the experiment, then extract the tuning matrices and the shared response
+# from the two runs to compare them.
+
+int1 = INT(
+    n_components=latent_dim,
+    parcels=parcels,
+    decomp_method=decomposition_method,
+)
+int2 = INT(
+    n_components=latent_dim,
+    parcels=parcels,
+    decomp_method=decomposition_method,
+)
+int1.fit(data_run_1, verbose=False)
+int2.fit(data_run_2, verbose=False)
+
+# save individual components
+tuning_pred_run_1 = np.array(int1.tuning_data)
+tuning_pred_run_2 = np.array(int2.tuning_data)
+
+stimulus_pred_run_1 = int1.shared_response
+stimulus_pred_run_2 = int2.shared_response
+
+data_pred = int1.transform(data_run_2)
+
+# %%
+###############################################################################
+# Plotting validation metrics
+# ---------------------------
+# We compare the tuning matrices and the shared response to assess the validity
+# of the INT model. To do so, we use the Pearson correlation between true and
+# estimated stimulus, as well as between true and estimated tuning matrices.
+# For tuning matrices, this is done by first computing the correlation between
+# every pair of tuning matrices from the two runs of the experiment, and then
+# averaging the correlation across the diagonal (i.e., the correlation between
+# matched latent components across the two runs).
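+
+# A quick sketch of this diagonal-averaging step (a simplified version of
+# pearson_corr_coeff, without the optional linear assignment):
+t1, t2 = tuning_pred_run_1[0], tuning_pred_run_2[0]
+c = np.corrcoef(t1, t2)[: t1.shape[0], t1.shape[0] :]
+print("Tuning correlation, subject 0:", np.abs(np.diag(c)).mean())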
+
+fig, ax = plt.subplots(2, 3, figsize=(15, 8))
+
+
+# Tuning matrices
+correlation_tuning = tuning_correlation(tuning_pred_run_1, tuning_pred_run_2)
+ax[0, 0].imshow(correlation_tuning)
+ax[0, 0].set_title("Pearson correlation of tuning matrices (Run 1 vs Run 2)")
+ax[0, 0].set_xlabel("Subjects, Run 1")
+ax[0, 0].set_ylabel("Subjects, Run 2")
+fig.colorbar(ax[0, 0].imshow(correlation_tuning), ax=ax[0, 0])
+
+random_colors = np.random.rand(n_subjects, 3)
+# MDS of predicted images
+corr_tuning = compute_pearson_corr(data_pred, data_run_2)
+data_pred_reduced, data_test_reduced = matrix_MDS(
+    data_pred, data_run_2, n_components=2, dissimilarity=1 - corr_tuning
+)
+
+ax[0, 1].scatter(
+    data_pred_reduced[:, 0],
+    data_pred_reduced[:, 1],
+    label="Run 1",
+    c=random_colors,
+)
+ax[0, 1].scatter(
+    data_test_reduced[:, 0],
+    data_test_reduced[:, 1],
+    label="Run 2",
+    c=random_colors,
+)
+ax[0, 1].set_title("MDS of predicted images, dim=2")
+
+# MDS of tuning matrices
+corr_tuning = compute_pearson_corr(tuning_pred_run_1, tuning_pred_run_2)
+T_first_part_transformed, T_second_part_transformed = matrix_MDS(
+    tuning_pred_run_1, tuning_pred_run_2, n_components=2, dissimilarity=1 - corr_tuning
+)
+
+ax[0, 2].scatter(
+    T_first_part_transformed[:, 0],
+    T_first_part_transformed[:, 1],
+    label="Run 1",
+    c=random_colors,
+)
+ax[0, 2].scatter(
+    T_second_part_transformed[:, 0],
+    T_second_part_transformed[:, 1],
+    label="Run 2",
+    c=random_colors,
+)
+ax[0, 2].set_title("MDS of tuning matrices, dim=2")
+# Set square aspect
+ax[0, 1].set_aspect("equal", "box")
+ax[0, 2].set_aspect("equal", "box")
+
+# Stimulus matrix correlation
+correlation_stimulus_true_est_first_part = stimulus_correlation(
+    stimulus_pred_run_1.T, stimulus_run_1.T
+)
+ax[1, 0].imshow(correlation_stimulus_true_est_first_part)
+ax[1, 0].set_title("Correlation of estimated stimulus vs ground truth (Run 1)")
+ax[1, 0].set_xlabel("Latent components, Run 1")
+ax[1, 0].set_ylabel("Latent components, ground truth")
+fig.colorbar(ax[1, 0].imshow(correlation_stimulus_true_est_first_part), ax=ax[1, 0])
+
+correlation_stimulus_true_est_second_part = stimulus_correlation(
+    stimulus_pred_run_2.T, stimulus_run_2.T
+)
+ax[1, 1].imshow(correlation_stimulus_true_est_second_part)
+ax[1, 1].set_title("Correlation of estimated stimulus vs ground truth (Run 2)")
+ax[1, 1].set_xlabel("Latent components, Run 2")
+ax[1, 1].set_ylabel("Latent components, ground truth")
+fig.colorbar(ax[1, 1].imshow(correlation_stimulus_true_est_second_part), ax=ax[1, 1])
+
+
+# Reconstruction
+corr_reconstruction = tuning_correlation(data_pred, data_run_2)
+ax[1, 2].imshow(corr_reconstruction)
+ax[1, 2].set_title("Correlation of brain response (Run 2 vs Ground truth)")
+ax[1, 2].set_xlabel("Subjects, Run 2")
+ax[1, 2].set_ylabel("Subjects, Ground truth")
+fig.colorbar(ax[1, 2].imshow(corr_reconstruction), ax=ax[1, 2])
+
+
+plt.rc("font", size=10)
+# Use a smaller font for the main title
+fig.suptitle(
+    "Correlation metrics for the Individual Tuning Model\n"
+    + f"{n_subjects} subjects, {n_timepoints} timepoints, {n_voxels} voxels, {latent_dim} latent components\n"
+    + f"SNR={SNR}"
+)
+
+plt.tight_layout()
+# %%
+plt.show()
diff --git a/fmralign/alignment_methods.py b/fmralign/alignment_methods.py
index ca0ad6e..e3b8b5b 100644
--- a/fmralign/alignment_methods.py
+++ b/fmralign/alignment_methods.py
@@ -2,11 +2,11 @@
 """Module implementing alignment estimators on ndarrays."""
 import warnings
 
-import ot
 import numpy as np
 import scipy
 from joblib import Parallel, delayed
 from scipy import linalg
+import ot
 from scipy.optimize import linear_sum_assignment
 from scipy.sparse import diags
 from scipy.spatial.distance import cdist
@@ -14,6 +14,10 @@
 from sklearn.linear_model import RidgeCV
 from sklearn.metrics.pairwise import pairwise_distances
 
+# Fast implementation for parallelized computing
+from fmralign.hyperalignment.linalg import safe_svd, svd_pca
+from fmralign.hyperalignment.piecewise_alignment import PiecewiseAlignment
+
 
 def scaled_procrustes(X, Y, scaling=False, primal=None):
     """
@@ -363,7 +367,7 @@ def fit(self, X, Y):
         """
 
         Parameters
-        --------------
+        ----------
         X: (n_samples, n_features) nd array
             source data
         Y: (n_samples, n_features) nd array
@@ -436,7 +440,7 @@ def fit(self, X, Y):
         """
 
         Parameters
-        --------------
+        ----------
         X: (n_samples, n_features) nd array
             source data
         Y: (n_samples, n_features) nd array
@@ -465,3 +469,265 @@ def fit(self, X, Y):
     def transform(self, X):
         """Transform X using optimal coupling computed during fit."""
         return X.dot(self.R)
+
+
+class IndividualizedNeuralTuning(Alignment):
+    """
+    Method of alignment based on the Individualized Neural Tuning model.
+    It works on 4D fMRI data, and is based on the assumption that the neural
+    response to a stimulus is shared across subjects. It uses searchlight or
+    parcellation alignment to denoise the data, and then computes the
+    stimulus response matrix.
+    See article: https://doi.org/10.1162/imag_a_00032
+    """
+
+    def __init__(
+        self,
+        decomp_method="pca",
+        n_components=None,
+        searchlights=None,
+        parcels=None,
+        dists=None,
+        radius=20,
+        tuning=True,
+        n_jobs=1,
+    ):
+        """
+        Initialize the IndividualizedNeuralTuning object.
+
+        Parameters
+        ----------
+        decomp_method : str
+            The decomposition method to use.
+            Can be ["pca", "pcav1", "procrustes"].
+            Default is "pca".
+        n_components : int
+            The number of latent dimensions to use in the shared stimulus
+            information matrix. Default is None.
+        searchlights : array-like
+            The searchlight indices for each subject,
+            of shape (n_s, n_searchlights).
+        parcels : array-like
+            The parcel indices for each subject,
+            of shape (n_s, n_parcels) (if not using searchlights).
+        dists : array-like
+            The distances of vertices to the center of their searchlight,
+            of shape (n_searchlights, n_vertices_sl).
+        radius : int, optional
+            The radius of the searchlight sphere, in millimeters.
+            Defaults to 20.
+        tuning : bool, optional
+            Whether to compute the tuning weights. Defaults to True.
+        n_jobs : int
+            The number of parallel jobs to run. Default is 1.
+        """
+
+        self.n_subjects = None
+        self.n_time_points = None
+        self.labels = None
+        self.alphas = None
+
+        if searchlights is None and parcels is None:
+            raise ValueError("searchlights or parcels must be provided")
+
+        if searchlights is not None and parcels is not None:
+            raise ValueError(
+                "searchlights and parcels cannot be provided at the same time"
+            )
+
+        if searchlights is not None:
+            self.regions = searchlights
+        else:
+            self.regions = parcels
+
+        self.dists = dists
+        self.radius = radius
+        self.tuning = tuning
+
+        self.tuning_data = []
+        self.denoised_signal = []
+        self.decomp_method = decomp_method
+        self.n_components = n_components
+        self.n_jobs = n_jobs
+
+    ################################################################
+    # Computing decomposition
+
+    @staticmethod
+    def _tuning_estimator(shared_response, target):
+        """
+        Estimate the tuning matrix for individualized neural tuning.
+
+        Parameters
+        ----------
+        shared_response : array-like
+            The shared response matrix of shape (n_timepoints, k),
+            where k is the dimension of the latent space of the sources.
+        target : array-like
+            The target matrix.
+
+        Returns
+        -------
+        array-like: The estimated tuning matrix for the given target.
+        """
+        if shared_response.shape[1] != shared_response.shape[0]:
+            return (np.linalg.pinv(shared_response)).dot(target)
+        return np.linalg.inv(shared_response).dot(target)
+
+    @staticmethod
+    def _stimulus_estimator(full_signal, n_subjects, latent_dim=None, scaling=True):
+        """
+        Estimate the stimulus matrix for the Individualized Neural Tuning model.
+
+        Parameters
+        ----------
+        full_signal : ndarray
+            Concatenated signal for all subjects,
+            of shape (n_timepoints, n_subjects * n_voxels).
+        n_subjects : int
+            The number of subjects.
+        latent_dim : int, optional
+            The number of latent dimensions to use. Defaults to None.
+        scaling : bool, optional
+            Whether to scale the stimulus matrix sources. Defaults to True.
+
+        Returns
+        -------
+        stimulus : ndarray
+            The stimulus matrix of shape (n_timepoints, latent_dim), or
+            (n_timepoints, k) with k = min(n_timepoints, n_subjects * n_voxels)
+            when latent_dim is None.
+        """
+        n_timepoints = full_signal.shape[0]
+        if scaling:
+            U = svd_pca(full_signal)
+        else:
+            U, _, _ = safe_svd(full_signal)
+        if latent_dim is not None and latent_dim < n_timepoints:
+            U = U[:, :latent_dim]
+
+        stimulus = np.sqrt(n_subjects) * U
+        stimulus = stimulus.astype(np.float32)
+        return stimulus
+
+    @staticmethod
+    def _reconstruct_signal(shared_response, individual_tuning):
+        """
+        Reconstruct the signal from the shared response and the individual
+        tuning.
+
+        Parameters
+        ----------
+        shared_response : ndarray
+            The shared response of shape (n_timeframes, n_timeframes) or
+            (n_timeframes, latent_dim).
+        individual_tuning : ndarray
+            The individual tuning of shape (latent_dim, n_voxels) or
+            (n_timeframes, n_voxels).
+
+        Returns
+        -------
+        ndarray:
+            The reconstructed signal of shape (n_timeframes, n_voxels).
+        """
+        return (shared_response @ individual_tuning).astype(np.float32)
+
+    def fit(
+        self,
+        X,
+        verbose=True,
+    ):
+        """
+        Fit the IndividualizedNeuralTuning model to the training data.
+
+        Parameters
+        ----------
+        X : array-like
+            The training data of shape (n_subjects, n_samples, n_voxels).
+        verbose : bool, optional
+            Whether to print progress information. Defaults to True.
+
+        Returns
+        -------
+        self : IndividualizedNeuralTuning
+            The fitted model.
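+
+        Examples
+        --------
+        A minimal sketch with random data (hypothetical shapes, a single
+        whole-brain parcel)::
+
+            >>> import numpy as np
+            >>> X = np.random.randn(3, 100, 200)  # (subjects, timepoints, voxels)
+            >>> model = IndividualizedNeuralTuning(parcels=[range(200)])
+            >>> model = model.fit(X, verbose=False)
+            >>> model.shared_response.shape  # (timepoints, latent dim)
+            (100, 100)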
+        """
+
+        X_ = np.array(X, copy=True, dtype=np.float32)
+
+        self.n_subjects, self.n_time_points, self.n_voxels = X_.shape
+
+        # Placeholders, overwritten below
+        self.tuning_data = np.empty(self.n_subjects, dtype=np.float32)
+        self.denoised_signal = np.empty(self.n_subjects, dtype=np.float32)
+
+        denoiser = PiecewiseAlignment(
+            template_kind=self.decomp_method,
+            n_jobs=self.n_jobs,
+            verbose=verbose,
+        )
+        self.denoised_signal = denoiser.fit_transform(
+            X_,
+            regions=self.regions,
+            dists=self.dists,
+            radius=self.radius,
+        )
+
+        # Stimulus matrix computation
+        full_signal = np.concatenate(self.denoised_signal, axis=1)
+        self.shared_response = self._stimulus_estimator(
+            full_signal, self.n_subjects, self.n_components
+        )
+        if self.tuning:
+            self.tuning_data = Parallel(n_jobs=self.n_jobs)(
+                delayed(self._tuning_estimator)(
+                    self.shared_response,
+                    self.denoised_signal[i],
+                )
+                for i in range(self.n_subjects)
+            )
+
+        return self
+
+    def transform(self, X, verbose=False):
+        """
+        Transform the input test data using the fitted model.
+
+        Parameters
+        ----------
+        X : array-like
+            The test data of shape (n_subjects, n_timepoints, n_voxels).
+        verbose : bool, optional
+            Whether to print progress information. Defaults to False.
+
+        Returns
+        -------
+        ndarray :
+            The transformed data of shape (n_subjects, n_timepoints, n_voxels).
+        """
+
+        full_signal = np.concatenate(X, axis=1, dtype=np.float32)
+
+        if verbose:
+            print("Predict : Computing stimulus matrix...")
+
+        stimulus_ = self._stimulus_estimator(
+            full_signal, self.n_subjects, self.n_components
+        )
+
+        if verbose:
+            print("Predict : stimulus matrix shape: ", stimulus_.shape)
+
+        reconstructed_signal = Parallel(n_jobs=self.n_jobs)(
+            delayed(self._reconstruct_signal)(stimulus_, T_est)
+            for T_est in self.tuning_data
+        )
+
+        return np.array(reconstructed_signal, dtype=np.float32)
diff --git a/fmralign/fetch_example_data.py b/fmralign/fetch_example_data.py
index 245e3a7..f43770e 100644
--- a/fmralign/fetch_example_data.py
+++ b/fmralign/fetch_example_data.py
@@ -3,6 +3,8 @@
 import pandas as pd
 from nilearn.datasets._utils import fetch_files, get_dataset_dir
 
+from fastsrm.srm import projection
+import numpy as np
+
 
 def fetch_ibc_subjects_contrasts(subjects, data_dir=None, verbose=1):
@@ -69,7 +71,13 @@ def fetch_ibc_subjects_contrasts(subjects, data_dir=None, verbose=1):
     # download / retrieve mask niimg and find its path
     mask = fetch_files(
         data_dir,
-        [("gm_mask_3mm.nii.gz", "https://osf.io/yvju3/download", {"uncompress": True})],
+        [
+            (
+                "gm_mask_3mm.nii.gz",
+                "https://osf.io/yvju3/download",
+                {"uncompress": True},
+            )
+        ],
         verbose=verbose,
     )[0]
@@ -108,3 +116,152 @@
         )
         files.append(fetch_files(data_dir, filenames, verbose=verbose))
     return files, metadata_df, mask
+
+
+def generate_dummy_signal(
+    n_subjects: int,
+    n_timepoints: int,
+    n_voxels: int,
+    S_std=1,
+    latent_dim=None,
+    T_mean=0,
+    T_std=1,
+    SNR=1,
+    generative_method="custom",
+    seed=0,
+):
+    """Generate a dummy signal for testing the INT model.
+
+    Parameters
+    ----------
+    n_subjects : int
+        Number of subjects.
+    n_timepoints : int
+        Number of timepoints.
+    n_voxels : int
+        Number of voxels.
+    S_std : float, default=1
+        Standard deviation of latent variables.
+    latent_dim : int, default=None
+        Number of latent dimensions. Defaults to n_timepoints.
+    T_mean : float
+        Mean of weights.
+    T_std : float
+        Standard deviation of weights.
+    SNR : float
+        Signal-to-noise ratio.
+    generative_method : str, default="custom"
+        Method for generating data. Options are "custom", "fastsrm",
+        "multiviewica".
+    seed : int
+        Random seed.
+
+
+    Returns
+    -------
+    imgs_train : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        Training data.
+    imgs_test : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        Testing data.
+    S_train : ndarray of shape (n_timepoints, latent_dim)
+        Training latent variables.
+    S_test : ndarray of shape (n_timepoints, latent_dim)
+        Testing latent variables.
+    Ts : list of ndarray of shape (latent_dim, n_voxels)
+        Tuning matrices, one per subject.
+    """
+    if latent_dim is None:
+        latent_dim = n_timepoints
+
+    rng = np.random.RandomState(seed=seed)
+
+    if generative_method == "custom":
+        sigma = n_subjects * np.arange(1, latent_dim + 1)
+        # Generate common signal matrix
+        S_train = S_std * rng.randn(n_timepoints, latent_dim)
+        # Normalize each column to have unit norm
+        S_train = S_train / np.linalg.norm(S_train, axis=0, keepdims=True)
+        S_train = S_train @ np.diag(sigma)
+        S_test = S_std * rng.randn(n_timepoints, latent_dim)
+        S_test = S_test / np.linalg.norm(S_test, axis=0, keepdims=True)
+        S_test = S_test @ np.diag(sigma)
+
+    elif generative_method == "fastsrm":
+        Sigma = rng.dirichlet(np.ones(latent_dim), 1).flatten()
+        S_train = np.sqrt(Sigma)[None, :] * rng.randn(n_timepoints, latent_dim)
+        S_test = np.sqrt(Sigma)[None, :] * rng.randn(n_timepoints, latent_dim)
+
+    elif generative_method == "multiviewica":
+        S_train = rng.laplace(size=(n_timepoints, latent_dim))
+        S_test = rng.laplace(size=(n_timepoints, latent_dim))
+
+    else:
+        raise ValueError("Unknown generative method")
+
+    # Generate individual spatial components
+    data_train, data_test = [], []
+    Ts = []
+    for _ in range(n_subjects):
+        if generative_method == "custom" or generative_method == "multiviewica":
+            W = T_mean + T_std * rng.randn(latent_dim, n_voxels)
+        else:
+            W = projection(rng.randn(latent_dim, n_voxels))
+
+        Ts.append(W)
+        X_train = S_train @ W
+        noise = rng.randn(n_timepoints, n_voxels)
+        noise = (
+            noise
+            * np.linalg.norm(X_train)
+            / (SNR * np.linalg.norm(noise, axis=0, keepdims=True))
+        )
+        X_train += noise
+        data_train.append(X_train)
+        X_test = S_test @ W
+        noise = rng.randn(n_timepoints, n_voxels)
+        noise = (
+            noise
+            * np.linalg.norm(X_test)
+            / (SNR * np.linalg.norm(noise, axis=0, keepdims=True))
+        )
+        X_test += noise
+        data_test.append(X_test)
+
+    data_train = np.array(data_train)
+    data_test = np.array(data_test)
+    return data_train, data_test, S_train, S_test, Ts
+
+
+def generate_dummy_searchlights(
+    n_searchlights: int,
+    n_voxels: int,
+    radius: float,
+    sl_size: int = 5,
+    seed: int = 0,
+):
+    """Generate dummy searchlights for testing the INT model.
+
+    Parameters
+    ----------
+    n_searchlights : int
+        Number of searchlights.
+    n_voxels : int
+        Number of voxels.
+    radius : float
+        Radius of the searchlights.
+    sl_size : int, default=5
+        Size of each searchlight (easier for dummy signal generation).
+    seed : int
+        Random seed.
+
+    Returns
+    -------
+    searchlights : ndarray of shape (n_searchlights, sl_size)
+        Searchlights.
+    dists : ndarray of shape (n_searchlights, sl_size)
+        Distances.
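+
+    Examples
+    --------
+    >>> searchlights, dists = generate_dummy_searchlights(
+    ...     n_searchlights=8, n_voxels=50, radius=5
+    ... )
+    >>> searchlights.shape, dists.shape
+    ((8, 5), (8, 5))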
+    """
+    rng = np.random.RandomState(seed=seed)
+    searchlights = rng.randint(n_voxels, size=(n_searchlights, sl_size))
+    dists = rng.randint(radius, size=searchlights.shape)
+    return searchlights, dists
diff --git a/fmralign/hyperalignment/__init__.py b/fmralign/hyperalignment/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fmralign/hyperalignment/correlation.py b/fmralign/hyperalignment/correlation.py
new file mode 100644
index 0000000..f698088
--- /dev/null
+++ b/fmralign/hyperalignment/correlation.py
@@ -0,0 +1,279 @@
+"""Some tools to compute correlation matrices. Functions in this module are
+meant to be used as a test for the hyperalignment algorithm only."""
+
+import numpy as np
+from sklearn.manifold import MDS
+from scipy.optimize import linear_sum_assignment
+from itertools import combinations
+
+
+def compute_pearson_corr(X, Y, linear_assignment: bool = False):
+    """Compute the Pearson correlation between X and Y.
+    X and Y are two lists of matrices of the same shape.
+    The returned matrix is of shape 2N x 2N, where N is the number of
+    matrices in X and Y.
+
+    Parameters
+    ----------
+    X : ndarray
+        First set of matrices.
+    Y : ndarray
+        Second set of matrices.
+    linear_assignment : bool, optional
+        Whether to perform linear assignment optimization. Defaults to False.
+
+    Returns
+    -------
+    ndarray: Pearson correlation matrix.
+    """
+
+    assert X.shape == Y.shape
+
+    XY = np.concatenate((X, Y), axis=0)
+    n = XY.shape[0]
+    corr_mat = np.zeros((n, n))
+    for i in range(n):
+        for j in range(n):
+            corr_i_j = pearson_corr_coeff(
+                XY[i], XY[j], linear_assignment=linear_assignment
+            )
+            corr_mat[i, j] = corr_i_j
+
+    return corr_mat
+
+
+def pearson_corr_coeff(
+    M1: np.ndarray,
+    M2: np.ndarray,
+    absolute: bool = True,
+    linear_assignment: bool = True,
+):
+    """
+    Compute the Pearson correlation coefficient between matrices M1 and M2 by
+    averaging the diagonal elements of their row-wise correlation matrix.
+    This function can also perform linear assignment optimization to maximize
+    the correlation coefficient.
+
+    Parameters
+    ----------
+    M1 : ndarray
+        First matrix.
+    M2 : ndarray
+        Second matrix.
+    absolute : bool, optional
+        Whether to compute absolute correlation coefficients. Defaults to True.
+    linear_assignment : bool, optional
+        Whether to perform linear assignment optimization. Defaults to True.
+
+    Returns
+    -------
+    float: Pearson correlation coefficient.
+    """
+    assert M1.shape == M2.shape
+
+    n = M1.shape[0]
+    corr = np.corrcoef(M1, M2)[:n, n:]
+
+    corr = np.abs(corr) if absolute else corr
+
+    if linear_assignment:
+        row_ind, col_ind = linear_sum_assignment(corr, maximize=True)
+
+        # permutation of columns and rows
+        corr = corr[row_ind, :]
+        corr = corr[:, col_ind]
+
+    corr_diag = np.diag(corr)
+
+    corr_coeff = np.mean(corr_diag)
+
+    return corr_coeff
+
+
+def tuning_correlation(X, Y):
+    """Compute the pairwise Pearson correlation matrix between two sets of
+    matrices. X and Y are two lists of matrices of the same shape.
+
+    Parameters
+    ----------
+    X : ndarray
+        First set of matrices, with shape (n_subjects, n_samples, n_features).
+    Y : ndarray
+        Second set of matrices, with shape (n_subjects, n_samples, n_features).
+
+    Returns
+    -------
+    ndarray: Pearson correlation matrix.
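+
+    Examples
+    --------
+    >>> import numpy as np
+    >>> X = np.random.randn(4, 10, 20)  # (subjects, samples, features)
+    >>> tuning_correlation(X, X).shape
+    (4, 4)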
+    """
+    assert X.shape == Y.shape
+    n = X.shape[0]
+    corr_mat = np.zeros((n, n))
+    for i in range(n):
+        for j in range(i, n):
+            corr_i_j = pearson_corr_coeff(X[i], Y[j], absolute=True)
+            corr_mat[i, j] = corr_i_j
+            corr_mat[j, i] = corr_i_j
+
+    return corr_mat
+
+
+def stimulus_correlation(X, Y, linear_assignment=True, absolute=True):
+    """Compute the pairwise Pearson correlation matrix between two stimulus
+    matrices.
+
+    Parameters
+    ----------
+    X : ndarray
+        First stimulus matrix, with shape (n_samples, n_features).
+    Y : ndarray
+        Second stimulus matrix, with shape (n_samples, n_features).
+    linear_assignment : bool, optional
+        Whether to perform linear assignment optimization. Defaults to True.
+    absolute : bool, optional
+        Whether to compute absolute correlation coefficients. Defaults to True.
+
+    Returns
+    -------
+    ndarray: Pearson correlation matrix.
+    """
+    assert X.shape == Y.shape
+    n = X.shape[0]
+    corr_mat = np.corrcoef(X, Y)[:n, n:]
+
+    if absolute:
+        corr_mat = np.abs(corr_mat)
+
+    if linear_assignment:
+        row_ind, col_ind = linear_sum_assignment(corr_mat, maximize=True)
+        corr_mat = corr_mat[row_ind, :]
+        corr_mat = corr_mat[:, col_ind]
+
+    return corr_mat
+
+
+def matrix_MDS(X, Y, n_components=2, dissimilarity="euclidean"):
+    """
+    Perform multidimensional scaling (MDS) on the rows of X and Y.
+
+    Parameters
+    ----------
+    X : list of ndarray
+        The first data matrix.
+    Y : list of ndarray
+        The second data matrix.
+    n_components : int
+        The number of dimensions in the output space (default is 2).
+    dissimilarity : str or array-like
+        The dissimilarity measure to use.
+        If it is a string other than "precomputed", the dissimilarity is
+        computed using the Euclidean distance between flattened data points.
+        Otherwise, it is assumed to be a precomputed dissimilarity matrix.
+
+    Returns
+    -------
+    tuple: A tuple containing two arrays. The first array represents
+    the transformed data points from matrix X, and the second array
+    represents the transformed data points from matrix Y.
+    """
+    assert len(X) == len(Y)
+
+    if isinstance(dissimilarity, str) and dissimilarity != "precomputed":
+        X_flat = [x.flatten() for x in X]
+        Y_flat = [y.flatten() for y in Y]
+        XY = np.array(X_flat + Y_flat)
+        mds = MDS(n_components=n_components, dissimilarity=dissimilarity)
+        transformed = mds.fit_transform(XY)
+
+    else:
+        mds = MDS(n_components=n_components, dissimilarity="precomputed")
+        transformed = mds.fit_transform(dissimilarity)
+
+    return np.array(transformed[: len(X)]), np.array(transformed[len(X) :])
+
+
+def multithread_compute_correlation(
+    X, Y, absolute=False, linear_assignment=True, n_jobs=1
+):
+    """
+    Compute correlations between pairs of samples in X and Y using multiple
+    threads.
+
+    Parameters
+    ----------
+    X : ndarray
+        The first set of samples, with shape (n_samples, n_features).
+    Y : ndarray
+        The second set of samples, with shape (n_samples, n_features).
+    absolute : bool, optional
+        Whether to compute absolute correlations. Defaults to False.
+    linear_assignment : bool, optional
+        Whether to use linear assignment for correlation computation.
+        Defaults to True.
+    n_jobs : int, optional
+        The number of threads to use for parallel computation. Defaults to 1.
+
+    Returns
+    -------
+    tuple
+        A tuple containing four arrays:
+        - corr_same_sub_diff_TR: Correlations between different time points
+          of the same subject.
+        - corr_same_sub_same_TR: Correlations between the same time points
+          of the same subject.
+        - corr_diff_sub_diff_TR: Correlations between different time points
+          of different subjects.
+        - corr_diff_sub_same_TR: Correlations between the same time points
+          of different subjects.
+    """
+    from joblib import Parallel, delayed
+
+    def thread_compute_correlation(X, Y, i, j):
+        # Correlate the pair (X[i], Y[j]); (i, i) pairs give the
+        # same-subject correlations, (i, j) pairs the cross-subject ones.
+        X_i, Y_j = X[i], Y[j]
+        corr = stimulus_correlation(
+            X_i, Y_j, absolute=absolute, linear_assignment=linear_assignment
+        )
+        same_TR_corr = np.diag(corr)
+        # Get all the values except the diagonal in a list
+        diff_TR_corr = corr[np.where(~np.eye(corr.shape[0], dtype=bool))]
+        if i == j:
+            return (
+                np.array([]),
+                np.array([]),
+                [x for x in diff_TR_corr],
+                [x for x in same_TR_corr],
+            )
+
+        else:
+            return (
+                diff_TR_corr.astype(np.float16),
+                same_TR_corr.astype(np.float16),
+                np.array([]),
+                np.array([]),
+            )
+
+    assert X.shape == Y.shape
+    n_s = X.shape[0]
+    coordinates = list(combinations(range(n_s), 2)) + [(i, i) for i in range(n_s)]
+    results = Parallel(n_jobs=n_jobs)(
+        delayed(thread_compute_correlation)(
+            X,
+            Y,
+            i,
+            j,
+        )
+        for (i, j) in coordinates
+    )
+    results = list(zip(*results))
+    corr_same_sub_diff_TR = results[2]
+    corr_same_sub_same_TR = results[3]
+    corr_diff_sub_diff_TR = results[0]
+    corr_diff_sub_same_TR = results[1]
+
+    corr_same_sub_diff_TR = np.concatenate(corr_same_sub_diff_TR)
+    corr_same_sub_same_TR = np.concatenate(corr_same_sub_same_TR)
+    corr_diff_sub_diff_TR = np.concatenate(corr_diff_sub_diff_TR)
+    corr_diff_sub_same_TR = np.concatenate(corr_diff_sub_same_TR)
+    return (
+        corr_same_sub_diff_TR,
+        corr_same_sub_same_TR,
+        corr_diff_sub_diff_TR,
+        corr_diff_sub_same_TR,
+    )
diff --git a/fmralign/hyperalignment/linalg.py b/fmralign/hyperalignment/linalg.py
new file mode 100644
index 0000000..1448349
--- /dev/null
+++ b/fmralign/hyperalignment/linalg.py
@@ -0,0 +1,157 @@
+"""
+The linear algebra module. This module contains functions that are often
+used in hyperalignment algorithms. Specifically, the robustness of
+singular value decomposition (SVD) is enhanced in ``safe_svd`` to avoid
+occasional crashes when the operation is performed many times (e.g., in a
+searchlight algorithm), and ``svd_pca`` performs PCA based on
+``safe_svd``.
+"""
+
+import numpy as np
+from scipy.linalg import LinAlgError
+from scipy.linalg import svd
+
+__all__ = ["safe_svd", "svd_pca", "ridge"]
+
+
+def safe_svd(X, remove_mean=True):
+    """
+    Singular value decomposition without occasional LinAlgError crashes.
+
+    The default ``lapack_driver`` of ``scipy.linalg.svd`` is ``'gesdd'``,
+    which occasionally crashes even if the input matrix is not singular.
+    This function automatically handles the ``LinAlgError`` when it's
+    raised and switches to the ``'gesvd'`` driver in this case.
+
+    The input matrix ``X`` is factorized as ``U @ np.diag(s) @ Vt``.
+
+    Parameters
+    ----------
+    X : ndarray of shape (M, N)
+        The matrix to be decomposed in NumPy array format.
+    remove_mean : bool, default=True
+        Whether to subtract the mean of each column before the actual SVD
+        (True) or not (False). Setting ``remove_mean=True`` is helpful when
+        the SVD is used to perform PCA.
+
+    Returns
+    -------
+    U : ndarray of shape (M, K)
+        Unitary matrix.
+    s : ndarray of shape (K,)
+        The singular values.
+    Vt : ndarray of shape (K, N)
+        Unitary matrix.
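+
+    Examples
+    --------
+    >>> import numpy as np
+    >>> X = np.random.randn(5, 3)
+    >>> U, s, Vt = safe_svd(X, remove_mean=False)
+    >>> U.shape, s.shape, Vt.shape
+    ((5, 3), (3,), (3, 3))
+    >>> np.allclose(U @ np.diag(s) @ Vt, X)
+    True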
+    """
+    if remove_mean:
+        X_ = X - X.mean(axis=0, keepdims=True)
+    else:
+        X_ = X.copy()
+    try:
+        U, s, Vt = svd(X_, full_matrices=False)
+    except LinAlgError:
+        U, s, Vt = svd(X_, full_matrices=False, lapack_driver="gesvd")
+    del X_
+    return U, s, Vt
+
+
+def svd_pca(X, remove_mean=True):
+    """
+    Principal component analysis (PCA) based on SVD.
+
+    This function performs a rotation and returns the transformed data in
+    PC space. Therefore, its behavior is similar to the ``fit_transform``
+    method of ``sklearn.decomposition.PCA``.
+
+    It does not throw away any PCs, and therefore there is no
+    dimensionality reduction in the PC space. However, the number of PCs
+    might be less than the number of features in ``X``, depending on the
+    rank of ``X``.
+
+    Parameters
+    ----------
+    X : ndarray of shape (M, N)
+        The data matrix to be transformed into PC space.
+    remove_mean : bool, default=True
+        Whether to subtract the mean of each column before the SVD (True)
+        or not (False). This parameter should be set to True unless the
+        columns already have zero mean.
+
+    Returns
+    -------
+    X_new : ndarray of shape (M, K)
+        The transformed data matrix in PC space.
+    """
+    U, s, Vt = safe_svd(X, remove_mean=remove_mean)
+    X_new = U * s[np.newaxis]
+    return X_new
+
+
+def ridge(X, Y, alpha=10):
+    """Solve the ridge regression problem for a matrix target using SVD.
+
+    Parameters
+    ----------
+    X : ndarray
+        The data matrix of shape (n_samples, n_features).
+    Y : ndarray of shape (n_samples, n_targets)
+        The target matrix.
+    alpha : float
+        The regularization parameter.
+
+    Returns
+    -------
+    betas : ndarray of shape (n_features, n_targets)
+        The solution to the ridge regression problem.
+    """
+    U, s, Vt = safe_svd(X, remove_mean=True)
+    d = s / (alpha + s**2)
+    d_UT_Y = d[:, np.newaxis] * (U.T @ Y)
+    betas = Vt.T @ d_UT_Y
+    return betas
+
+
+def procrustes(X, Y, reflection=True, scaling=False):
+    r"""
+    The orthogonal Procrustes algorithm.
+
+    Parameters
+    ----------
+    X : ndarray
+        The data matrix to be aligned to Y.
+    Y : ndarray
+        The "target" data matrix -- the matrix to be aligned to.
+    reflection : bool, default=True
+        Whether to allow reflection in the transformation (True) or not
+        (False). Note that even with ``reflection=True``, the solution
+        may not contain a reflection if the alignment cannot be improved
+        by adding a reflection to the rotation.
+    scaling : bool, default=False
+        Whether to allow global scaling (True) or not (False). Allowing
+        scaling can improve alignment quality, but it also changes the
+        geometry of the data.
+
+    Returns
+    -------
+    T : ndarray
+        The transformation matrix which can be used to align X to Y.
+        Depending on the parameters ``reflection`` and ``scaling``, the
+        transformation can be a pure rotation, an improper rotation, or a
+        pure/improper rotation with global scaling.
+    """
+
+    A = Y.T.dot(X).T
+    U, s, Vt = safe_svd(A, remove_mean=False)
+    T = np.dot(U, Vt)
+
+    if not reflection:
+        sign = np.sign(np.linalg.det(T))
+        s[-1] *= sign
+        if sign < 0:
+            T -= np.outer(U[:, -1], Vt[-1, :]) * 2
+
+    if scaling:
+        scale = s.sum() / (X.var(axis=0).sum() * X.shape[0])
+        T *= scale
+
+    return T
diff --git a/fmralign/hyperalignment/local_template.py b/fmralign/hyperalignment/local_template.py
new file mode 100644
index 0000000..60197d8
--- /dev/null
+++ b/fmralign/hyperalignment/local_template.py
@@ -0,0 +1,208 @@
+"""Local template computation functions. These functions are part of the
+hyperalignment method introduced by Feilong Ma et al., 2023.
+"""
+
+import numpy as np
+from sklearn.decomposition import PCA
+from sklearn.utils.extmath import randomized_svd
+
+from .linalg import safe_svd
+from .linalg import procrustes
+
+
+def PCA_decomposition(
+    X, n_components=None, flavor="sklearn", adjust_ns=True, demean=True
+):
+    """Decompose concatenated data matrices using PCA/SVD.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        The input data array.
+    n_components : int or None
+        The number of components to keep. If None, all components are kept.
+    flavor : {'sklearn', 'svd'}
+        Whether to use sklearn or the custom SVD implementation.
+    adjust_ns : bool
+        Whether to adjust the variance of the output so that it doesn't
+        increase with the number of subjects.
+    demean : bool
+        Whether to remove the mean of the columns prior to SVD.
+
+    Returns
+    -------
+    XX : ndarray of shape (n_timepoints, n_components)
+        The decomposed data array with reduced dimensionality.
+    cc : ndarray of shape (n_components, n_subjects, n_voxels)
+        Column-wise principal components (from Vt).
+    """
+    ns, nt, nv = X.shape
+    X2d = X.transpose(1, 0, 2).reshape(nt, ns * nv).astype(np.float32)
+    if flavor == "sklearn":
+        try:
+            if demean:
+                pca = PCA(n_components=n_components, random_state=0)
+                XX = pca.fit_transform(X2d)
+                cc = pca.components_.reshape(-1, ns, nv)
+                if adjust_ns:
+                    XX /= np.sqrt(ns)
+                return XX.astype(np.float32), cc
+            else:
+                U, s, Vt = randomized_svd(
+                    X2d,
+                    (n_components if n_components is not None else min(X2d.shape)),
+                    random_state=0,
+                )
+                if adjust_ns:
+                    XX = U[:, :n_components] * (
+                        s[np.newaxis, :n_components] / np.sqrt(ns)
+                    )
+                else:
+                    XX = U[:, :n_components] * (s[np.newaxis, :n_components])
+                cc = Vt[:n_components].reshape(-1, ns, nv)
+                return XX.astype(np.float32), cc
+        except Exception:
+            # Fall back to the custom SVD implementation, recursing on the
+            # original 3D array rather than on the reshaped 2D view.
+            return PCA_decomposition(
+                X,
+                n_components=n_components,
+                flavor="svd",
+                adjust_ns=adjust_ns,
+                demean=demean,
+            )
+    elif flavor == "svd":
+        U, s, Vt = safe_svd(X2d)
+        if adjust_ns:
+            XX = U[:, :n_components] * (s[np.newaxis, :n_components] / np.sqrt(ns))
+        else:
+            XX = U[:, :n_components] * (s[np.newaxis, :n_components])
+        cc = Vt[:n_components].reshape(-1, ns, nv)
+        return XX.astype(np.float32), cc
+    else:
+        raise NotImplementedError
+
+
+def compute_PCA_template(X, sl=None, n_components=None, flavor="sklearn", demean=False):
+    """
+    Compute the PCA template from the input data.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        The input data array.
+    sl : slice, optional
+        The region indices for searchlight-based template computation.
+        Defaults to None.
+    n_components : int, optional
+        The maximum number of principal components to keep. If None, all
+        components are kept. Defaults to None.
+    flavor : str, optional
+        The flavor of the PCA algorithm to use. Defaults to "sklearn".
+    demean : bool, optional
+        Whether to demean the data before performing PCA. Defaults to False.
+
+    Returns
+    -------
+    XX : ndarray of shape (n_timepoints, n_components)
+        The PCA template array.
+    """
+    if sl is not None:
+        X_ = X[:, :, sl]
+    else:
+        X_ = X
+    n = min(X_.shape[1], X_.shape[2])
+    n_components = n if n_components is None else min(n, n_components)
+    XX, cc = PCA_decomposition(
+        X_, n_components=n_components, flavor=flavor, adjust_ns=True, demean=demean
+    )
+    return XX.astype(np.float32)
+
+
+def compute_PCA_var1_template(
+    X, sl=None, n_components=None, flavor="sklearn", demean=True
+):
+    """
+    Compute the variance-weighted PCA template from the input data.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        The input data array.
+    sl : slice, optional
+        The region indices for searchlight-based template computation.
+        Defaults to None.
+    n_components : int, optional
+        The maximum number of principal components to keep. If None, all
+        components are kept. Defaults to None.
+    flavor : str, optional
+        The flavor of the PCA algorithm to use. Defaults to "sklearn".
+    demean : bool, optional
+        Whether to demean the data before performing PCA. Defaults to True.
+
+    Returns
+    -------
+    XX : ndarray of shape (n_timepoints, n_components)
+        The PCA template array, with each component rescaled by the mean
+        norm of its per-subject spatial maps.
+    """
+    if sl is not None:
+        X = X[:, :, sl]
+    XX, cc = PCA_decomposition(
+        X, n_components=n_components, flavor=flavor, adjust_ns=False, demean=demean
+    )
+    w = np.sqrt(np.sum(cc**2, axis=2)).mean(axis=1)
+    XX *= w[np.newaxis]
+    return XX.astype(np.float32)
+
+
+def compute_template(
+    X,
+    region,
+    kind="pca",
+    n_components=150,
+    common_topography=True,
+    demean=True,
+):
+    """
+    Compute a template from a set of datasets.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_subjects, n_timepoints, n_voxels)
+        The input datasets.
+    region : ndarray or None
+        The region (parcel or searchlight) indices to consider.
+    kind : str
+        The type of template computation algorithm to use.
+        Can be "pca" or "pcav1".
+    n_components : int, optional
+        The maximum number of principal components to use for PCA-based
+        template computation. Defaults to 150.
+    common_topography : bool, optional
+        Whether to enforce common topography across datasets.
+        Defaults to True.
+    demean : bool, optional
+        Whether to demean the datasets before template computation.
+        Defaults to True.
+
+    Returns
+    -------
+    tmpl : ndarray of shape (n_timepoints, n_voxels)
+        The computed template on the given region (or searchlight).
+    """
+    mapping = {
+        "pca": compute_PCA_template,
+        "pcav1": compute_PCA_var1_template,
+    }
+    if kind in mapping:
+        tmpl = mapping[kind](X, sl=region, n_components=n_components, demean=demean)
+    else:
+        raise ValueError("Unknown template kind")
+
+    if common_topography:
+        if region is not None:
+            X_ = X[:, :, region]
+        else:
+            X_ = np.copy(X)
+        ns, nt, nv = X_.shape
+        T = procrustes(np.tile(tmpl, (ns, 1)), X_.reshape(ns * nt, nv))
+        tmpl = tmpl @ T
+    return tmpl.astype(np.float32)
diff --git a/fmralign/hyperalignment/piecewise_alignment.py b/fmralign/hyperalignment/piecewise_alignment.py
new file mode 100644
index 0000000..dd56002
--- /dev/null
+++ b/fmralign/hyperalignment/piecewise_alignment.py
@@ -0,0 +1,157 @@
+"""Piecewise alignment model. This model decomposes the data into regions
+(pieces), which can either be searchlights or parcels (computed with
+standard parcellation algorithms). See the ``nilearn`` documentation for
+more details:
+- https://nilearn.github.io/stable/modules/generated/nilearn.regions.Parcellations.html
+- https://nilearn.github.io/stable/modules/generated/nilearn.decoding.SearchLight.html
+"""
+
+import numpy as np
+from sklearn.base import BaseEstimator, TransformerMixin
+from .regions import (
+    template,
+    piece_ridge,
+    searchlight_weights,
+)
+from joblib import Parallel, delayed
+
+
+class PiecewiseAlignment(BaseEstimator, TransformerMixin):
+    """Searchlight alignment model.
This model decomposes the data into a + global template and a linear transformation for each subject. + The global template is computed using a searchlight/parcellation approach. + The linear transformation is computed using a ridge regression. + This step is enssential to the hyperalignment model, as it is + used to remove noise from the raw data. + """ + + def __init__( + self, + template_kind="pca", + common_topography=True, + verbose=True, + n_jobs=1, + ): + """ + Parameters + ---------- + alignment_method : str, default="ridge" + The alignment method to use. Can be "ridge" or "ensemble_ridge". + template_kind : str, default="pca" + The kind of template to use. Can be "pca" or "mean". + demean : bool, default=False + Whether to demean the data before alignment. + verbose : bool, default=True + Whether to display progress bar. + n_jobs : int, default=-1 + """ + self.W = [] + self.Xhat = [] + self.n_s = None + self.n_t = None + self.n_v = None + self.template_kind = template_kind + self.verbose = verbose + self.common_topography = common_topography + self.n_jobs = n_jobs + self.regions = None + self.distances = None + self.radius = None + self.weights = None + + def compute_linear_transformation(self, data, template): + """Compute the linear transformation for a given subject provided the global template. + + Parameters + ---------- + data : ndarray of shape (n_samples, n_voxels) + The brain images for one subject. + Those are the B_1, ..., B_n in the paper. + template : ndarray of shape (n_samples, n_voxels) + The global template M. + + Returns + ------- + Xhat : ndarray of shape (n_samples, n_voxels) + The denoised estimation signal for each subject. + """ + + x_hat = piece_ridge( + X=template, + Y=data, + alpha=10, + regions=self.regions, + verbose=self.verbose, + ) + return x_hat + + def fit_transform( + self, + X: np.ndarray, + regions=None, + dists=None, + radius=None, + weights=None, + ): + """From given fmri data, compute the global template and the linear transformation. + This provides denoised signal estimations using template alignment. + + Parameters + ---------- + X : list of ndarray of shape (n_samples, n_voxels) + The brain images for one subject. + searchlights : list of searchlights + The searchlight indices. + dists : list of distances + + radius : int + The radius of the searchlight (in millimeters) + + Returns + ------- + Xhat : list of ndarray of shape (n_samples, n_voxels) + The denoised estimations B_1, ... B_p for each subject. 
+        """
+
+        self.n_s, self.n_t, self.n_v = X.shape
+        self.regions = regions
+
+        # Label the run for logging: searchlights if distances and a radius
+        # are provided, parcels otherwise.
+        if dists is not None and radius is not None:
+            self.FUNC = "SearchlightAlignment"
+        else:
+            self.FUNC = "ParcelAlignment"
+
+        if self.verbose:
+            print(f"[{self.FUNC}] Shape of input data: ", X.shape)
+
+        if weights is None and dists is not None and radius is not None:
+            self.distances = dists
+            self.radius = radius
+
+        # Compute global template M (sl_template)
+        if self.verbose:
+            print(f"[{self.FUNC}] Computing global template M ...")
+
+        if dists is None or radius is None:
+            self.weights = weights
+        elif weights is None:
+            self.weights = searchlight_weights(
+                searchlights=regions, dists=dists, radius=radius
+            )
+        else:
+            self.weights = weights
+
+        sl_template = template(
+            X,
+            regions=regions,
+            n_jobs=self.n_jobs,
+            template_kind=self.template_kind,
+            common_topography=self.common_topography,
+            weights=self.weights,
+        )
+
+        self.Xhat = Parallel(n_jobs=self.n_jobs)(
+            delayed(self.compute_linear_transformation)(X[i], sl_template)
+            for i in range(self.n_s)
+        )
+        return np.array(self.Xhat)
diff --git a/fmralign/hyperalignment/regions.py b/fmralign/hyperalignment/regions.py
new file mode 100644
index 0000000..f99d284
--- /dev/null
+++ b/fmralign/hyperalignment/regions.py
@@ -0,0 +1,547 @@
+"""Utilities for computing searchlights. Adapted from ``nilearn``.
+See the ``nilearn`` documentation for more details:
+- https://nilearn.github.io/modules/generated/nilearn.regions.Parcellations.html
+- https://nilearn.github.io/dev/modules/generated/nilearn.decoding.SearchLight.html
+
+Author: Denis Fouchard, INRIA Saclay, MIND, 2023.
+"""
+
+import functools
+import numpy as np
+from joblib import Parallel, delayed
+from nilearn import image, masking
+from nilearn._utils import check_niimg_4d, check_niimg_3d
+from nilearn.image.resampling import coord_transform
+import warnings
+from sklearn import neighbors
+from scipy.spatial import distance_matrix
+from nilearn._utils.niimg_conversions import (
+    safe_get_data,
+)
+from .linalg import procrustes
+from .linalg import ridge
+from .local_template import compute_template
+from fmralign._utils import _make_parcellation
+from nilearn.maskers import NiftiMasker
+from nibabel.nifti1 import Nifti1Image
+
+###################################################################################################
+# Compute parcels
+###################################################################################################
+
+
+def create_parcels_from_labels(labels: np.ndarray):
+    """Convert a voxelwise label array into a list of parcels.
+
+    Parameters
+    ----------
+    labels : ndarray
+        Array of voxel labels.
+
+    Returns
+    -------
+    parcels : list
+        List of parcels, where each parcel is an array of voxel indices.
+    """
+    n_labels = labels.max()
+    parcels = []
+    for i in range(1, n_labels + 1):
+        parcels.append(np.where(labels == i)[0])
+    return parcels
+
+
+def compute_parcels(
+    niimg, mask, n_parcels=100, verbose=True, smoothing_fwhm=5, n_jobs=1
+):
+    """
+    Compute parcels using a given mask and input image.
+
+    Parameters
+    ----------
+    niimg : Niimg-like object
+        Input image to be parcellated.
+    mask : Niimg-like object or NiftiMasker
+        Mask image defining the region of interest.
+    n_parcels : int, default=100
+        Number of parcels to create.
+    verbose : bool, default=True
+        Whether to print progress messages.
+    smoothing_fwhm : float, default=5
+        Full Width at Half Maximum (FWHM), in millimeters, for smoothing.
+    n_jobs : int, default=1
+        Number of parallel jobs to run.
+
+    Returns
+    -------
+    parcels : list
+        Parcels created from the input image and mask.
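+
+    Examples
+    --------
+    A minimal sketch (``img`` and ``mask_img`` are assumed to be
+    Niimg-like objects)::
+
+        >>> parcels = compute_parcels(
+        ...     niimg=img, mask=mask_img, n_parcels=50, n_jobs=2
+        ... )  # doctest: +SKIP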
+###############################################################################
+# Computing searchlights
+###############################################################################
+
+
+def _apply_mask_and_get_affinity(
+    seeds, niimg, radius, allow_overlap, mask_img=None, n_jobs=1
+):
+    """Get only the rows which are occupied by a sphere \
+    at the given seed locations and the provided radius.
+
+    Rows are in target_affine and target_shape space.
+
+    Parameters
+    ----------
+    seeds : list of triplets of coordinates in native space
+        Seed definitions. List of coordinates of the seeds in the same
+        space as target_affine.
+
+    niimg : 3D/4D Niimg-like object
+        See :ref:`extracting_data`.
+        Images to process.
+        If a 3D niimg is provided, a singleton dimension will be added
+        to the output to represent the single scan in the niimg.
+
+    radius : float
+        Indicates, in millimeters, the radius of the sphere around each
+        seed.
+
+    allow_overlap : boolean
+        If False, a ValueError is raised if VOIs overlap.
+
+    mask_img : Niimg-like object, optional
+        Mask to apply to regions before extracting signals. If niimg is
+        None, mask_img is used as a reference space in which the
+        spheres' indices are placed.
+
+    n_jobs : int, optional
+        Number of parallel jobs to run. Defaults to 1.
+
+    Returns
+    -------
+    X : 2D numpy.ndarray
+        Signal for each brain voxel in the (masked) niimgs.
+        shape: (number of scans, number of voxels)
+
+    A : scipy.sparse.lil_matrix
+        Contains the boolean indices for each sphere.
+        shape: (number of seeds, number of voxels)
+    """
+    seeds = list(seeds)
+
+    # Compute world coordinates of all in-mask voxels.
+    if niimg is None:
+        mask, affine = masking._load_mask_img(mask_img)
+        # Get the coordinates of all voxels inside the mask
+        mask_coords = np.asarray(np.nonzero(mask)).T.tolist()
+        X = None
+
+    elif mask_img is not None:
+        affine = niimg.affine
+        mask_img = check_niimg_3d(mask_img)
+        mask_img = image.resample_img(
+            mask_img,
+            target_affine=affine,
+            target_shape=niimg.shape[:3],
+            interpolation="nearest",
+        )
+        mask, _ = masking.load_mask_img(mask_img)
+        mask_coords = list(zip(*np.where(mask != 0)))
+
+        X = masking.apply_mask_fmri(niimg, mask_img)
+
+    elif niimg is not None:
+        affine = niimg.affine
+        if np.isnan(np.sum(safe_get_data(niimg))):
+            warnings.warn(
+                "The imgs you have fed into fit_transform() contains NaN "
+                "values which will be converted to zeroes."
+            )
+            X = safe_get_data(niimg, True).reshape([-1, niimg.shape[3]]).T
+        else:
+            X = safe_get_data(niimg).reshape([-1, niimg.shape[3]]).T
+
+        mask_coords = list(np.ndindex(niimg.shape[:3]))
+
+    else:
+        raise ValueError("Either a niimg or a mask_img must be provided.")
+
+    # For each seed, get the index of the nearest in-mask voxel
+    nearests = []
+    for sx, sy, sz in seeds:
+        nearest = np.round(
+            image.resampling.coord_transform(sx, sy, sz, np.linalg.inv(affine))
+        )
+        nearest = nearest.astype(int)
+        nearest = (nearest[0], nearest[1], nearest[2])
+        try:
+            nearests.append(mask_coords.index(nearest))
+        except ValueError:
+            nearests.append(None)
+
+    mask_coords = np.asarray(list(zip(*mask_coords)))
+    mask_coords = image.resampling.coord_transform(
+        mask_coords[0], mask_coords[1], mask_coords[2], affine
+    )
+    mask_coords = np.asarray(mask_coords).T
+
+    clf = neighbors.NearestNeighbors(radius=radius, n_jobs=n_jobs)
+    A = clf.fit(mask_coords).radius_neighbors_graph(seeds)
+    A = A.tolil()
+    for i, nearest in enumerate(nearests):
+        if nearest is None:
+            continue
+        A[i, nearest] = True
+
+    mask_coords_floats = mask_coords.copy()
+
+    # Include the voxel containing the seed itself if not masked
+    mask_coords = mask_coords.astype(int).tolist()
+    for i, seed in enumerate(seeds):
+        try:
+            A[i, mask_coords.index(list(map(int, seed)))] = True
+        except ValueError:
+            # seed is not in the mask
+            pass
+
+    sphere_sizes = np.asarray(A.tocsr().sum(axis=1)).ravel()
+    empty_spheres = np.nonzero(sphere_sizes == 0)[0]
+    if len(empty_spheres) != 0:
+        raise ValueError(f"These spheres are empty: {empty_spheres}")
+
+    if (not allow_overlap) and np.any(A.sum(axis=0) >= 2):
+        raise ValueError("Overlap detected between spheres")
+
+    return X, A, mask_coords_floats
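The core of the helper above is scikit-learn's radius-neighbors graph: rows index seeds, columns index in-mask voxels. A self-contained toy run, independent of nilearn:

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors

# Five voxels along a line, 1 mm apart, and two seeds at the ends.
voxels = np.array([[0.0, 0, 0], [1, 0, 0], [2, 0, 0], [3, 0, 0], [4, 0, 0]])
seeds = np.array([[0.0, 0, 0], [4.0, 0, 0]])

# Nonzero entries of row i mark the voxels inside sphere i.
A = NearestNeighbors(radius=1.5).fit(voxels).radius_neighbors_graph(seeds)
print(A.toarray())
# [[1. 1. 0. 0. 0.]
#  [0. 0. 0. 1. 1.]]
```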
+def compute_searchlights(
+    niimg,
+    mask_img,
+    process_mask_img=None,
+    radius=20,
+    return_dist_mat=False,
+    n_jobs=1,
+):
+    """Compute searchlights for a given 4D image and mask.
+
+    Parameters
+    ----------
+    niimg : Niimg-like object
+        See :ref:`extracting_data`.
+        4D image.
+
+    mask_img : Niimg-like object
+        See :ref:`extracting_data`.
+        Boolean image giving the location of voxels containing usable
+        signals.
+
+    process_mask_img : Niimg-like object, optional
+        See :ref:`extracting_data`.
+        Boolean image giving the voxels on which searchlights should be
+        computed.
+
+    radius : float, optional
+        Radius of the searchlight ball, in millimeters. Defaults to 20.
+
+    return_dist_mat : bool, optional
+        Whether to return the full voxel-to-voxel distance matrix
+        instead of the per-searchlight distances. Defaults to False.
+
+    n_jobs : int, optional
+        Number of parallel jobs to run. Defaults to 1.
+
+    Returns
+    -------
+    X : 2D numpy.ndarray
+        Signal for each brain voxel in the (masked) niimgs.
+        shape: (number of scans, number of voxels)
+
+    A_list : list of lists
+        Contains the voxel indices for each sphere.
+        shape: (number of seeds, number of voxels)
+
+    dists : list of lists
+        Contains the distance between each voxel and the seed.
+        shape: (number of seeds, number of voxels)
+    """
+    # Check that the image is 4D
+    niimg = check_niimg_4d(niimg)
+
+    # Get the seeds
+    if process_mask_img is None:
+        process_mask_img = mask_img
+
+    # Compute world coordinates of the seeds
+    process_mask, process_mask_affine = masking.load_mask_img(process_mask_img)
+    process_mask_coords = np.where(process_mask != 0)
+    process_mask_coords = coord_transform(
+        process_mask_coords[0],
+        process_mask_coords[1],
+        process_mask_coords[2],
+        process_mask_affine,
+    )
+    process_mask_coords = np.asarray(process_mask_coords).T
+
+    X, A, mask_coords = _apply_mask_and_get_affinity(
+        process_mask_coords,
+        niimg,
+        radius=radius,
+        allow_overlap=True,
+        mask_img=mask_img,
+        n_jobs=n_jobs,
+    )
+
+    A_list = []
+    for i in range(A.shape[0]):
+        A_list.append(A[i].nonzero()[1].tolist())
+
+    dist_matrix = distance_matrix(mask_coords, mask_coords)
+    dists = []
+    for i, sl in enumerate(A_list):
+        dists.append(dist_matrix[i, sl])
+
+    if return_dist_mat:
+        return X, A_list, dist_matrix
+
+    return X, A_list, dists
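Putting it together, a hedged usage sketch for ``compute_searchlights`` on a fabricated image; shapes are illustrative, and every in-mask voxel becomes a seed here.

```python
import numpy as np
import nibabel as nib

# Assumed import path, matching the file introduced in this diff.
from fmralign.hyperalignment.regions import compute_searchlights

rng = np.random.default_rng(0)
niimg = nib.Nifti1Image(rng.standard_normal((6, 6, 6, 10)), np.eye(4))
mask_img = nib.Nifti1Image(np.ones((6, 6, 6)), np.eye(4))

X, searchlights, dists = compute_searchlights(niimg, mask_img, radius=3)
# X: (10, 216) masked signals; searchlights[i]: voxel indices of sphere i;
# dists[i]: distances from those voxels to the center of sphere i.
```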
+def searchlight_weights(searchlights, dists, radius):
+    """Compute the weight of each voxel within each searchlight.
+
+    Voxels are weighted so that the weight decreases linearly with the
+    distance from the searchlight center, and weights are normalized so
+    that, for a given voxel, they sum to one across all searchlights
+    containing it.
+
+    Parameters
+    ----------
+    searchlights : list of ndarray
+        List of searchlights, where each searchlight is represented as
+        an array of voxel indices.
+    dists : list of ndarray
+        For each searchlight, the distances between its voxels and its
+        center.
+    radius : float
+        Radius of the searchlight, in millimeters.
+
+    Returns
+    -------
+    weights : list of ndarray
+        Weights of the voxels in each searchlight.
+    """
+    nv = np.concatenate(searchlights).max() + 1
+    weights_sum = np.zeros((nv,))
+    for sl, d in zip(searchlights, dists):
+        w = (radius - d) / radius
+        weights_sum[sl] += w
+    weights = []
+    for sl, d in zip(searchlights, dists):
+        w = (radius - d) / radius
+        w /= weights_sum[sl]
+        weights.append(w)
+    return weights
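A small worked example of the weighting: two overlapping searchlights over four voxels, radius 2 mm. Weights fall off linearly with distance and are then renormalized per voxel across searchlights.

```python
import numpy as np

# Assumed import path, matching the file introduced in this diff.
from fmralign.hyperalignment.regions import searchlight_weights

searchlights = [np.array([0, 1, 2]), np.array([1, 2, 3])]
dists = [np.array([0.0, 1.0, 2.0]), np.array([1.0, 0.0, 1.0])]

weights = searchlight_weights(searchlights, dists, radius=2)
# Voxel 1 gets raw weight 0.5 from each searchlight, normalized to 0.5 each;
# voxel 2 gets raw weights 0.0 and 0.5, normalized to 0.0 and 1.0.
```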
+###############################################################################
+# Hyperalignment
+###############################################################################
+
+
+def iter_hyperalignment(
+    X,
+    Y,
+    regions,
+    sl_func,
+    return_betas=False,
+):
+    """Iterate hyperalignment over pieces of the data.
+
+    Parameters
+    ----------
+    X : array-like of shape (n_samples, n_features)
+        The source data matrix.
+    Y : array-like of shape (n_samples, n_features)
+        The target data matrix.
+    regions : list of ndarray
+        The indices of the voxels in each region.
+    sl_func : callable
+        The function used to align one piece of X to one piece of Y.
+    return_betas : bool, optional
+        Whether to return the regression coefficients instead of the
+        prediction. Defaults to False.
+
+    Returns
+    -------
+    res : array-like
+        The coefficient matrix if ``return_betas`` is True, otherwise
+        the transformed data matrix.
+    """
+    if return_betas:
+        T = np.zeros((X.shape[1], Y.shape[1]), dtype=np.float32)
+    else:
+        Yhat = np.zeros_like(X, dtype=np.float32)
+
+    for sl in regions:
+        x, y = X[:, sl], Y[:, sl]
+        t = sl_func(x, y)
+        if return_betas:
+            T[np.ix_(sl, sl)] += t
+        else:
+            Yhat[:, sl] += x @ t
+
+    res = T if return_betas else Yhat
+    return res
+
+
+def piece_procrustes(
+    X,
+    Y,
+    regions,
+    T0=None,
+    reflection=True,
+    scaling=False,
+):
+    """Compute a piecewise transformation between a template and a
+    target signal using Procrustes hyperalignment.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_samples, n_features)
+        The source data matrix.
+    Y : ndarray of shape (n_samples, n_features)
+        The target data matrix.
+    regions : list of ndarray
+        List of brain regions. Contains the indices of the voxels in
+        each region (either parcels or searchlights).
+    T0 : array-like, optional
+        Initial transformation matrix. Currently unused. Defaults to
+        None.
+    reflection : bool, optional
+        Whether to allow reflection. Defaults to True.
+    scaling : bool, optional
+        Whether to allow scaling. Defaults to False.
+
+    Returns
+    -------
+    T : array-like
+        The piecewise Procrustes prediction of Y from X.
+    """
+    sl_func = functools.partial(
+        procrustes, reflection=reflection, scaling=scaling
+    )
+    T = iter_hyperalignment(
+        X,
+        Y,
+        regions,
+        sl_func=sl_func,
+    )
+    return T
+
+
+def piece_ridge(
+    X,
+    Y,
+    regions,
+    alpha=1e3,
+    verbose=False,
+    return_betas=False,
+):
+    """Perform piecewise (searchlight or parcel) ridge regression for
+    hyperalignment.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_samples, n_features)
+        The source data matrix.
+    Y : ndarray of shape (n_samples, n_features)
+        The target data matrix.
+    regions : list of ndarray
+        List of brain regions. Contains the indices of the voxels in
+        each region (either parcels or searchlights).
+    alpha : float, optional
+        The regularization parameter for ridge regression. Defaults to
+        1e3.
+    verbose : bool, optional
+        Currently unused. Defaults to False.
+    return_betas : bool, optional
+        Whether to return the regression coefficients instead of the
+        prediction. Defaults to False.
+
+    Returns
+    -------
+    T : array-like
+        The coefficient matrix if ``return_betas`` is True, otherwise
+        the ridge prediction of Y from X.
+    """
+    sl_func = functools.partial(ridge, alpha=alpha)
+    T = iter_hyperalignment(
+        X,
+        Y,
+        regions,
+        sl_func=sl_func,
+        return_betas=return_betas,
+    )
+    return T
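Mechanically, ``iter_hyperalignment`` fits one map per region and scatter-adds the per-region predictions. A toy run with ordinary least squares standing in for the package's ridge solver (the real ``sl_func`` comes from the local ``linalg`` module):

```python
import numpy as np

# Assumed import path, matching the file introduced in this diff.
from fmralign.hyperalignment.regions import iter_hyperalignment


def lstsq_func(x, y):
    # Stand-in for the ridge solver: returns the coefficient matrix
    # mapping the source piece x onto the target piece y.
    return np.linalg.lstsq(x, y, rcond=None)[0]


rng = np.random.default_rng(0)
X = rng.standard_normal((20, 6))  # source: 20 samples, 6 voxels
T_true = np.block([
    [rng.standard_normal((3, 3)), np.zeros((3, 3))],
    [np.zeros((3, 3)), rng.standard_normal((3, 3))],
])
Y = X @ T_true  # target generated block by block
regions = [np.array([0, 1, 2]), np.array([3, 4, 5])]

Yhat = iter_hyperalignment(X, Y, regions, sl_func=lstsq_func)
print(np.allclose(Yhat, Y, atol=1e-4))  # True: each block is recovered
```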
+def template(
+    X,
+    regions,
+    n_jobs=1,
+    template_kind="pca",
+    common_topography=True,
+    weights=None,
+):
+    """Compute a global template by aggregating local templates
+    computed within each region.
+
+    Parameters
+    ----------
+    X : ndarray of shape (n_subjects, n_samples, n_features)
+        The input data matrix.
+    regions : list of ndarray
+        List of regions, each given as an array of voxel indices.
+    n_jobs : int, optional
+        The number of parallel jobs to run. Defaults to 1.
+    template_kind : str, optional
+        The kind of template to compute. Defaults to "pca".
+    common_topography : bool, optional
+        Whether to enforce a common topography across subjects.
+        Defaults to True.
+    weights : list of ndarray, optional
+        Weights of the voxels in each region, e.g. as returned by
+        ``searchlight_weights``. If None, local templates are summed
+        without weighting. Defaults to None.
+
+    Returns
+    -------
+    template : ndarray of shape (n_samples, n_features)
+        The computed template.
+    """
+    with Parallel(n_jobs=n_jobs, batch_size=1, verbose=1) as parallel:
+        local_templates = parallel(
+            delayed(compute_template)(
+                X,
+                region=region,
+                kind=template_kind,
+                n_components=150,
+                common_topography=common_topography,
+            )
+            for region in regions
+        )
+
+    template = np.zeros_like(X[0])
+    if weights is not None:
+        for local_template, w, region in zip(local_templates, weights, regions):
+            template[:, region] += local_template * w[np.newaxis]
+    else:
+        for local_template, region in zip(local_templates, regions):
+            template[:, region] += local_template
+    return template
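The aggregation at the end of ``template()`` is a weighted scatter-add of local templates into the global array. A minimal numpy rendition, with fabricated local templates standing in for ``compute_template`` output:

```python
import numpy as np

n_samples, n_voxels = 4, 5
regions = [np.array([0, 1, 2]), np.array([2, 3, 4])]
local_templates = [np.ones((n_samples, 3)), 2 * np.ones((n_samples, 3))]
weights = [np.array([1.0, 1.0, 0.5]), np.array([0.5, 1.0, 1.0])]

global_template = np.zeros((n_samples, n_voxels))
for local, w, region in zip(local_templates, weights, regions):
    # Scatter-add the weighted local template into the global one.
    global_template[:, region] += local * w[np.newaxis]

print(global_template[0])  # [1.  1.  1.5 2.  2. ] -- voxel 2 blends both
```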
diff --git a/fmralign/hyperalignment/test_hyperalignment.py b/fmralign/hyperalignment/test_hyperalignment.py
new file mode 100644
index 0000000..9100381
--- /dev/null
+++ b/fmralign/hyperalignment/test_hyperalignment.py
@@ -0,0 +1,126 @@
+import numpy as np
+
+from fmralign.alignment_methods import IndividualizedNeuralTuning as INT
+from fmralign.fetch_example_data import (
+    generate_dummy_searchlights,
+    generate_dummy_signal,
+)
+from fmralign.hyperalignment.correlation import (
+    stimulus_correlation,
+    tuning_correlation,
+)
+
+
+def test_int_fit_predict():
+    """Test that the outputs and arguments of the INT are in the correct
+    format, and that the decomposition works, without proper searchlight
+    input (i.e. all voxels are used)."""
+    # Create random data
+    (
+        X_train,
+        X_test,
+        S_true_first_part,
+        S_true_second_part,
+        _,
+    ) = generate_dummy_signal(
+        n_subjects=7,
+        n_timepoints=50,
+        n_voxels=300,
+        S_std=1,
+        T_std=1,
+        latent_dim=6,
+        SNR=100,
+        generative_method="custom",
+        seed=0,
+    )
+
+    # Testing without searchlights: one searchlight covering all voxels
+    searchlights = [np.arange(300)]
+    dists = [np.ones((300,))]
+
+    # Test INT on the two parts of the data
+    # (i.e. different runs of the experiment)
+    int1 = INT(n_components=6, searchlights=searchlights, dists=dists)
+    int2 = INT(n_components=6, searchlights=searchlights, dists=dists)
+    int1.fit(X_train)
+    int2.fit(X_test)
+
+    X_pred = int1.transform(X_test)
+
+    # Save the individual components
+    tuning_data_run_1 = np.array(int1.tuning_data)
+    tuning_data_run_2 = np.array(int2.tuning_data)
+
+    stimulus_run_1 = int1.shared_response
+    S_estimated_second_part = int2.shared_response
+
+    corr1 = tuning_correlation(tuning_data_run_1, tuning_data_run_2)
+    corr2 = stimulus_correlation(stimulus_run_1.T, S_true_first_part.T)
+    corr3 = stimulus_correlation(
+        S_estimated_second_part.T, S_true_second_part.T
+    )
+    corr4 = tuning_correlation(X_pred, X_test)
+
+    # Check that the correlation between the two parts of the data is high
+    corr1_out = corr1 - np.diag(corr1)
+    corr2_out = corr2 - np.diag(corr2)
+    corr3_out = corr3 - np.diag(corr3)
+    corr4_out = corr4 - np.diag(corr4)
+    assert 3 * np.mean(corr1_out) < np.mean(np.diag(corr1))
+    assert 3 * np.mean(corr2_out) < np.mean(np.diag(corr2))
+    assert 3 * np.mean(corr3_out) < np.mean(np.diag(corr3))
+    assert 3 * np.mean(corr4_out) < np.mean(np.diag(corr4))
+
+    # Check that predicted components have the same shape as the original data
+    assert int1.tuning_data[0].shape == (6, int1.n_voxels)
+    assert int2.tuning_data[0].shape == (6, int2.n_voxels)
+    assert int1.shared_response.shape == (int1.n_time_points, 6)
+    assert X_pred.shape == X_test.shape
+
+
+def test_int_with_searchlight():
+    """Test that the outputs and arguments of the INT are in the correct
+    format, and that the decomposition works, with searchlight input."""
+    X_train, X_test, stimulus_train, stimulus_test, _ = generate_dummy_signal(
+        n_subjects=5,
+        n_timepoints=50,
+        n_voxels=300,
+        S_std=1,
+        T_std=1,
+        latent_dim=6,
+        SNR=100,
+        generative_method="custom",
+        seed=0,
+    )
+    searchlights, dists = generate_dummy_searchlights(
+        n_searchlights=10, n_voxels=30, radius=5, seed=0
+    )
+
+    # Test INT on the two parts of the data
+    # (i.e. different runs of the experiment)
+    model1 = INT(n_components=6, searchlights=searchlights, dists=dists, radius=5)
+    model2 = INT(n_components=6, searchlights=searchlights, dists=dists, radius=5)
+    model1.fit(X_train)
+    model2.fit(X_test)
+    X_pred = model1.transform(X_test)
+
+    tuning_data_run_1 = np.array(model1.tuning_data)
+    tuning_data_run_2 = np.array(model2.tuning_data)
+
+    stimulus_run_1 = model1.shared_response
+    stimulus_run_2 = model2.shared_response
+
+    corr1 = tuning_correlation(tuning_data_run_1, tuning_data_run_2)
+    corr2 = stimulus_correlation(stimulus_run_1.T, stimulus_train.T)
+    corr3 = stimulus_correlation(stimulus_run_2.T, stimulus_test.T)
+    corr4 = tuning_correlation(X_pred, X_test)
+
+    # Check that the correlation between the two parts of the data is high
+    corr1_out = corr1 - np.diag(corr1)
+    corr2_out = corr2 - np.diag(corr2)
+    corr3_out = corr3 - np.diag(corr3)
+    corr4_out = corr4 - np.diag(corr4)
+    assert 3 * np.mean(corr1_out) < np.mean(np.diag(corr1))
+    assert 3 * np.mean(corr2_out) < np.mean(np.diag(corr2))
+    assert 3 * np.mean(corr3_out) < np.mean(np.diag(corr3))
+    assert 3 * np.mean(corr4_out) < np.mean(np.diag(corr4))
+
+    # Check that predicted components have the same shape as the original data
+    assert model1.tuning_data[0].shape == (6, model1.n_voxels)
+    assert model2.tuning_data[0].shape == (6, model2.n_voxels)
+    assert model1.shared_response.shape == (model1.n_time_points, 6)
+    assert X_pred.shape == X_test.shape
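Before the ``metrics.py`` reflow below: ``score_voxelwise`` is the scoring entry point these tests exercise. A usage sketch modeled on ``test_metrics`` (tiny fabricated images, full mask):

```python
import numpy as np
import nibabel as nib
from nilearn.maskers import NiftiMasker

from fmralign import metrics

# Two tiny 4D images (x, y, z, time) and an all-ones mask.
A = np.asarray([[[[1, 1.2, 1, 1.2, 1]], [[1, 1, 1, 0.2, 1]]]])
B = np.asarray([[[[0, 0.2, 0, 0.2, 0]], [[0.2, 1, 1, 1, 1]]]])
im_A = nib.Nifti1Image(A, np.eye(4))
im_B = nib.Nifti1Image(B, np.eye(4))
mask_img = nib.Nifti1Image(np.ones(A.shape[:3]), np.eye(4))
masker = NiftiMasker(mask_img=mask_img).fit()

# One score per voxel; loss is "R2", "corr" or "n_reconstruction_err".
print(metrics.score_voxelwise(im_A, im_B, masker, loss="corr"))
```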
diff --git a/fmralign/metrics.py b/fmralign/metrics.py
index cc2d29e..d9b330c 100644
--- a/fmralign/metrics.py
+++ b/fmralign/metrics.py
@@ -4,7 +4,9 @@
 from sklearn.metrics import r2_score
 
 
-def score_voxelwise(ground_truth, prediction, masker, loss, multioutput="raw_values"):
+def score_voxelwise(
+    ground_truth, prediction, masker, loss, multioutput="raw_values"
+):
     """
     Calculate loss function for predicted, ground truth arrays.
     Supported scores are R2, correlation, and normalized
@@ -51,7 +53,9 @@
     if loss == "R2":
         score = r2_score(X_gt, X_pred, multioutput=multioutput)
     elif loss == "n_reconstruction_err":
-        score = normalized_reconstruction_error(X_gt, X_pred, multioutput=multioutput)
+        score = normalized_reconstruction_error(
+            X_gt, X_pred, multioutput=multioutput
+        )
     elif loss == "corr":
         score = np.array(
             [
@@ -124,7 +128,9 @@
 
     # Calculate reconstruction error
     output_scores = np.ones([y_true.shape[-1]])
-    output_scores[valid_score] = 1 - (numerator[valid_score] / denominator[valid_score])
+    output_scores[valid_score] = 1 - (
+        numerator[valid_score] / denominator[valid_score]
+    )
     if multioutput == "raw_values":
         # return scores individually
         return output_scores
diff --git a/fmralign/template_alignment.py b/fmralign/template_alignment.py
index dac5655..65cdb37 100644
--- a/fmralign/template_alignment.py
+++ b/fmralign/template_alignment.py
@@ -11,7 +11,6 @@
 from nilearn.image import concat_imgs, index_img, load_img
 from nilearn._utils.masker_validation import check_embedded_masker
 from sklearn.base import BaseEstimator, TransformerMixin
-
 from fmralign.pairwise_alignment import PairwiseAlignment
 
 
@@ -265,7 +264,7 @@ def __init__(
         alignment_method: string
             Algorithm used to perform alignment between X_i and Y_i :
             * either 'identity', 'scaled_orthogonal', 'optimal_transport',
-            'ridge_cv', 'permutation', 'diagonal'
+            'ridge_cv', 'permutation', 'diagonal',
             * or an instance of one of alignment classes (imported from
             functional_alignment.alignment_methods)
         n_pieces: int, optional (default = 1)
@@ -376,6 +375,7 @@ def fit(self, imgs):
             Length : n_samples
 
         """
+
         # Check if the input is a list, if list of lists, concatenate each subjects
         # data into one unique image.
         if not isinstance(imgs, (list, np.ndarray)) or len(imgs) < 2:
@@ -441,6 +441,7 @@ def transform(self, imgs, train_index, test_index):
             Each Niimg has the same length as the list test_index
 
         """
+
         if not isinstance(imgs, (list, np.ndarray)):
             raise ValueError(
                 "The method TemplateAlignment.transform() need a list input. "
diff --git a/fmralign/tests/test_alignment_methods.py b/fmralign/tests/test_alignment_methods.py
index 7a9059e..7dbddaa 100644
--- a/fmralign/tests/test_alignment_methods.py
+++ b/fmralign/tests/test_alignment_methods.py
@@ -91,7 +91,11 @@ def test_scaled_procrustes_on_simple_exact_cases():
 
     """3D Rotation"""
     R = np.array(
-        [[1.0, 0.0, 0.0], [0.0, np.cos(1), -np.sin(1)], [0.0, np.sin(1), np.cos(1)]]
+        [
+            [1.0, 0.0, 0.0],
+            [0.0, np.cos(1), -np.sin(1)],
+            [0.0, np.sin(1), np.cos(1)],
+        ]
     )
     X = np.random.rand(3, 4)
     X = X - X.mean(axis=1, keepdims=True)
diff --git a/fmralign/tests/test_metrics.py b/fmralign/tests/test_metrics.py
index fc72f74..62ebcef 100644
--- a/fmralign/tests/test_metrics.py
+++ b/fmralign/tests/test_metrics.py
@@ -7,8 +7,12 @@
 
 
 def test_score_voxelwise():
-    A = np.asarray([[[[1, 1.2, 1, 1.2, 1]], [[1, 1, 1, 0.2, 1]], [[1, -1, 1, -1, 1]]]])
-    B = np.asarray([[[[0, 0.2, 0, 0.2, 0]], [[0.2, 1, 1, 1, 1]], [[-1, 1, -1, 1, -1]]]])
+    A = np.asarray(
+        [[[[1, 1.2, 1, 1.2, 1]], [[1, 1, 1, 0.2, 1]], [[1, -1, 1, -1, 1]]]]
+    )
+    B = np.asarray(
+        [[[[0, 0.2, 0, 0.2, 0]], [[0.2, 1, 1, 1, 1]], [[-1, 1, -1, 1, -1]]]]
+    )
     im_A = nib.Nifti1Image(A, np.eye(4))
     im_B = nib.Nifti1Image(B, np.eye(4))
     mask_img = nib.Nifti1Image(np.ones(im_A.shape[0:3]), np.eye(4))
@@ -29,7 +33,9 @@ def test_score_voxelwise():
     assert_array_almost_equal(r2, [-1.0, -1.0, -1.0])
 
     # check normalized reconstruction
-    norm_rec = metrics.score_voxelwise(im_A, im_B, masker, loss="n_reconstruction_err")
+    norm_rec = metrics.score_voxelwise(
+        im_A, im_B, masker, loss="n_reconstruction_err"
+    )
     assert_array_almost_equal(norm_rec, [0.14966, 0.683168, -1.0])
diff --git a/fmralign/tests/test_pairwise_alignment.py b/fmralign/tests/test_pairwise_alignment.py
index 1420bf7..5698d7d 100644
--- a/fmralign/tests/test_pairwise_alignment.py
+++ b/fmralign/tests/test_pairwise_alignment.py
@@ -29,7 +29,12 @@ def test_pairwise_identity():
     args_list = [
         {"alignment_method": "identity", "mask": mask_img},
         {"alignment_method": "identity", "n_pieces": 3, "mask": mask_img},
-        {"alignment_method": "identity", "n_pieces": 3, "n_bags": 4, "mask": mask_img},
+        {
+            "alignment_method": "identity",
+            "n_pieces": 3,
+            "n_bags": 4,
+            "mask": mask_img,
+        },
         {
             "alignment_method": "identity",
             "n_pieces": 3,
@@ -58,7 +63,9 @@ def test_pairwise_identity():
     )
     with pytest.warns(UserWarning):
         algo.fit(img1, img1)
-    assert (algo.mask.get_fdata() > 0).sum() == (clustering.get_fdata() > 0).sum()
+    assert (algo.mask.get_fdata() > 0).sum() == (
+        clustering.get_fdata() > 0
+    ).sum()
 
     # test warning raised if parcel is 0 :
     null_im = new_img_like(img1, np.zeros_like(img1.get_fdata()))
diff --git a/fmralign/tests/test_template_alignment.py b/fmralign/tests/test_template_alignment.py
index 8798b4a..f04b150 100644
--- a/fmralign/tests/test_template_alignment.py
+++ b/fmralign/tests/test_template_alignment.py
@@ -4,8 +4,14 @@
 from nilearn.maskers import NiftiMasker
 from numpy.testing import assert_array_almost_equal
 
-from fmralign.template_alignment import TemplateAlignment, _rescaled_euclidean_mean
-from fmralign.tests.utils import random_niimg, zero_mean_coefficient_determination
+from fmralign.template_alignment import (
+    TemplateAlignment,
+    _rescaled_euclidean_mean,
+)
+from fmralign.tests.utils import (
+    random_niimg,
+    zero_mean_coefficient_determination,
+)
 
 
 def test_template_identity():
@@ -24,7 +30,9 @@ def test_template_identity():
 
     # test euclidian mean function
     euclidian_template = _rescaled_euclidean_mean(subs, masker)
-    assert_array_almost_equal(ref_template.get_fdata(), euclidian_template.get_fdata())
+    assert_array_almost_equal(
+        ref_template.get_fdata(), euclidian_template.get_fdata()
+    )
 
     # test different fit() accept list of list of 3D Niimgs as input.
     algo = TemplateAlignment(alignment_method="identity", mask=masker)
@@ -37,7 +45,12 @@ def test_template_identity():
         {"alignment_method": "identity", "mask": masker},
         {"alignment_method": "identity", "mask": masker, "n_jobs": 2},
         {"alignment_method": "identity", "n_pieces": 3, "mask": masker},
-        {"alignment_method": "identity", "n_pieces": 3, "n_bags": 2, "mask": masker},
+        {
+            "alignment_method": "identity",
+            "n_pieces": 3,
+            "n_bags": 2,
+            "mask": masker,
+        },
     ]
 
     for args in args_list:
@@ -45,9 +58,13 @@ def test_template_identity():
         # Learning a template which is
         algo.fit(subs)
         # test template
-        assert_array_almost_equal(ref_template.get_fdata(), algo.template.get_fdata())
+        assert_array_almost_equal(
+            ref_template.get_fdata(), algo.template.get_fdata()
+        )
         predicted_imgs = algo.transform(
-            [index_img(sub_1, range(8))], train_index=range(8), test_index=range(8, 10)
+            [index_img(sub_1, range(8))],
+            train_index=range(8),
+            test_index=range(8, 10),
         )
         ground_truth = index_img(ref_template, range(8, 10))
         assert_array_almost_equal(
@@ -65,7 +82,9 @@ def test_template_identity():
     for train_ind, test_ind in zip(train_inds, test_inds):
         with pytest.raises(Exception):
             assert algo.transform(
-                [index_img(sub_1, range(2))], train_index=train_ind, test_index=test_ind
+                [index_img(sub_1, range(2))],
+                train_index=train_ind,
+                test_index=test_ind,
             )
 
     # test wrong images input in fit() and transform method
@@ -116,4 +135,6 @@ def test_template_closer_to_target():
         avg_data, template_data
     )
     assert template_mean_distance >= mean_distance_1
-    assert template_mean_distance >= mean_distance_2 - 1.0e-3  # for robustness
+    assert (
+        template_mean_distance >= mean_distance_2 - 1.0e-3
+    )  # for robustness
diff --git a/fmralign/tests/test_utils.py b/fmralign/tests/test_utils.py
index d82359a..0b70802 100644
--- a/fmralign/tests/test_utils.py
+++ b/fmralign/tests/test_utils.py
@@ -31,7 +31,9 @@ def test_make_parcellation():
 
     # check that not inputing n_pieces yields problems
     with pytest.raises(Exception):
-        assert _make_parcellation(img, indexes, clustering_method, 0, masker)
+        assert _make_parcellation(
+            img, indexes, clustering_method, 0, masker
+        )
 
     clustering = nibabel.Nifti1Image(
         np.hstack([np.ones((7, 3, 8)), 2 * np.ones((7, 3, 8))]), np.eye(4)
diff --git a/fmralign/tests/utils.py b/fmralign/tests/utils.py
index e2f9fc5..b30dbf2 100644
--- a/fmralign/tests/utils.py
+++ b/fmralign/tests/utils.py
@@ -41,7 +41,9 @@ def zero_mean_coefficient_determination(
     nonzero_numerator = numerator != 0
     valid_score = nonzero_denominator & nonzero_numerator
     output_scores = np.ones([y_true.shape[1]])
-    output_scores[valid_score] = 1 - (numerator[valid_score] / denominator[valid_score])
+    output_scores[valid_score] = 1 - (
+        numerator[valid_score] / denominator[valid_score]
+    )
     output_scores[nonzero_numerator & ~nonzero_denominator] = 0
 
     if multioutput == "raw_values":
@@ -52,7 +54,8 @@ def zero_mean_coefficient_determination(
         avg_weights = None
     elif multioutput == "variance_weighted":
         avg_weights = (
-            weight * (y_true - np.average(y_true, axis=0, weights=sample_weight)) ** 2
+            weight
+            * (y_true - np.average(y_true, axis=0, weights=sample_weight)) ** 2
         ).sum(axis=0, dtype=np.float64)
         # avoid fail on constant y or one-element arrays
         if not np.any(nonzero_denominator):