From fe7d0e510042a724f19fdc479ff736aa82257bc5 Mon Sep 17 00:00:00 2001 From: withmywoessner Date: Wed, 7 Feb 2024 16:36:29 -0600 Subject: [PATCH 1/4] Init --- .gitignore | 1 + mne/stats/__init__.pyi | 2 + mne/stats/erp/__init__.pyi | 2 + mne/stats/erp/_erp.py | 386 +++++++++++++++++++++++++++++++++++++ 4 files changed, 391 insertions(+) create mode 100644 mne/stats/erp/__init__.pyi create mode 100644 mne/stats/erp/_erp.py diff --git a/.gitignore b/.gitignore index 118eebd9c76..fdb1e8a0574 100644 --- a/.gitignore +++ b/.gitignore @@ -101,3 +101,4 @@ venv/ .hypothesis/ .ruff_cache/ .ipynb_checkpoints/ +mne/test_JAW.ipynb diff --git a/mne/stats/__init__.pyi b/mne/stats/__init__.pyi index 5d15a0462be..783874d58ea 100644 --- a/mne/stats/__init__.pyi +++ b/mne/stats/__init__.pyi @@ -19,7 +19,9 @@ __all__ = [ "summarize_clusters_stc", "ttest_1samp_no_p", "ttest_ind_no_p", + "erp", ] +from . import erp from ._adjacency import combine_adjacency from .cluster_level import ( _st_mask_from_s_inds, diff --git a/mne/stats/erp/__init__.pyi b/mne/stats/erp/__init__.pyi new file mode 100644 index 00000000000..9e11b5a98e4 --- /dev/null +++ b/mne/stats/erp/__init__.pyi @@ -0,0 +1,2 @@ +# ruff: noqa +# flake8: noqa diff --git a/mne/stats/erp/_erp.py b/mne/stats/erp/_erp.py new file mode 100644 index 00000000000..59a25d92aaf --- /dev/null +++ b/mne/stats/erp/_erp.py @@ -0,0 +1,386 @@ +# ruff: noqa +# flake8: noqa + + +def _erp_measure_setup(evoked, ch_type, merge_grads): + supported = ( + "mag", + "grad", + "eeg", + "seeg", + "dbs", + "ecog", + "misc", + "None", + ) + _FNIRS_CH_TYPES_SPLIT + types_used = evoked.get_channel_types(unique=True, only_data_chs=True) + + _check_option("ch_type", str(ch_type), supported) + + if ch_type is not None and ch_type not in types_used: + raise ValueError(f'Channel type "{ch_type}" not found in this evoked object.') + + elif len(types_used) > 1 and ch_type is None: + raise RuntimeError( + 'Multiple data channel types found. Please pass the "ch_type" ' "parameter." + ) + + if merge_grads: + if ch_type != "grad": + raise ValueError('Channel type must be "grad" for merge_grads') + elif mode == "neg": + raise ValueError( + "Negative mode (mode=neg) does not make " "sense with merge_grads=True" + ) + + if ch_type is not None: + if merge_grads: + picks = _pair_grad_sensors(evoked.info, topomap_coords=False) + else: + sel_evoked = evoked.pick( + ["meg", "eeg", "misc", "seeg", "ecog", "dbs", "fnirs"] + ) + + data = sel_evoked.data + ch_names = sel_evoked.ch_names + + if merge_grads: + data, _ = _merge_ch_data(data, ch_type, []) + ch_names = [ch_name[:-1] + "X" for ch_name in ch_names[::2]] + + return data, ch_names + + +def get_peak( + insta, + ch_type=None, + tmin=None, + tmax=None, + mode="abs", + time_as_index=False, + merge_grads=False, + return_amplitude=False, + *, + strict=True, +): + """Get location and latency of peak amplitude. + + Parameters + ---------- + ch_type : str | None + The channel type to use. Defaults to None. If more than one channel + type is present in the data, this value **must** be provided. + tmin : float | None + The minimum point in time to be considered for peak getting. + If None (default), the beginning of the data is used. + tmax : float | None + The maximum point in time to be considered for peak getting. + If None (default), the end of the data is used. + mode : 'pos' | 'neg' | 'abs' + How to deal with the sign of the data. If 'pos' only positive + values will be considered. If 'neg' only negative values will + be considered. If 'abs' absolute values will be considered. + Defaults to 'abs'. + time_as_index : bool + Whether to return the time index instead of the latency in seconds. + merge_grads : bool + If True, compute peak from merged gradiometer data. + return_amplitude : bool + If True, return also the amplitude at the maximum response. + + .. versionadded:: 0.16 + strict : bool + If True, raise an error if values are all positive when detecting + a minimum (mode='neg'), or all negative when detecting a maximum + (mode='pos'). Defaults to True. + + .. versionadded:: 1.7 + + Returns + ------- + ch_name : str + The channel exhibiting the maximum response. + latency : float | int + The time point of the maximum response, either latency in seconds + or index. + amplitude : float + The amplitude of the maximum response. Only returned if + return_amplitude is True. + + .. versionadded:: 0.16 + """ # noqa: E501 + supported = ( + "mag", + "grad", + "eeg", + "seeg", + "dbs", + "ecog", + "misc", + "None", + ) + _FNIRS_CH_TYPES_SPLIT + types_used = self.get_channel_types(unique=True, only_data_chs=True) + + _check_option("ch_type", str(ch_type), supported) + + if ch_type is not None and ch_type not in types_used: + raise ValueError(f'Channel type "{ch_type}" not found in this evoked object.') + + elif len(types_used) > 1 and ch_type is None: + raise RuntimeError( + 'Multiple data channel types found. Please pass the "ch_type" ' "parameter." + ) + + if merge_grads: + if ch_type != "grad": + raise ValueError('Channel type must be "grad" for merge_grads') + elif mode == "neg": + raise ValueError( + "Negative mode (mode=neg) does not make " "sense with merge_grads=True" + ) + + meg = eeg = misc = seeg = dbs = ecog = fnirs = False + picks = None + if ch_type in ("mag", "grad"): + meg = ch_type + elif ch_type == "eeg": + eeg = True + elif ch_type == "misc": + misc = True + elif ch_type == "seeg": + seeg = True + elif ch_type == "dbs": + dbs = True + elif ch_type == "ecog": + ecog = True + elif ch_type in _FNIRS_CH_TYPES_SPLIT: + fnirs = ch_type + + if ch_type is not None: + if merge_grads: + picks = _pair_grad_sensors(self.info, topomap_coords=False) + else: + picks = pick_types( + self.info, + meg=meg, + eeg=eeg, + misc=misc, + seeg=seeg, + ecog=ecog, + ref_meg=False, + fnirs=fnirs, + dbs=dbs, + ) + data = self.data + ch_names = self.ch_names + + if picks is not None: + data = data[picks] + ch_names = [ch_names[k] for k in picks] + + if merge_grads: + data, _ = _merge_ch_data(data, ch_type, []) + ch_names = [ch_name[:-1] + "X" for ch_name in ch_names[::2]] + + ch_idx, time_idx, max_amp = _get_peak( + data, + self.times, + tmin, + tmax, + mode, + strict=strict, + ) + + out = (ch_names[ch_idx], time_idx if time_as_index else self.times[time_idx]) + + if return_amplitude: + out += (max_amp,) + + return out + + +def _get_peak(data, times, tmin=None, tmax=None, mode="abs", *, strict=True): + """Get feature-index and time of maximum signal from 2D array. + + Note. This is a 'getter', not a 'finder'. For non-evoked type + data and continuous signals, please use proper peak detection algorithms. + + Parameters + ---------- + data : instance of numpy.ndarray (n_locations, n_times) + The data, either evoked in sensor or source space. + times : instance of numpy.ndarray (n_times) + The times in seconds. + tmin : float | None + The minimum point in time to be considered for peak getting. + tmax : float | None + The maximum point in time to be considered for peak getting. + mode : {'pos', 'neg', 'abs'} + How to deal with the sign of the data. If 'pos' only positive + values will be considered. If 'neg' only negative values will + be considered. If 'abs' absolute values will be considered. + Defaults to 'abs'. + strict : bool + If True, raise an error if values are all positive when detecting + a minimum (mode='neg'), or all negative when detecting a maximum + (mode='pos'). Defaults to True. + + Returns + ------- + max_loc : int + The index of the feature with the maximum value. + max_time : int + The time point of the maximum response, index. + max_amp : float + Amplitude of the maximum response. + """ + _check_option("mode", mode, ["abs", "neg", "pos"]) + + if tmin is None: + tmin = times[0] + if tmax is None: + tmax = times[-1] + + if tmin < times.min() or tmax > times.max(): + if tmin < times.min(): + param_name = "tmin" + param_val = tmin + else: + param_name = "tmax" + param_val = tmax + + raise ValueError( + f"{param_name} ({param_val}) is out of bounds. It must be " + f"between {times.min()} and {times.max()}" + ) + elif tmin > tmax: + raise ValueError(f"tmin ({tmin}) must be <= tmax ({tmax})") + + time_win = (times >= tmin) & (times <= tmax) + mask = np.ones_like(data).astype(bool) + mask[:, time_win] = False + + maxfun = np.argmax + if mode == "pos": + if strict and not np.any(data[~mask] > 0): + raise ValueError( + "No positive values encountered. Cannot " "operate in pos mode." + ) + elif mode == "neg": + if strict and not np.any(data[~mask] < 0): + raise ValueError( + "No negative values encountered. Cannot " "operate in neg mode." + ) + maxfun = np.argmin + + masked_index = np.ma.array(np.abs(data) if mode == "abs" else data, mask=mask) + + max_loc, max_time = np.unravel_index(maxfun(masked_index), data.shape) + + return max_loc, max_time, data[max_loc, max_time] + + +def get_mean_amplitude( + insta, + ch_type=None, + tmin=None, + tmax=None, + mode="abs", + time_as_index=False, + merge_grads=False, + return_amplitude=False, +): + """Get the mean amplitude in a specific time window. + + Parameters + ---------- + ch_type : str | None + The channel type to use. If None, the first available channel type from + the following list is used: 'mag', 'grad', 'planar1', 'planar2', 'eeg'. + tmin : float | None + The beginning of the time window in seconds. If None the beginning of + the data is used. + tmax : float | None + The end of the time window in seconds. If None the end of the data is + used. + mode : str + How to combine multiple channels. The following options are available: + 'abs' : Take the absolute value of each channel and then average. + 'mean' : Average across channels. + 'max' : Take the maximum across channels. + 'mean_signed' : Average across channels, retaining sign. + 'median' : Take the median across channels. + 'percentile' : Take the specified percentile across channels. If + percentile is 50, this is the same as 'median'. + time_as_index : bool + Whether to consider time as index or as float (default False). + merge_grads : bool + If True, merge gradiometer data into one value by taking the RMS + (root mean square) for each pair of gradiometers. The RMS is taken + over the pair of gradiometers before averaging across channels. + This is only used for MEG data. + return_amplitude : bool + If True, return the amplitude values. If False, return the evoked + instance (default False). + strict : bool + If True, raise an error if channels are missing. If False, ignore + channels that are missing. + + Returns + ------- + evoked : instance of Evoked + The modified evoked instance. If return_amplitude is True, the + amplitude values are returned instead. + + See Also + -------- + mne.Evoked.get_peak : Get the time and value of the peak amplitude.""" + # check that the data is preloaded + + pass + + +def get_area(insta, ch_type=None, tmin=None, tmax=None, mode="abs", merge_grads=False): + """Get the area under the curve in a specific time window. + + Parameters + ---------- + ch_type : str | None + The channel type to use. If None, the first available channel type from + the following list is used: 'mag', 'grad', 'planar1', 'planar2', 'eeg'. + tmin : float | None + The beginning of the time window in seconds. If None the beginning of + the data is used. + tmax : float | None + The end of the time window in seconds. If None the end of the data is + used. + mode : str + How to combine multiple channels. The following options are available: + 'abs' : Take the absolute value of each channel and then average. + 'mean' : Average across channels. + 'max' : Take the maximum across channels. + 'mean_signed' : Average across channels, retaining sign. + 'median' : Take the median across channels. + 'percentile' : Take the specified percentile across channels. If + percentile is 50, this is the same as 'median'. + merge_grads : bool + If True, merge gradiometer data into one value by taking the RMS + (root mean square) for each pair of gradiometers. The RMS is taken + over the pair of gradiometers before averaging across channels. + This is only used for MEG data. + return_area : bool + If True, return the area values. If False, return the evoked instance + (default False). + + Returns + ------- + evoked : instance of Evoked + The modified evoked instance. If return_area is True, the area values + are returned instead. + + See Also + -------- + mne.Evoked.get_peak : Get the time and value of the peak amplitude.""" + # check that the data is preloaded + + pass From fbbb35d4fdd7a6be3957303aca07b81e18dc8704 Mon Sep 17 00:00:00 2001 From: withmywoessner Date: Thu, 8 Feb 2024 14:36:04 -0600 Subject: [PATCH 2/4] Update find_area --- mne/stats/erp/_erp.py | 47 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/mne/stats/erp/_erp.py b/mne/stats/erp/_erp.py index 59a25d92aaf..0b451bdb60f 100644 --- a/mne/stats/erp/_erp.py +++ b/mne/stats/erp/_erp.py @@ -1,6 +1,13 @@ # ruff: noqa # flake8: noqa +from scipy import integrate +import numpy as np +from .channels.layout import _merge_ch_data, _pair_grad_sensors +from .utils import ( + _check_option, +) + def _erp_measure_setup(evoked, ch_type, merge_grads): supported = ( @@ -51,6 +58,34 @@ def _erp_measure_setup(evoked, ch_type, merge_grads): return data, ch_names +def _restrict_time_interval(tmin, tmax, times, data): + if tmin is None: + tmin = times[0] + if tmax is None: + tmax = times[-1] + + if tmin < times.min() or tmax > times.max(): + if tmin < times.min(): + param_name = "tmin" + param_val = tmin + else: + param_name = "tmax" + param_val = tmax + + raise ValueError( + f"{param_name} ({param_val}) is out of bounds. It must be " + f"between {times.min()} and {times.max()}" + ) + elif tmin > tmax: + raise ValueError(f"tmin ({tmin}) must be <= tmax ({tmax})") + + time_win = (times >= tmin) & (times <= tmax) + time_mask = np.ones_like(data).astype(bool) + time_mask[:, time_win] = False + + return time_mask + + def get_peak( insta, ch_type=None, @@ -382,5 +417,15 @@ def get_area(insta, ch_type=None, tmin=None, tmax=None, mode="abs", merge_grads= -------- mne.Evoked.get_peak : Get the time and value of the peak amplitude.""" # check that the data is preloaded + _check_option("mode", mode, ["intg", "abs", "neg", "pos"]) + data, ch_names = _erp_measure_setup(insta, ch_type, merge_grads) + time_mask = _restrict_time_interval(tmin, tmax, insta.times, data) - pass + if mode == "abs": + data = np.abs(data) + elif mode == "neg": + # Set positive values to zero + data[data > 0] = 0 + elif mode == "pos": + # Set negative values to zero + data[data < 0] = 0 From d7a066bce140ea76b5d8ff6b1f9d6ead774a2fa4 Mon Sep 17 00:00:00 2001 From: withmywoessner Date: Thu, 8 Feb 2024 14:46:24 -0600 Subject: [PATCH 3/4] Undo ignore change --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index fdb1e8a0574..118eebd9c76 100644 --- a/.gitignore +++ b/.gitignore @@ -101,4 +101,3 @@ venv/ .hypothesis/ .ruff_cache/ .ipynb_checkpoints/ -mne/test_JAW.ipynb From d7d82488de8697f24c8b928ecf6ca095cb95829a Mon Sep 17 00:00:00 2001 From: withmywoessner Date: Mon, 25 Mar 2024 00:06:17 -0500 Subject: [PATCH 4/4] Implement erp functions --- mne/stats/erp/_erp.py | 602 +++++++++++++++++------------------------- mne/utils/docs.py | 13 + 2 files changed, 257 insertions(+), 358 deletions(-) diff --git a/mne/stats/erp/_erp.py b/mne/stats/erp/_erp.py index 0b451bdb60f..d3fa8999128 100644 --- a/mne/stats/erp/_erp.py +++ b/mne/stats/erp/_erp.py @@ -2,430 +2,316 @@ # flake8: noqa from scipy import integrate +import pandas as pd import numpy as np -from .channels.layout import _merge_ch_data, _pair_grad_sensors -from .utils import ( +from mne._fiff.pick import _pick_data_channels, _picks_to_idx, pick_info +from mne.channels.layout import _merge_ch_data, _pair_grad_sensors +from mne.utils import ( _check_option, + _time_mask, ) -def _erp_measure_setup(evoked, ch_type, merge_grads): - supported = ( - "mag", - "grad", - "eeg", - "seeg", - "dbs", - "ecog", - "misc", - "None", - ) + _FNIRS_CH_TYPES_SPLIT - types_used = evoked.get_channel_types(unique=True, only_data_chs=True) - - _check_option("ch_type", str(ch_type), supported) - - if ch_type is not None and ch_type not in types_used: - raise ValueError(f'Channel type "{ch_type}" not found in this evoked object.') - - elif len(types_used) > 1 and ch_type is None: - raise RuntimeError( - 'Multiple data channel types found. Please pass the "ch_type" ' "parameter." - ) - - if merge_grads: - if ch_type != "grad": - raise ValueError('Channel type must be "grad" for merge_grads') - elif mode == "neg": - raise ValueError( - "Negative mode (mode=neg) does not make " "sense with merge_grads=True" - ) - - if ch_type is not None: - if merge_grads: - picks = _pair_grad_sensors(evoked.info, topomap_coords=False) - else: - sel_evoked = evoked.pick( - ["meg", "eeg", "misc", "seeg", "ecog", "dbs", "fnirs"] - ) - - data = sel_evoked.data - ch_names = sel_evoked.ch_names - - if merge_grads: - data, _ = _merge_ch_data(data, ch_type, []) - ch_names = [ch_name[:-1] + "X" for ch_name in ch_names[::2]] - - return data, ch_names - +def _get_peak( + evoked, tmin=None, tmax=None, picks="all", mode="abs", average=False, strict=True +): + """Helper function to get the peak amplitude and latency of an evoked response.""" -def _restrict_time_interval(tmin, tmax, times, data): - if tmin is None: - tmin = times[0] - if tmax is None: - tmax = times[-1] + data = evoked.get_data(picks=picks) + times = evoked.times + mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"]) + data_masked = data[:, mask] - if tmin < times.min() or tmax > times.max(): - if tmin < times.min(): - param_name = "tmin" - param_val = tmin - else: - param_name = "tmax" - param_val = tmax + if average: + data_masked = np.mean(data_masked, axis=0) - raise ValueError( - f"{param_name} ({param_val}) is out of bounds. It must be " - f"between {times.min()} and {times.max()}" - ) - elif tmin > tmax: - raise ValueError(f"tmin ({tmin}) must be <= tmax ({tmax})") + if mode == "abs": + data_masked = np.abs(data_masked) + elif mode == "neg": + if strict and not np.any(data_masked < 0): + raise ValueError( + "No negative values encountered. Cannot operate in neg mode." + ) + data_masked = -data_masked + elif mode == "pos": + if strict and not np.any(data_masked > 0): + raise ValueError( + "No positive values encountered. Cannot operate in pos mode." + ) - time_win = (times >= tmin) & (times <= tmax) - time_mask = np.ones_like(data).astype(bool) - time_mask[:, time_win] = False + max_indices = np.argmax(data_masked, axis=1) + peak_amplitudes = data[np.arange(data.shape[0]), max_indices + np.where(mask)[0][0]] + peak_latencies = times[max_indices + np.where(mask)[0][0]] - return time_mask + return peak_latencies, peak_amplitudes, data_masked, mask, times def get_peak( - insta, - ch_type=None, + evoked, tmin=None, tmax=None, + picks="all", mode="abs", - time_as_index=False, - merge_grads=False, - return_amplitude=False, - *, + average=False, strict=True, ): - """Get location and latency of peak amplitude. + """Get the peak amplitude and latency of an evoked response and return a + DataFrame. Parameters ---------- - ch_type : str | None - The channel type to use. Defaults to None. If more than one channel - type is present in the data, this value **must** be provided. - tmin : float | None - The minimum point in time to be considered for peak getting. - If None (default), the beginning of the data is used. - tmax : float | None - The maximum point in time to be considered for peak getting. - If None (default), the end of the data is used. - mode : 'pos' | 'neg' | 'abs' - How to deal with the sign of the data. If 'pos' only positive - values will be considered. If 'neg' only negative values will - be considered. If 'abs' absolute values will be considered. - Defaults to 'abs'. - time_as_index : bool - Whether to return the time index instead of the latency in seconds. - merge_grads : bool - If True, compute peak from merged gradiometer data. - return_amplitude : bool - If True, return also the amplitude at the maximum response. - - .. versionadded:: 0.16 - strict : bool - If True, raise an error if values are all positive when detecting - a minimum (mode='neg'), or all negative when detecting a maximum - (mode='pos'). Defaults to True. - - .. versionadded:: 1.7 + evoked : instance of Evoked + The evoked response object. + %(erp_evoked_tmin_tmax)s + %(picks_all)s + mode : str + Specifies how the peak amplitude should be determined. Can be one of: + - 'abs' : The peak amplitude is the maximum absolute value. + - 'neg': The peak amplitude is the maximum negative value. If there are no + negative values and `strict` is True, a ValueError is raised. + - 'pos': The peak amplitude is the maximum positive value. If there are no + positive values and `strict` is True, a ValueError is raised. + Defaults to abs'. + average : bool + If True, the peak amplitude is computed by averaging the data across + channels before finding the peak. Defaults to False. + %(erp_strict)s Returns ------- - ch_name : str - The channel exhibiting the maximum response. - latency : float | int - The time point of the maximum response, either latency in seconds - or index. - amplitude : float - The amplitude of the maximum response. Only returned if - return_amplitude is True. - - .. versionadded:: 0.16 - """ # noqa: E501 - supported = ( - "mag", - "grad", - "eeg", - "seeg", - "dbs", - "ecog", - "misc", - "None", - ) + _FNIRS_CH_TYPES_SPLIT - types_used = self.get_channel_types(unique=True, only_data_chs=True) - - _check_option("ch_type", str(ch_type), supported) - - if ch_type is not None and ch_type not in types_used: - raise ValueError(f'Channel type "{ch_type}" not found in this evoked object.') - - elif len(types_used) > 1 and ch_type is None: - raise RuntimeError( - 'Multiple data channel types found. Please pass the "ch_type" ' "parameter." - ) - - if merge_grads: - if ch_type != "grad": - raise ValueError('Channel type must be "grad" for merge_grads') - elif mode == "neg": - raise ValueError( - "Negative mode (mode=neg) does not make " "sense with merge_grads=True" - ) + peak_df : pd.DataFrame + A DataFrame with columns 'channels', 'latency', and 'amplitude' + containing the peak amplitude and latency for each channel. + (Will only contain one row 'with 'latency' and 'amplitude' if average=True) + """ - meg = eeg = misc = seeg = dbs = ecog = fnirs = False - picks = None - if ch_type in ("mag", "grad"): - meg = ch_type - elif ch_type == "eeg": - eeg = True - elif ch_type == "misc": - misc = True - elif ch_type == "seeg": - seeg = True - elif ch_type == "dbs": - dbs = True - elif ch_type == "ecog": - ecog = True - elif ch_type in _FNIRS_CH_TYPES_SPLIT: - fnirs = ch_type - - if ch_type is not None: - if merge_grads: - picks = _pair_grad_sensors(self.info, topomap_coords=False) - else: - picks = pick_types( - self.info, - meg=meg, - eeg=eeg, - misc=misc, - seeg=seeg, - ecog=ecog, - ref_meg=False, - fnirs=fnirs, - dbs=dbs, - ) - data = self.data - ch_names = self.ch_names - - if picks is not None: - data = data[picks] - ch_names = [ch_names[k] for k in picks] - - if merge_grads: - data, _ = _merge_ch_data(data, ch_type, []) - ch_names = [ch_name[:-1] + "X" for ch_name in ch_names[::2]] - - ch_idx, time_idx, max_amp = _get_peak( - data, - self.times, - tmin, - tmax, - mode, - strict=strict, + _check_option("mode", mode, ["abs", "neg", "pos", "intg"]) + peak_latencies, peak_amplitudes, data_masked, mask, times = _get_peak( + evoked, tmin, tmax, picks, mode, average, strict ) - out = (ch_names[ch_idx], time_idx if time_as_index else self.times[time_idx]) - - if return_amplitude: - out += (max_amp,) - - return out + peak_df = pd.DataFrame( + { + "channels": evoked.ch_names, + "latency": peak_latencies, + "amplitude": peak_amplitudes, + } + ) + if average: + peak_df = peak_df.iloc[0] + return peak_df -def _get_peak(data, times, tmin=None, tmax=None, mode="abs", *, strict=True): - """Get feature-index and time of maximum signal from 2D array. - Note. This is a 'getter', not a 'finder'. For non-evoked type - data and continuous signals, please use proper peak detection algorithms. +def get_area( + evoked, + tmin=None, + tmax=None, + picks="all", + mode="abs", + average=False, +): + """ + Get the area under the curve of an evoked response within a given time window. Parameters ---------- - data : instance of numpy.ndarray (n_locations, n_times) - The data, either evoked in sensor or source space. - times : instance of numpy.ndarray (n_times) - The times in seconds. - tmin : float | None - The minimum point in time to be considered for peak getting. - tmax : float | None - The maximum point in time to be considered for peak getting. - mode : {'pos', 'neg', 'abs'} - How to deal with the sign of the data. If 'pos' only positive - values will be considered. If 'neg' only negative values will - be considered. If 'abs' absolute values will be considered. - Defaults to 'abs'. - strict : bool - If True, raise an error if values are all positive when detecting - a minimum (mode='neg'), or all negative when detecting a maximum - (mode='pos'). Defaults to True. + evoked : instance of Evoked + The evoked response object. + %(erp_evoked_tmin_tmax)s + %(picks_all)s + mode : str + Specifies how the area should be computed. Can be one of: + - 'abs': The absolute value of the data is used. + - 'neg': Only negative values are considered. + - 'pos': Only positive values are considered. + - 'intg': The integral of the data is computed without rectification. + Defaults to abs'. + average : bool + If True, the area is computed by averaging the data across channels + before integration. Defaults to False. Returns ------- - max_loc : int - The index of the feature with the maximum value. - max_time : int - The time point of the maximum response, index. - max_amp : float - Amplitude of the maximum response. - """ - _check_option("mode", mode, ["abs", "neg", "pos"]) + area_df : pd.DataFrame + A DataFrame with columns 'channels' and 'area' containing the area + under the curve for each channel. (Will only contain one row with + 'area' if average=True) - if tmin is None: - tmin = times[0] - if tmax is None: - tmax = times[-1] - - if tmin < times.min() or tmax > times.max(): - if tmin < times.min(): - param_name = "tmin" - param_val = tmin - else: - param_name = "tmax" - param_val = tmax - - raise ValueError( - f"{param_name} ({param_val}) is out of bounds. It must be " - f"between {times.min()} and {times.max()}" - ) - elif tmin > tmax: - raise ValueError(f"tmin ({tmin}) must be <= tmax ({tmax})") - - time_win = (times >= tmin) & (times <= tmax) - mask = np.ones_like(data).astype(bool) - mask[:, time_win] = False - - maxfun = np.argmax - if mode == "pos": - if strict and not np.any(data[~mask] > 0): - raise ValueError( - "No positive values encountered. Cannot " "operate in pos mode." - ) + """ + _check_option("mode", mode, ["abs", "neg", "pos", "intg"]) + data = evoked.get_data(picks=picks) + times = evoked.times + mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"]) + data_masked = data[:, mask] + + if average: + data_masked = np.mean(data_masked, axis=0) + if mode == "abs": + data_masked = np.abs(data_masked) elif mode == "neg": - if strict and not np.any(data[~mask] < 0): - raise ValueError( - "No negative values encountered. Cannot " "operate in neg mode." - ) - maxfun = np.argmin + data_masked = np.clip(data_masked, None, 0) + elif mode == "pos": + data_masked = np.clip(data_masked, 0, None) - masked_index = np.ma.array(np.abs(data) if mode == "abs" else data, mask=mask) + area = integrate.trapz(data_masked, times[mask], axis=1) - max_loc, max_time = np.unravel_index(maxfun(masked_index), data.shape) + if average: + area = area[0] + area_df = pd.DataFrame({"channels": evoked.ch_names, "area": area}) - return max_loc, max_time, data[max_loc, max_time] + return area_df -def get_mean_amplitude( - insta, - ch_type=None, +def get_frac_peak_latency( + evoked, + frac=0.5, tmin=None, tmax=None, + picks="all", mode="abs", - time_as_index=False, - merge_grads=False, - return_amplitude=False, + average=False, + strict=False, ): - """Get the mean amplitude in a specific time window. + """Get the latency at which the peak amplitude reaches a certain fraction of its + maximum value. Parameters ---------- - ch_type : str | None - The channel type to use. If None, the first available channel type from - the following list is used: 'mag', 'grad', 'planar1', 'planar2', 'eeg'. - tmin : float | None - The beginning of the time window in seconds. If None the beginning of - the data is used. - tmax : float | None - The end of the time window in seconds. If None the end of the data is - used. + evoked : instance of Evoked + The evoked response object. + frac : float + The fraction of the peak amplitude at which to compute the latency. + Defaults to 0.5. + %(erp_evoked_tmin_tmax)s + %(picks_all)s mode : str - How to combine multiple channels. The following options are available: - 'abs' : Take the absolute value of each channel and then average. - 'mean' : Average across channels. - 'max' : Take the maximum across channels. - 'mean_signed' : Average across channels, retaining sign. - 'median' : Take the median across channels. - 'percentile' : Take the specified percentile across channels. If - percentile is 50, this is the same as 'median'. - time_as_index : bool - Whether to consider time as index or as float (default False). - merge_grads : bool - If True, merge gradiometer data into one value by taking the RMS - (root mean square) for each pair of gradiometers. The RMS is taken - over the pair of gradiometers before averaging across channels. - This is only used for MEG data. - return_amplitude : bool - If True, return the amplitude values. If False, return the evoked - instance (default False). + Specifies how the peak amplitude should be determined. Can be one of: + - 'abs' : The peak amplitude is the maximum absolute value. + - 'neg': The peak amplitude is the maximum negative value. If there are no + negative values and `strict` is True, a ValueError is raised. + - 'pos': The peak amplitude is the maximum positive value. If there are no + positive values and `strict` is True, a ValueError is raised. + Defaults to abs'. + average : bool + If True, the fractional peak latency is computed by averaging the data + across channels before finding the latency. Defaults to False. strict : bool - If True, raise an error if channels are missing. If False, ignore - channels that are missing. + If True, raise an error if values are all positive when detecting + a minimum (mode='neg'), or all negative when detecting a maximum + (mode='pos'). Defaults to False. Returns ------- - evoked : instance of Evoked - The modified evoked instance. If return_amplitude is True, the - amplitude values are returned instead. + frac_peak_df : pd.DataFrame + A DataFrame with columns 'channels', 'fractional_peak_onset', + 'fractional_peak_offset', and 'amplitude' containing the latency at which... - See Also - -------- - mne.Evoked.get_peak : Get the time and value of the peak amplitude.""" - # check that the data is preloaded + """ + _check_option("mode", mode, ["abs", "neg", "pos"]) + + peak_latencies, peak_amplitudes, data_masked, mask, times = _get_peak( + evoked, tmin, tmax, picks, mode, average, strict + ) + frac_amplitudes = frac * peak_amplitudes[:, np.newaxis] + + # Find the first time point before the peak where the signal reaches the fractional threshold + frac_peak_onset = np.argmax(data_masked >= frac_amplitudes, axis=1) + frac_peak_onset_latency = times[mask][frac_peak_onset] + + # Find the first time point after the peak where the signal reaches the fractional threshold + peak_idx = np.argmax(data_masked, axis=1) + frac_peak_offset = np.array( + [ + peak_idx[i] + np.argmin(data_masked[i, peak_idx[i] :] <= frac_amplitudes[i]) + for i in range(data_masked.shape[0]) + ] + ) + frac_peak_offset_latency = times[mask][frac_peak_offset] + + frac_peak_df = pd.DataFrame( + { + "channels": evoked.ch_names, + "fractional_peak_onset": frac_peak_onset_latency, + "fractional_peak_offset": frac_peak_offset_latency, + "amplitude": peak_amplitudes, + } + ) - pass + if average: + frac_peak_df = frac_peak_df.iloc[0] + return frac_peak_df -def get_area(insta, ch_type=None, tmin=None, tmax=None, mode="abs", merge_grads=False): - """Get the area under the curve in a specific time window. + +def get_frac_area_latency( + evoked, + frac=0.5, + tmin=None, + tmax=None, + picks="all", + mode="abs", + average=False, + strict=False, +): + """Get the latency at which the area under the curve reaches a certain fraction of its + maximum value. Parameters ---------- - ch_type : str | None - The channel type to use. If None, the first available channel type from - the following list is used: 'mag', 'grad', 'planar1', 'planar2', 'eeg'. - tmin : float | None - The beginning of the time window in seconds. If None the beginning of - the data is used. - tmax : float | None - The end of the time window in seconds. If None the end of the data is - used. + evoked : instance of Evoked + The evoked response object. + frac : float + The fraction of the area at which to compute the latency. Defaults to 0.5. + %(erp_evoked_tmin_tmax)s + %(picks_all)s mode : str - How to combine multiple channels. The following options are available: - 'abs' : Take the absolute value of each channel and then average. - 'mean' : Average across channels. - 'max' : Take the maximum across channels. - 'mean_signed' : Average across channels, retaining sign. - 'median' : Take the median across channels. - 'percentile' : Take the specified percentile across channels. If - percentile is 50, this is the same as 'median'. - merge_grads : bool - If True, merge gradiometer data into one value by taking the RMS - (root mean square) for each pair of gradiometers. The RMS is taken - over the pair of gradiometers before averaging across channels. - This is only used for MEG data. - return_area : bool - If True, return the area values. If False, return the evoked instance - (default False). + Specifies how the area should be computed. Can be one of: + - 'abs': The absolute value of the data is used. + - 'neg': Only negative values are considered. + - 'pos': Only positive values are considered. + - 'intg': The integral of the data is computed without rectification. + Defaults to abs'. + average : bool + If True, the fractional area latency is computed by averaging the data + across channels before finding the latency. Defaults to False. + %(erp_strict)s + Returns ------- - evoked : instance of Evoked - The modified evoked instance. If return_area is True, the area values - are returned instead. - - See Also - -------- - mne.Evoked.get_peak : Get the time and value of the peak amplitude.""" - # check that the data is preloaded - _check_option("mode", mode, ["intg", "abs", "neg", "pos"]) - data, ch_names = _erp_measure_setup(insta, ch_type, merge_grads) - time_mask = _restrict_time_interval(tmin, tmax, insta.times, data) - + frac_area_df : pd.DataFrame + A DataFrame with columns 'channels', 'fractional_area_latency', + and 'area' containing the latency at which... + """ + _check_option("mode", mode, ["abs", "neg", "pos", "intg"]) + data = evoked.get_data(picks=picks) + times = evoked.times + mask = _time_mask(times, tmin, tmax, evoked.info["sfreq"]) + data_masked = data[:, mask] + times = times[mask] + if average: + data_masked = np.mean(data_masked, axis=0, keepdims=True) if mode == "abs": - data = np.abs(data) + data_masked = np.abs(data_masked) elif mode == "neg": - # Set positive values to zero - data[data > 0] = 0 + data_masked = np.clip(data_masked, None, 0) elif mode == "pos": - # Set negative values to zero - data[data < 0] = 0 + data_masked = np.clip(data_masked, 0, None) + area = np.trapz(data_masked, times, axis=1) + frac_area = frac * area + frac_area_latency = np.full(len(evoked.ch_names), np.nan) + for ch in range(data_masked.shape[0]): + idx = np.where(np.cumsum(data_masked[ch]) >= frac_area[ch])[0] + if len(idx) > 0: + frac_area_latency[ch] = times[idx[0]] + frac_area_df = pd.DataFrame( + { + "channels": evoked.ch_names, + "fractional_area_latency": frac_area_latency, + "area": area, + } + ) + if average: + frac_area_df = frac_area_df.iloc[0] + return frac_area_df diff --git a/mne/utils/docs.py b/mne/utils/docs.py index c82f9d74344..c102d8225f7 100644 --- a/mne/utils/docs.py +++ b/mne/utils/docs.py @@ -1285,6 +1285,19 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75): time are included. Defaults to ``-0.2`` and ``0.5``, respectively. """ +docdict["erp_evoked_tmin_tmax"] = """ +tmin, tmax : float + Start and end time of the ERP computation window in seconds. Defaults to + ``None`` and ``None``, which corresponds to the entire Evoked object. +""" +docdict["erp_strict"] = """ +strict : bool + If True, raise an error if values are all positive when detecting + a minimum (mode='neg'), or all negative when detecting a maximum + (mode='pos'). Defaults to True. +""" + + docdict["estimate_plot_psd"] = """\ estimate : str, {'auto', 'power', 'amplitude'} Can be "power" for power spectral density (PSD), "amplitude" for