From dfcaff883a85ad9e1aa4f381c6f4db20c1342ff1 Mon Sep 17 00:00:00 2001 From: tailaiw <29800495+tailaiw@users.noreply.github.com> Date: Wed, 29 Apr 2020 11:25:49 -0500 Subject: [PATCH 1/5] added rolling cross correlation --- src/adtk/transformer/__init__.py | 2 + src/adtk/transformer/_transformer_hd.py | 108 +++++++++++++++++++++++- 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/src/adtk/transformer/__init__.py b/src/adtk/transformer/__init__.py index 1eac2f1..c9299f3 100644 --- a/src/adtk/transformer/__init__.py +++ b/src/adtk/transformer/__init__.py @@ -24,6 +24,7 @@ PcaReconstruction, PcaReconstructionError, RegressionResidual, + RollingCrossCorrelation, SumAll, ) @@ -64,6 +65,7 @@ def print_all_models() -> None: "PcaReconstruction", "PcaReconstructionError", "SumAll", + "RollingCrossCorrelation", "CustomizedTransformerHD", "print_all_models", ] diff --git a/src/adtk/transformer/_transformer_hd.py b/src/adtk/transformer/_transformer_hd.py index d07851f..4252637 100644 --- a/src/adtk/transformer/_transformer_hd.py +++ b/src/adtk/transformer/_transformer_hd.py @@ -6,7 +6,7 @@ """ -from typing import Any, Callable, Dict, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import pandas as pd from sklearn.decomposition import PCA @@ -308,3 +308,109 @@ def _predict_core(self, df: pd.DataFrame) -> pd.Series: self._model.transform(df.dropna().values) ) return ((results - df) ** 2).sum(axis=1, skipna=False) + + +class RollingCrossCorrelation(_NonTrainableMultivariateTransformer): + """Transformer that rolls a sliding window along a multivariate time series + and returns moving cross correlation. + + Parameters + ---------- + window: int or str + Size of the rolling time window. + + - If int, it is the number of time point in this time window. + - If str, it must be able to be converted into a pandas Timedelta + object. + + pairs : tuple or list, optional + Pairs of series to calculate cross correlation. + + - If 2-tuple, return the cross correlation of these two series. + - If list of tuples, return the cross correlation of every pair in the + list. + - If None, return the cross correlation of all possible pairs. + + Default: None. + + center: bool, optional + Whether the calculation is at the center of time window or on the right + edge. Default: False. + + min_periods: int, optional + Minimum number of observations in window required to have a value. + Default: None, i.e. all observations must have values. + + """ + + def __init__( + self, + window: Union[int, str], + pairs: Optional[Union[Tuple[str, str], List[Tuple[str, str]]]] = None, + center: bool = False, + min_periods: Optional[int] = None, + ): + super().__init__() + self.window = window + self.pairs = pairs + self.center = center + self.min_periods = min_periods + + @property + def _param_names(self) -> Tuple[str, ...]: + return ("window", "pairs", "center", "min_periods") + + def _predict_core( + self, df: pd.DataFrame + ) -> Union[pd.Series, pd.DataFrame]: + if len(df.columns) <= 1: + raise ValueError( + "The input data frame must contain at least two series." + ) + + if self.pairs is None: + pairs = [ + (df.columns[i], df.columns[j]) + for i in range(len(df.columns)) + for j in range(i + 1, len(df.columns)) + ] + elif isinstance(self.pairs, tuple): + pairs = [self.pairs] + else: + pairs = self.pairs + + columns = list( + set(sum([[i, j] for (i, j) in pairs], [])) + ) # all columns in the list of pairs + + if not set(columns) <= set(df.columns): + raise ValueError( + "Parameter `pairs` contains a column that is not included in " + "the data frame." + ) + + rolling_corr = ( + df.loc[:, columns] + .rolling( + window=self.window, + center=self.center, + min_periods=self.min_periods, + ) + .corr() + ) + + rolling_corr = pd.DataFrame( + { + "{}:{}".format(col_0, col_1): rolling_corr.loc[:, col_0].loc[ + :, col_1 + ] + for col_0, col_1 in pairs + } + ) + + if (len(pairs) == 1) and ( + (self.pair is None) or isinstance(self.pairs, tuple) + ): + rolling_corr = rolling_corr.iloc[:, 0].rename(None) + + return rolling_corr From b5909a0c1ed4736c9c648352575045395a9075c8 Mon Sep 17 00:00:00 2001 From: tailaiw <29800495+tailaiw@users.noreply.github.com> Date: Wed, 29 Apr 2020 13:20:13 -0500 Subject: [PATCH 2/5] bug fix --- src/adtk/transformer/_transformer_hd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adtk/transformer/_transformer_hd.py b/src/adtk/transformer/_transformer_hd.py index 4252637..158115a 100644 --- a/src/adtk/transformer/_transformer_hd.py +++ b/src/adtk/transformer/_transformer_hd.py @@ -409,7 +409,7 @@ def _predict_core( ) if (len(pairs) == 1) and ( - (self.pair is None) or isinstance(self.pairs, tuple) + (self.pairs is None) or isinstance(self.pairs, tuple) ): rolling_corr = rolling_corr.iloc[:, 0].rename(None) From 01da5f9607a6056ae6f4a20f2e155bca0b414e73 Mon Sep 17 00:00:00 2001 From: tailaiw <29800495+tailaiw@users.noreply.github.com> Date: Wed, 29 Apr 2020 13:37:50 -0500 Subject: [PATCH 3/5] added unit test --- tests/test_transformerhd.py | 61 +++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/tests/test_transformerhd.py b/tests/test_transformerhd.py index fb7b0b9..b28d36c 100644 --- a/tests/test_transformerhd.py +++ b/tests/test_transformerhd.py @@ -245,6 +245,67 @@ ], "t": [0, 0, 0, 0, 0.02, 0.02, 0, 0, 0, nan, 0, 0], }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 2}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1], + }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 3, "center": True}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": [nan, 1, 1, 0, 0, 1, 1, nan, nan, nan, nan], + }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 3, "min_periods": 1}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": [nan, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1], + }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 2}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": { + "0:1": [nan, 1, 1, 1, 1, 1, 1, 1, nan, nan, 1], + "0:2": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1], + "1:2": [nan, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1], + }, + }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 2, "pairs": (2, 0)}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1], + }, + { + "model": transformer.RollingCrossCorrelation, + "params": {"window": 2, "pairs": [(2, 0)]}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "t": {"2:0": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1]}, + }, ] From f547b854599c6387908d819cf0c722f4c54030a1 Mon Sep 17 00:00:00 2001 From: tailaiw <29800495+tailaiw@users.noreply.github.com> Date: Wed, 29 Apr 2020 14:33:07 -0500 Subject: [PATCH 4/5] added cross correlation ad --- src/adtk/detector/__init__.py | 2 + src/adtk/detector/_detector_hd.py | 100 +++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/src/adtk/detector/__init__.py b/src/adtk/detector/__init__.py index c3bdd7d..dab67c3 100644 --- a/src/adtk/detector/__init__.py +++ b/src/adtk/detector/__init__.py @@ -22,6 +22,7 @@ VolatilityShiftAD, ) from ._detector_hd import ( + CrossCorrelationAD, CustomizedDetectorHD, MinClusterDetector, OutlierDetector, @@ -67,6 +68,7 @@ def print_all_models() -> None: "OutlierDetector", "RegressionAD", "PcaAD", + "CrossCorrelationAD", "CustomizedDetectorHD", "print_all_models", ] diff --git a/src/adtk/detector/_detector_hd.py b/src/adtk/detector/_detector_hd.py index 85ca06a..a5a56ac 100644 --- a/src/adtk/detector/_detector_hd.py +++ b/src/adtk/detector/_detector_hd.py @@ -5,18 +5,19 @@ """ from collections import Counter -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import pandas as pd from .._detector_base import _TrainableMultivariateDetector -from ..aggregator import AndAggregator +from ..aggregator import AndAggregator, OrAggregator from ..detector import InterQuartileRangeAD, ThresholdAD from ..pipe import Pipeline, Pipenet from ..transformer import ( CustomizedTransformer1D, PcaReconstructionError, RegressionResidual, + RollingCrossCorrelation, ) @@ -381,3 +382,98 @@ def _fit_core(self, s: pd.DataFrame) -> None: def _predict_core(self, s: pd.DataFrame) -> pd.Series: self._sync_params() return self.pipe_.detect(s) + + +class CrossCorrelationAD(_TrainableMultivariateDetector): + """Detector that detects anomalous inter-series cross correlation. + + This detector tracks rolling cross correlations between series and + identifies periods when the cross correlations is anomalous. + + This detector is internally implemented as a `Pipeline` object. Advanced + users may learn more details by checking attribute `pipe_`. + + Parameters + ---------- + window: int or str + Size of the rolling time window. + + - If int, it is the number of time point in this time window. + - If str, it must be able to be converted into a pandas Timedelta + object. + + pairs : tuple or list, optional + Pairs of series to calculate cross correlation. + + - If 2-tuple, return the cross correlation of these two series. + - If list of tuples, return the cross correlation of every pair in the + list. + - If None, return the cross correlation of all possible pairs. + + Default: None. + + c: float, optional + Factor used to determine the bound of normal range based on historical + interquartile range. Default: 5.0. + + center: bool, optional + Whether the calculation is at the center of time window or on the right + edge. Default: False. + + min_periods: int, optional + Minimum number of observations in window required to have a value. + Default: None, i.e. all observations must have values. + + """ + + def __init__( + self, + window: Union[int, str], + pairs: Optional[Union[Tuple[str, str], List[Tuple[str, str]]]] = None, + c: float = 3.0, + center: bool = False, + min_periods: Optional[int] = None, + ): + self.pipe_ = Pipeline( + [ + ( + "rolling_cross_corr", + RollingCrossCorrelation( + window=window, + pairs=pairs, + center=center, + min_periods=min_periods, + ), + ), + ("ad", InterQuartileRangeAD(c=c)), + ("agg", OrAggregator()), + ] + ) + super().__init__() + self.window = window + self.pairs = pairs + self.center = center + self.min_periods = min_periods + self.c = c + self._sync_params() + + @property + def _param_names(self) -> Tuple[str, ...]: + return ("window", "pairs", "c", "center", "min_periods") + + def _sync_params(self) -> None: + self.pipe_.steps[0][1].set_params( + window=self.window, + pairs=self.pairs, + center=self.center, + min_periods=self.min_periods, + ) + self.pipe_.steps[1][1].set_params(c=self.c) + + def _fit_core(self, s: pd.DataFrame) -> None: + self._sync_params() + self.pipe_.fit(s) + + def _predict_core(self, s: pd.DataFrame) -> pd.Series: + self._sync_params() + return self.pipe_.detect(s) From a9fd3ee0ec586dbd4ff6ce0bd4916bb2a168865a Mon Sep 17 00:00:00 2001 From: tailaiw <29800495+tailaiw@users.noreply.github.com> Date: Wed, 29 Apr 2020 14:49:46 -0500 Subject: [PATCH 5/5] added unit test --- tests/test_detectorhd.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/test_detectorhd.py b/tests/test_detectorhd.py index e14dfb6..5e98274 100644 --- a/tests/test_detectorhd.py +++ b/tests/test_detectorhd.py @@ -180,6 +180,34 @@ ], "a": [0, 0, 0, 0, 1, 1, 0, 0, 0, nan, 0, 0], }, + { + "model": detector.CrossCorrelationAD, + "params": {"window": 2}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "a": [nan, 0, 0, 0, 1, 0, 0, 0, nan, nan, 0], + }, + { + "model": detector.CrossCorrelationAD, + "params": {"window": 3, "center": True, "min_periods": 1}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "a": [0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0], + }, + { + "model": detector.CrossCorrelationAD, + "params": {"window": 2}, + "df": [ + [0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10], + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8], + ], + "a": [nan, 0, 0, 0, 1, 0, 0, 0, nan, nan, 0], + }, ]