Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transformer and detector on rolling cross correlation #107

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/adtk/detector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
VolatilityShiftAD,
)
from ._detector_hd import (
CrossCorrelationAD,
CustomizedDetectorHD,
MinClusterDetector,
OutlierDetector,
Expand Down Expand Up @@ -67,6 +68,7 @@ def print_all_models() -> None:
"OutlierDetector",
"RegressionAD",
"PcaAD",
"CrossCorrelationAD",
"CustomizedDetectorHD",
"print_all_models",
]
100 changes: 98 additions & 2 deletions src/adtk/detector/_detector_hd.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
"""

from collections import Counter
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import pandas as pd

from .._detector_base import _TrainableMultivariateDetector
from ..aggregator import AndAggregator
from ..aggregator import AndAggregator, OrAggregator
from ..detector import InterQuartileRangeAD, ThresholdAD
from ..pipe import Pipeline, Pipenet
from ..transformer import (
CustomizedTransformer1D,
PcaReconstructionError,
RegressionResidual,
RollingCrossCorrelation,
)


Expand Down Expand Up @@ -381,3 +382,98 @@ def _fit_core(self, s: pd.DataFrame) -> None:
def _predict_core(self, s: pd.DataFrame) -> pd.Series:
    """Run the internal pipeline's detection on `s`.

    `_sync_params` is invoked first; presumably it pushes the detector's
    current parameters into the pipeline steps (its definition is outside
    this excerpt -- confirm against the enclosing class).
    """
    self._sync_params()
    detected = self.pipe_.detect(s)
    return detected


class CrossCorrelationAD(_TrainableMultivariateDetector):
    """Detector that detects anomalous inter-series cross correlation.

    This detector tracks rolling cross correlations between series and
    identifies periods when a cross correlation is anomalous.

    This detector is internally implemented as a `Pipeline` object chaining
    a `RollingCrossCorrelation` transformer, an `InterQuartileRangeAD`
    detector and an `OrAggregator`. Advanced users may learn more details by
    checking attribute `pipe_`.

    Parameters
    ----------
    window: int or str
        Size of the rolling time window.

        - If int, it is the number of time points in this time window.
        - If str, it must be able to be converted into a pandas Timedelta
          object.

    pairs : tuple or list, optional
        Pairs of series to calculate cross correlation.

        - If 2-tuple, use the cross correlation of these two series.
        - If list of tuples, use the cross correlation of every pair in the
          list.
        - If None, use the cross correlation of all possible pairs.

        Default: None.

    c: float, optional
        Factor used to determine the bound of normal range based on historical
        interquartile range. Default: 3.0.

    center: bool, optional
        Whether the calculation is at the center of time window or on the right
        edge. Default: False.

    min_periods: int, optional
        Minimum number of observations in window required to have a value.
        Default: None, which is forwarded unchanged to the rolling cross
        correlation transformer.

    """

    def __init__(
        self,
        window: Union[int, str],
        pairs: Optional[Union[Tuple[str, str], List[Tuple[str, str]]]] = None,
        c: float = 3.0,
        center: bool = False,
        min_periods: Optional[int] = None,
    ) -> None:
        # Step order matters: `_sync_params` addresses the transformer and
        # the detector by their position (0 and 1) in `pipe_.steps`.
        self.pipe_ = Pipeline(
            [
                (
                    "rolling_cross_corr",
                    RollingCrossCorrelation(
                        window=window,
                        pairs=pairs,
                        center=center,
                        min_periods=min_periods,
                    ),
                ),
                ("ad", InterQuartileRangeAD(c=c)),
                ("agg", OrAggregator()),
            ]
        )
        super().__init__()
        self.window = window
        self.pairs = pairs
        self.center = center
        self.min_periods = min_periods
        self.c = c
        self._sync_params()

    @property
    def _param_names(self) -> Tuple[str, ...]:
        """Names of the attributes exposed as model parameters."""
        return ("window", "pairs", "c", "center", "min_periods")

    def _sync_params(self) -> None:
        """Copy this detector's parameter attributes into the pipeline steps.

        Parameters are stored as attributes on the detector, so they are
        pushed into the transformer (step 0) and the interquartile-range
        detector (step 1) before the pipeline is used.
        """
        self.pipe_.steps[0][1].set_params(
            window=self.window,
            pairs=self.pairs,
            center=self.center,
            min_periods=self.min_periods,
        )
        self.pipe_.steps[1][1].set_params(c=self.c)

    def _fit_core(self, s: pd.DataFrame) -> None:
        """Synchronize parameters, then fit the internal pipeline on `s`."""
        self._sync_params()
        self.pipe_.fit(s)

    def _predict_core(self, s: pd.DataFrame) -> pd.Series:
        """Synchronize parameters, then return the pipeline's detection."""
        self._sync_params()
        return self.pipe_.detect(s)
2 changes: 2 additions & 0 deletions src/adtk/transformer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
PcaReconstruction,
PcaReconstructionError,
RegressionResidual,
RollingCrossCorrelation,
SumAll,
)

Expand Down Expand Up @@ -64,6 +65,7 @@ def print_all_models() -> None:
"PcaReconstruction",
"PcaReconstructionError",
"SumAll",
"RollingCrossCorrelation",
"CustomizedTransformerHD",
"print_all_models",
]
108 changes: 107 additions & 1 deletion src/adtk/transformer/_transformer_hd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

