From 9be5adb91270a207817dd2988a89743bff6cd30d Mon Sep 17 00:00:00 2001 From: ymatzkevich Date: Tue, 12 Nov 2024 13:18:13 +0100 Subject: [PATCH 1/5] Add a function to find the intersection of multiple time series --- darts/tests/test_timeseries.py | 29 +++++++++++++++++++++++++++++ darts/timeseries.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/darts/tests/test_timeseries.py b/darts/tests/test_timeseries.py index bd5e1b1562..6c8452436f 100644 --- a/darts/tests/test_timeseries.py +++ b/darts/tests/test_timeseries.py @@ -10,6 +10,7 @@ from scipy.stats import kurtosis, skew from darts import TimeSeries, concatenate +from darts.timeseries import intersect from darts.utils.timeseries_generation import constant_timeseries, linear_timeseries from darts.utils.utils import freqs, generate_index @@ -603,12 +604,32 @@ def check_intersect(other, start_, end_, freq_): s_int_idx = series.slice_intersect_times(other, copy=False) assert s_int.time_index.equals(s_int_idx) + def check_intersect_sequence(series, other, start_, end_, freq_): + intersected_series = intersect([series, other]) + s_int = intersected_series[0] + o_int = intersected_series[1] + + assert intersected_series == [ + series.slice_intersect(other), + other.slice_intersect(series), + ] + + if start_ is None: # empty slice + assert len(s_int) == 0 + assert len(o_int) == 0 + return + + assert s_int.start_time() == o_int.start_time() == start_ + assert s_int.end_time() == o_int.end_time() == end_ + assert s_int.freq == o_int.freq == freq_ + # slice with exact range startA = start endA = end idxA = generate_index(startA, endA, freq=freq_other) seriesA = TimeSeries.from_series(pd.Series(range(len(idxA)), index=idxA)) check_intersect(seriesA, startA, endA, freq_expected) + check_intersect_sequence(series, seriesA, start, end, freq_expected) # entire slice within the range startA = start + freq @@ -616,6 +637,7 @@ def check_intersect(other, start_, end_, freq_): idxA = 
generate_index(startA, endA, freq=freq_other) seriesA = TimeSeries.from_series(pd.Series(range(len(idxA)), index=idxA)) check_intersect(seriesA, startA, endA, freq_expected) + check_intersect_sequence(series, seriesA, startA, endA, freq_expected) # start outside of range startC = start - 4 * freq @@ -623,6 +645,7 @@ def check_intersect(other, start_, end_, freq_): idxC = generate_index(startC, endC, freq=freq_other) seriesC = TimeSeries.from_series(pd.Series(range(len(idxC)), index=idxC)) check_intersect(seriesC, start, endC, freq_expected) + check_intersect_sequence(series, seriesC, start, endC, freq_expected) # end outside of range startC = start + 4 * freq @@ -630,6 +653,7 @@ def check_intersect(other, start_, end_, freq_): idxC = generate_index(startC, endC, freq=freq_other) seriesC = TimeSeries.from_series(pd.Series(range(len(idxC)), index=idxC)) check_intersect(seriesC, startC, end, freq_expected) + check_intersect_sequence(series, seriesC, startC, end, freq_expected) # small intersect startE = start + (n_steps - 1) * freq @@ -637,6 +661,7 @@ def check_intersect(other, start_, end_, freq_): idxE = generate_index(startE, endE, freq=freq_other) seriesE = TimeSeries.from_series(pd.Series(range(len(idxE)), index=idxE)) check_intersect(seriesE, startE, end, freq_expected) + check_intersect_sequence(series, seriesE, startE, end, freq_expected) # No intersect startG = end + 3 * freq @@ -645,6 +670,10 @@ def check_intersect(other, start_, end_, freq_): seriesG = TimeSeries.from_series(pd.Series(range(len(idxG)), index=idxG)) # for empty slices, we expect the original freq check_intersect(seriesG, None, None, freq) + check_intersect_sequence(series, seriesG, None, None, freq) + + # Empty sequence + assert intersect([]) == [] @staticmethod def helper_test_shift(test_case, test_series: TimeSeries): diff --git a/darts/timeseries.py b/darts/timeseries.py index 5f7878eb56..5b25d2ad60 100644 --- a/darts/timeseries.py +++ b/darts/timeseries.py @@ -5629,6 +5629,37 @@ def 
concatenate( return TimeSeries.from_xarray(da_concat, fill_missing_dates=False) +def intersect(series: Sequence[TimeSeries]): + """Returns the intersection with respect to the time index of multiple ``TimeSeries``. + + Parameters + ---------- + series : Sequence[TimeSeries] + sequence of ``TimeSeries`` to intersect + + Returns + ------- + Sequence[TimeSeries] + Intersected series + """ + + data_arrays = [] + has_datetime_index = series[0].has_datetime_index + for ts in series: + if ts.has_datetime_index != has_datetime_index: + raise_log( + IndexError( + "The time index type must be the same for all TimeSeries in the Sequence." + ), + logger, + ) + data_arrays.append(ts.data_array(copy=False)) + + intersected_series = xr.align(*data_arrays, exclude=["component", "sample"]) + + return [TimeSeries.from_xarray(array) for array in intersected_series] + + def _finite_rows_boundaries( values: np.ndarray, how: str = "all" ) -> Tuple[Optional[int], Optional[int]]: From 5153fd9899d1f077a836b0ac8f27fe520ec1904b Mon Sep 17 00:00:00 2001 From: ymatzkevich Date: Tue, 12 Nov 2024 16:50:52 +0100 Subject: [PATCH 2/5] Add empty sequence treatment. --- darts/timeseries.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/darts/timeseries.py b/darts/timeseries.py index d9b670e011..6d84433401 100644 --- a/darts/timeseries.py +++ b/darts/timeseries.py @@ -5672,6 +5672,8 @@ def intersect(series: Sequence[TimeSeries]): Sequence[TimeSeries] Intersected series """ + if not series: + return [] data_arrays = [] has_datetime_index = series[0].has_datetime_index From 3e5d5a0fdfa3bf42b82845f9845cd8895c832234 Mon Sep 17 00:00:00 2001 From: ymatzkevich Date: Tue, 3 Dec 2024 14:08:30 +0100 Subject: [PATCH 3/5] Used logic of slice_intersect for faster code. 
--- darts/tests/test_timeseries.py | 83 ++++++++++++++++++---------------- darts/timeseries.py | 28 +++++------- 2 files changed, 55 insertions(+), 56 deletions(-) diff --git a/darts/tests/test_timeseries.py b/darts/tests/test_timeseries.py index daefbfd317..5626c2d159 100644 --- a/darts/tests/test_timeseries.py +++ b/darts/tests/test_timeseries.py @@ -10,7 +10,7 @@ from scipy.stats import kurtosis, skew from darts import TimeSeries, concatenate -from darts.timeseries import intersect +from darts.timeseries import slice_intersect from darts.utils.timeseries_generation import constant_timeseries, linear_timeseries from darts.utils.utils import expand_arr, freqs, generate_index @@ -604,40 +604,24 @@ def check_intersect(other, start_, end_, freq_): s_int_idx = series.slice_intersect_times(other, copy=False) assert s_int.time_index.equals(s_int_idx) - def check_intersect_sequence(series, other, start_, end_, freq_): - intersected_series = intersect([series, other]) - s_int = intersected_series[0] - o_int = intersected_series[1] - - assert intersected_series == [ + assert slice_intersect([series, other]) == [ series.slice_intersect(other), other.slice_intersect(series), ] - if start_ is None: # empty slice - assert len(s_int) == 0 - assert len(o_int) == 0 - return - - assert s_int.start_time() == o_int.start_time() == start_ - assert s_int.end_time() == o_int.end_time() == end_ - assert s_int.freq == o_int.freq == freq_ - # slice with exact range startA = start endA = end idxA = generate_index(startA, endA, freq=freq_other) seriesA = TimeSeries.from_series(pd.Series(range(len(idxA)), index=idxA)) check_intersect(seriesA, startA, endA, freq_expected) - check_intersect_sequence(series, seriesA, start, end, freq_expected) # entire slice within the range - startA = start + freq - endA = startA + 6 * freq_other - idxA = generate_index(startA, endA, freq=freq_other) - seriesA = TimeSeries.from_series(pd.Series(range(len(idxA)), index=idxA)) - check_intersect(seriesA, startA, 
endA, freq_expected) - check_intersect_sequence(series, seriesA, startA, endA, freq_expected) + startB = start + freq + endB = startB + 6 * freq_other + idxB = generate_index(startB, endB, freq=freq_other) + seriesB = TimeSeries.from_series(pd.Series(range(len(idxB)), index=idxB)) + check_intersect(seriesB, startB, endB, freq_expected) # start outside of range startC = start - 4 * freq @@ -645,15 +629,13 @@ def check_intersect_sequence(series, other, start_, end_, freq_): idxC = generate_index(startC, endC, freq=freq_other) seriesC = TimeSeries.from_series(pd.Series(range(len(idxC)), index=idxC)) check_intersect(seriesC, start, endC, freq_expected) - check_intersect_sequence(series, seriesC, start, endC, freq_expected) # end outside of range - startC = start + 4 * freq - endC = end + 4 * freq_other - idxC = generate_index(startC, endC, freq=freq_other) - seriesC = TimeSeries.from_series(pd.Series(range(len(idxC)), index=idxC)) - check_intersect(seriesC, startC, end, freq_expected) - check_intersect_sequence(series, seriesC, startC, end, freq_expected) + startD = start + 4 * freq + endD = end + 4 * freq_other + idxD = generate_index(startD, endD, freq=freq_other) + seriesD = TimeSeries.from_series(pd.Series(range(len(idxD)), index=idxD)) + check_intersect(seriesD, startD, end, freq_expected) # small intersect startE = start + (n_steps - 1) * freq @@ -661,19 +643,42 @@ def check_intersect_sequence(series, other, start_, end_, freq_): idxE = generate_index(startE, endE, freq=freq_other) seriesE = TimeSeries.from_series(pd.Series(range(len(idxE)), index=idxE)) check_intersect(seriesE, startE, end, freq_expected) - check_intersect_sequence(series, seriesE, startE, end, freq_expected) # No intersect - startG = end + 3 * freq - endG = startG + 6 * freq_other - idxG = generate_index(startG, endG, freq=freq_other) - seriesG = TimeSeries.from_series(pd.Series(range(len(idxG)), index=idxG)) + startF = end + 3 * freq + endF = startF + 6 * freq_other + idxF = 
generate_index(startF, endF, freq=freq_other) + seriesF = TimeSeries.from_series(pd.Series(range(len(idxF)), index=idxF)) # for empty slices, we expect the original freq - check_intersect(seriesG, None, None, freq) - check_intersect_sequence(series, seriesG, None, None, freq) + check_intersect(seriesF, None, None, freq) - # Empty sequence - assert intersect([]) == [] + # sequence with zero or one element + assert slice_intersect([]) == [] + assert slice_intersect([series]) == [series] + + # sequence with more than 2 elements + intersected_series = slice_intersect([series, seriesA, seriesE]) + s1_int = intersected_series[0] + s2_int = intersected_series[1] + s3_int = intersected_series[2] + + assert ( + s1_int.start_time() == s2_int.start_time() == s3_int.start_time() == startE + ) + assert s1_int.end_time() == s2_int.end_time() == s3_int.end_time() == endA + + # check treatment different time index types + if series.has_datetime_index: + seriesF = TimeSeries.from_series( + pd.Series(range(len(idxF)), index=pd.to_numeric(idxF)) + ) + else: + seriesF = TimeSeries.from_series( + pd.Series(range(len(idxF)), index=pd.to_datetime(idxF)) + ) + + with pytest.raises(IndexError): + slice_intersect([series, seriesF]) @staticmethod def helper_test_shift(test_case, test_series: TimeSeries): diff --git a/darts/timeseries.py b/darts/timeseries.py index 6d84433401..c5b209240b 100644 --- a/darts/timeseries.py +++ b/darts/timeseries.py @@ -5659,8 +5659,8 @@ def concatenate( return TimeSeries.from_xarray(da_concat, fill_missing_dates=False) -def intersect(series: Sequence[TimeSeries]): - """Returns the intersection with respect to the time index of multiple ``TimeSeries``. +def slice_intersect(series: Sequence[TimeSeries]) -> Sequence[TimeSeries]: + """Returns a list of ``TimeSeries``, where all `series` have been intersected along the time index. 
Parameters ---------- @@ -5670,26 +5670,20 @@ def intersect(series: Sequence[TimeSeries]): Returns ------- Sequence[TimeSeries] - Intersected series + Intersected series. """ if not series: return [] - data_arrays = [] - has_datetime_index = series[0].has_datetime_index - for ts in series: - if ts.has_datetime_index != has_datetime_index: - raise_log( - IndexError( - "The time index type must be the same for all TimeSeries in the Sequence." - ), - logger, - ) - data_arrays.append(ts.data_array(copy=False)) - - intersected_series = xr.align(*data_arrays, exclude=["component", "sample"]) + intersected_series = [] + for i, ts_i in enumerate(series): + intersected_ts = ts_i + for j, ts_j in enumerate(series): + if i != j: + intersected_ts = intersected_ts.slice_intersect(ts_j) + intersected_series.append(intersected_ts) - return [TimeSeries.from_xarray(array) for array in intersected_series] + return intersected_series def _finite_rows_boundaries( From e90107fc10122770cb8a97a798d525c78409393c Mon Sep 17 00:00:00 2001 From: ymatzkevich Date: Thu, 12 Dec 2024 10:25:38 +0100 Subject: [PATCH 4/5] Improved complexity of slice_intersect. 
--- darts/timeseries.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/darts/timeseries.py b/darts/timeseries.py index c5b209240b..3f5941cd8e 100644 --- a/darts/timeseries.py +++ b/darts/timeseries.py @@ -5675,15 +5675,17 @@ def slice_intersect(series: Sequence[TimeSeries]) -> Sequence[TimeSeries]: if not series: return [] - intersected_series = [] - for i, ts_i in enumerate(series): - intersected_ts = ts_i - for j, ts_j in enumerate(series): - if i != j: - intersected_ts = intersected_ts.slice_intersect(ts_j) - intersected_series.append(intersected_ts) - - return intersected_series + int_series = [] + int_ts = series[0] + for ts in series[1:]: + int_ts = int_ts.slice_intersect(ts) + int_series.append(int_ts) + + for ts in series[1:]: + ts = ts.slice_intersect(int_series[-1]) + int_series.append(ts) + + return int_series def _finite_rows_boundaries( From 05a1667a79ddb438aee18b1f778dee9c3f1efd48 Mon Sep 17 00:00:00 2001 From: ymatzkevich Date: Fri, 13 Dec 2024 10:26:24 +0100 Subject: [PATCH 5/5] Intersect time indexes to avoid creating new TimeSeries for optimization purposes.
""" - if other.has_same_time_as(self): - return self.__class__(self._xa) - if other.freq == self.freq: - start, end = self._slice_intersect_bounds(other) - return self[start:end] - else: - time_index = self.time_index.intersection(other.time_index) - return self[time_index] + return slice_intersect([self, other])[0] def slice_intersect_values(self, other: Self, copy: bool = False) -> np.ndarray: """ @@ -5675,17 +5668,18 @@ def slice_intersect(series: Sequence[TimeSeries]) -> Sequence[TimeSeries]: if not series: return [] - int_series = [] - int_ts = series[0] + int_time_index = series[0].time_index for ts in series[1:]: - int_ts = int_ts.slice_intersect(ts) - int_series.append(int_ts) + int_time_index = int_time_index.intersection(ts.time_index) + ts_other = series[0] for ts in series[1:]: - ts = ts.slice_intersect(int_series[-1]) - int_series.append(ts) + int_time_index = int_time_index.intersection( + ts.time_index.intersection(ts_other.time_index) + ) + ts_other = ts - return int_series + return [ts[int_time_index] for ts in series] def _finite_rows_boundaries(