From 8edd3cd13fecdeec9096a712ab830a1076d54a96 Mon Sep 17 00:00:00 2001 From: Joe Ranalli Date: Tue, 14 Nov 2023 12:24:23 -0500 Subject: [PATCH] Fix _fftcorrelate and tests --- src/solartoolbox/signalproc.py | 78 ++++++++++++++++++++++++++++------ tests/test_signalproc.py | 6 ++- 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/src/solartoolbox/signalproc.py b/src/solartoolbox/signalproc.py index ada1a96..6e8e3e6 100644 --- a/src/solartoolbox/signalproc.py +++ b/src/solartoolbox/signalproc.py @@ -82,27 +82,77 @@ def correlation(baseline, estimation, scaling='coeff'): def _fftcorrelate(baseline, estimation, scaling='coeff'): + """ + Compute the cross correlation between two signals, including the full range + of possible time lags, using cross correlation. Can handle multiple time + series. + + Parameters + ---------- + baseline: pd.DataFrame + estimation: pd.DataFrame + scaling: str (default 'coeff') + Type of scaling to use for cross correlation. Note that debiased + scaling options are not available here. Valid options are: + 'coeff' - computes the correlation coefficient + 'none' - no scaling is applied + + Returns + ------- + corr : numeric + A vector of the cross correlations for each pairing in the inputs + + lag : numeric + A vector of the lag for each cross correlation + """ + + # Compute the lags + lags = signal.correlation_lags(len(baseline), len(estimation)) + + # Rename for brevity + ts_inm = baseline + ts_outm = estimation + + # Condition pandas input types + if isinstance(ts_inm, pd.DataFrame): + ts_inm = np.array(ts_inm).T + if isinstance(ts_outm, pd.DataFrame): + ts_outm = np.array(ts_outm).T + + # Condition vector inputs + if ts_inm.ndim == 1: + ts_inm = np.expand_dims(ts_inm, axis=0) + if ts_outm.ndim == 1: + ts_outm = np.expand_dims(ts_outm, axis=0) + + # Perform the scaling calculations if scaling == 'coeff': - baseline -= np.mean(baseline) - estimation -= np.mean(estimation) - den = len(baseline) * (np.std(baseline) * np.std(estimation)) + # Subtract means + ts_inm = ts_inm - np.expand_dims(np.mean(ts_inm, axis=1), axis=1) + ts_outm = ts_outm - np.expand_dims(np.mean(ts_outm, axis=1), axis=1) + # Compute the scaling factor (normalizing by stdev) + corr_scale = (np.size(ts_inm, axis=1) + * np.std(ts_inm, axis=1) + * np.std(ts_outm, axis=1)) ** -1 elif scaling == 'none': - den = 1 + corr_scale = 1 else: raise ValueError(f"Illegal scaling specified: {scaling}.") - # zero pad by nA+nB-1 - nl = len(baseline) + len(estimation) - 1 - baseline = np.pad(baseline, (0, nl-len(baseline)), 'constant') - estimation = np.pad(estimation, (0, nl-len(estimation)), 'constant') - ffta = np.fft.fft(baseline) - fftb = np.fft.fft(estimation) - corr = np.real(np.fft.ifft(ffta * np.conj(fftb))) - corr = np.roll(corr, nl//2) + na = np.size(ts_inm, axis=1) + nl = 2 * na - 1 + addon = np.zeros((ts_inm.shape[0], nl - na)) + ts_inm = np.concatenate([ts_inm, addon], axis=1) + ts_outm = np.concatenate([ts_outm, addon], axis=1) + + # Compute correlation via fft and re-center to match lags + ffta = scipy.fft.fft(ts_inm, axis=1) + fftb = scipy.fft.fft(ts_outm, axis=1) + corrxy = np.real(np.fft.ifft(ffta * np.conj(fftb), axis=1)) * corr_scale - corr /= den - return corr + corrxy = np.roll(corrxy, nl // 2, axis=1) + return corrxy, lags def averaged_psd(input_tsig, navgs, overlap=0.5, diff --git a/tests/test_signalproc.py b/tests/test_signalproc.py index 687309d..bca3dfe 100644 --- a/tests/test_signalproc.py +++ b/tests/test_signalproc.py @@ -56,17 +56,19 @@ def test_correlation_illegal(corr_data): @pytest.mark.parametrize("scaling", ['coeff', 'none']) def test_fftcorrelate_identity(corr_data, scaling): dt, t, x1, x2, dly = corr_data - c = _fftcorrelate(x1, x1, scaling) + c, l = _fftcorrelate(x1, x1, scaling) cr, lag = correlation(x1, x1, scaling=scaling) assert np.allclose(c, cr) + assert np.allclose(l, lag) @pytest.mark.parametrize("scaling", ['coeff', 'none']) def test_fftcorrelate_shift(corr_data, scaling): d, t, x1, x2, dly = corr_data - c = _fftcorrelate(x1, x2, scaling) + c, l = _fftcorrelate(x1, x2, scaling) cr, lag = correlation(x1, x2, scaling=scaling) assert np.allclose(c, cr) + assert np.allclose(l, lag) def test_averaged_psd():