"""

from typing import Any, Callable, Dict, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import pandas as pd
from sklearn.decomposition import PCA
Expand Down Expand Up @@ -308,3 +308,109 @@ def _predict_core(self, df: pd.DataFrame) -> pd.Series:
self._model.transform(df.dropna().values)
)
return ((results - df) ** 2).sum(axis=1, skipna=False)


class RollingCrossCorrelation(_NonTrainableMultivariateTransformer):
    """Transformer that slides a window along a multivariate time series and
    returns the moving cross correlation between pairs of series.

    Parameters
    ----------
    window: int or str
        Size of the rolling time window.

        - If int, it is the number of time points in this time window.
        - If str, it must be able to be converted into a pandas Timedelta
          object.

    pairs : tuple or list, optional
        Pairs of series to calculate cross correlation.

        - If 2-tuple, return the cross correlation of these two series as a
          single series.
        - If list of tuples, return a data frame with the cross correlation
          of every pair in the list, one column per pair named
          "<first>:<second>".
        - If None, return the cross correlation of all possible pairs (a
          single series when the input has exactly two columns).

        Default: None.

    center: bool, optional
        Whether the calculation is at the center of time window or on the right
        edge. Default: False.

    min_periods: int, optional
        Minimum number of observations in window required to have a value.
        Default: None, which is forwarded unchanged to pandas `rolling`.

    """

    def __init__(
        self,
        window: Union[int, str],
        pairs: Optional[Union[Tuple[str, str], List[Tuple[str, str]]]] = None,
        center: bool = False,
        min_periods: Optional[int] = None,
    ):
        super().__init__()
        self.window = window
        self.pairs = pairs
        self.center = center
        self.min_periods = min_periods

    @property
    def _param_names(self) -> Tuple[str, ...]:
        """Names of the attributes exposed as model parameters."""
        return ("window", "pairs", "center", "min_periods")

    def _predict_core(
        self, df: pd.DataFrame
    ) -> Union[pd.Series, pd.DataFrame]:
        """Compute the rolling cross correlation for the configured pairs.

        Raises
        ------
        ValueError
            If `df` has fewer than two columns, or if `pairs` names a column
            that is not present in `df`.
        """
        n_series = len(df.columns)
        if n_series < 2:
            raise ValueError(
                "The input data frame must contain at least two series."
            )

        # Normalize `pairs` into a list of 2-tuples.
        requested = self.pairs
        if requested is None:
            pair_list = [
                (df.columns[left], df.columns[right])
                for left in range(n_series)
                for right in range(left + 1, n_series)
            ]
        elif isinstance(requested, tuple):
            pair_list = [requested]
        else:
            pair_list = requested

        # Every column mentioned by at least one pair.
        involved = {
            name for first, second in pair_list for name in (first, second)
        }
        if involved - set(df.columns):
            raise ValueError(
                "Parameter `pairs` contains a column that is not included in "
                "the data frame."
            )

        windowed = df.loc[:, list(involved)].rolling(
            window=self.window,
            center=self.center,
            min_periods=self.min_periods,
        )
        pairwise_corr = windowed.corr()

        # Pull one series per requested pair out of the pairwise result.
        per_pair = {}
        for first, second in pair_list:
            label = "{}:{}".format(first, second)
            per_pair[label] = pairwise_corr.loc[:, first].loc[:, second]
        result = pd.DataFrame(per_pair)

        # A lone pair that was not given as a list collapses to an unnamed
        # series.
        not_given_as_list = (self.pairs is None) or isinstance(
            self.pairs, tuple
        )
        if not_given_as_list and len(pair_list) == 1:
            return result.iloc[:, 0].rename(None)

        return result
28 changes: 28 additions & 0 deletions tests/test_detectorhd.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,34 @@
],
"a": [0, 0, 0, 0, 1, 1, 0, 0, 0, nan, 0, 0],
},
{
"model": detector.CrossCorrelationAD,
"params": {"window": 2},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"a": [nan, 0, 0, 0, 1, 0, 0, 0, nan, nan, 0],
},
{
"model": detector.CrossCorrelationAD,
"params": {"window": 3, "center": True, "min_periods": 1},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"a": [0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0],
},
{
"model": detector.CrossCorrelationAD,
"params": {"window": 2},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"a": [nan, 0, 0, 0, 1, 0, 0, 0, nan, nan, 0],
},
]


Expand Down
61 changes: 61 additions & 0 deletions tests/test_transformerhd.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,67 @@
],
"t": [0, 0, 0, 0, 0.02, 0.02, 0, 0, 0, nan, 0, 0],
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 2},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1],
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 3, "center": True},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": [nan, 1, 1, 0, 0, 1, 1, nan, nan, nan, nan],
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 3, "min_periods": 1},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": [nan, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1],
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 2},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": {
"0:1": [nan, 1, 1, 1, 1, 1, 1, 1, nan, nan, 1],
"0:2": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1],
"1:2": [nan, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1],
},
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 2, "pairs": (2, 0)},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1],
},
{
"model": transformer.RollingCrossCorrelation,
"params": {"window": 2, "pairs": [(2, 0)]},
"df": [
[0, 1, 2, 3, 4, 5, 6, 7, nan, 9, 10],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
[0, 1, 2, 3, 2, 3, 4, 5, 6, 7, 8],
],
"t": {"2:0": [nan, 1, 1, 1, -1, 1, 1, 1, nan, nan, 1]},
},
]


Expand Down