Skip to content

Commit

Permalink
Merge pull request #17 from au-imclab/dev
Browse files Browse the repository at this point in the history
Gap Filling, RQA and bugfixes
  • Loading branch information
zeyus authored Mar 8, 2024
2 parents 033fab6 + 6548bcd commit 79523ed
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 16 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ classifiers = [
]
dependencies = [
"pandas",
"scipy",
"StrEnum; python_version < '3.11'",
]

Expand Down
2 changes: 1 addition & 1 deletion src/mopipe/__about__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2023-present zeyus <[email protected]>
#
# SPDX-License-Identifier: MIT
__version__ = "0.1.1"
__version__ = "0.1.2"
2 changes: 2 additions & 0 deletions src/mopipe/core/analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .pipeline import Pipeline # noqa: F401, TID252
from .rqa import calc_rqa # noqa: F401, TID252
38 changes: 35 additions & 3 deletions src/mopipe/core/analysis/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def segments(self) -> t.MutableSequence[Segment]:

def _check_kwargs(self, **kwargs) -> None:
"""Check the arguments for the pipeline."""
if "input" not in kwargs:
if "x" not in kwargs:
msg = "No input provided to pipeline."
raise ValueError(msg)

Expand All @@ -43,14 +43,13 @@ def add_segment(self, segment: Segment) -> int:

def run(self, **kwargs) -> t.Any:
"""Run the pipeline."""
output = None
self._check_kwargs(**kwargs)
for segment in self._segments:
# most basic version here
# we could also keep track of the output from each step
# if that is useful, for now it's just I -> Segment -> O -> Segment -> O -> ...
kwargs["x"] = segment(**kwargs)
return output
return kwargs["x"]

def __repr__(self) -> str:
return f"Pipeline(segments={self._segments})"
Expand All @@ -75,3 +74,36 @@ def __reversed__(self) -> t.Iterator[Segment]:

def __contains__(self, value: object) -> bool:
return value in self._segments

@t.overload
def __setitem__(self, index: int, value: Segment) -> None: ...

@t.overload
def __setitem__(self, index: slice, value: t.Iterable[Segment]) -> None: ...

def __setitem__(self, index: t.Union[int, slice], value: t.Union[Segment, t.Iterable[Segment]]) -> None:
    """Replace one segment (int index) or a run of segments (slice index).

    Parameters
    ----------
    index : int or slice
        Position(s) in the pipeline to overwrite.
    value : Segment or iterable of Segment
        A single ``Segment`` for an int index, or an iterable of ``Segment``
        objects for a slice index.

    Raises
    ------
    ValueError
        If ``value`` is not a ``Segment`` (int index), is not iterable
        (slice index), or contains a non-``Segment`` element.
    """
    if isinstance(index, int):
        if not isinstance(value, Segment):
            msg = "Single value must be a Segment."
            raise ValueError(msg)
        self._segments[index] = value
        return

    if not isinstance(value, t.Iterable):
        msg = "Value must be an iterable of Segments."
        raise ValueError(msg)
    # Materialise once: validating a one-shot iterator (e.g. a generator)
    # would otherwise exhaust it and the slice would be assigned an empty list.
    new_segments = list(value)
    if not all(isinstance(v, Segment) for v in new_segments):
        msg = "All values must be Segments."
        raise ValueError(msg)
    self._segments[index] = new_segments

@t.overload
def __delitem__(self, index: int) -> None: ...

@t.overload
def __delitem__(self, index: slice) -> None: ...

def __delitem__(self, index: t.Union[int, slice]) -> None:
    """Remove the segment (int index) or segments (slice index) at ``index``."""
    # Deletion is delegated straight to the underlying segment sequence.
    del self._segments[index]

def insert(self, index: int, value: Segment) -> None:
    """Insert ``value`` into the pipeline's segment sequence before position ``index``."""
    segments = self._segments
    segments.insert(index, value)
65 changes: 65 additions & 0 deletions src/mopipe/core/analysis/rqa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import numpy as np
import scipy # type: ignore
from pandas.api.extensions import ExtensionArray


def calc_rqa(
x: ExtensionArray | np.ndarray,
y: ExtensionArray | np.ndarray,
dim: int = 1,
tau: int = 1,
threshold: float = 0.1,
lmin: int = 2,
) -> list[float]:
embed_data_x: list[np.ndarray] | np.ndarray = []
embed_data_y: list[np.ndarray] | np.ndarray = []
for i in range(dim):
embed_data_x.append(x[i * tau : x.shape[0] - (dim - i - 1) * tau]) # type: ignore
embed_data_y.append(y[i * tau : y.shape[0] - (dim - i - 1) * tau]) # type: ignore
embed_data_x, embed_data_y = np.array(embed_data_x), np.array(embed_data_y)

distance_matrix = scipy.spatial.distance_matrix(embed_data_x.T, embed_data_y.T)
recurrence_matrix = distance_matrix < threshold
msize = recurrence_matrix.shape[0]

d_line_dist = np.zeros(msize + 1)
for i in range(-msize + 1, msize):
cline = 0
for e in np.diagonal(recurrence_matrix, i):
if e:
cline += 1
else:
d_line_dist[cline] += 1
cline = 0
d_line_dist[cline] += 1

v_line_dist = np.zeros(msize + 1)
for i in range(msize):
cline = 0
for e in recurrence_matrix[:, i]:
if e:
cline += 1
else:
v_line_dist[cline] += 1
cline = 0
v_line_dist[cline] += 1

rr_sum = recurrence_matrix.sum()
rr = rr_sum / msize**2
det = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0
lam = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / rr_sum if rr_sum > 0 else 0

d_sum = d_line_dist[lmin:].sum()
avg_diag_length = (d_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / d_sum if d_sum > 0 else 0
v_sum = d_line_dist[lmin:].sum()
avg_vert_length = (v_line_dist[lmin:] * np.arange(msize + 1)[lmin:]).sum() / v_sum if v_sum > 0 else 0

d_probs = d_line_dist[lmin:][d_line_dist[lmin:] > 0]
d_probs /= d_probs.sum()
d_entropy = -(d_probs * np.log(d_probs)).sum()

v_probs = v_line_dist[lmin:][v_line_dist[lmin:] > 0]
v_probs /= v_probs.sum()
v_entropy = -(v_probs * np.log(v_probs)).sum()

return [rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy]
13 changes: 7 additions & 6 deletions src/mopipe/core/common/qtm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def parse_time_stamp(time_stamp: list[str]) -> tuple[datetime, float]:
return ts, unk


def parse_event(event: list[str]) -> tuple[int, float]:
def parse_event(event: list[str]) -> list[tuple[str, int, float]]:
"""Parse the event data from a list of strings.
Parameters
Expand All @@ -76,12 +76,13 @@ def parse_event(event: list[str]) -> tuple[int, float]:
Returns
-------
Tuple[float, float]
Tuple containing the index and elapsed time.
list[tuple[str, int, float]]
Single-element list holding a tuple of the event name, index and elapsed time.
"""
index = int(event[0])
elapsed_time = float(event[1])
return index, elapsed_time
event_name = event[0]
index = int(event[1])
elapsed_time = float(event[2])
return [(event_name, index, elapsed_time)]


def parse_marker_names(marker_names: list[str]) -> list[str]:
Expand Down
6 changes: 5 additions & 1 deletion src/mopipe/core/data/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ def _parse_metadata_row(self, key: str, values: list[t.Any]) -> None:
The values of the metadata row.
"""
k, v = parse_metadata_row(key, values)
self._metadata[k] = v
if k not in self._metadata:
self._metadata[k] = v
else:
# Metadata entry for an existing key: append values to the list
self._metadata[k] += v

def _extract_metadata_from_file(self, path: Path) -> None:
"""Extract the metadata from a file and return it as a dict.
Expand Down
147 changes: 143 additions & 4 deletions src/mopipe/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import numpy as np
import pandas as pd

from mopipe.core.analysis import calc_rqa
from mopipe.core.common.util import int_or_str_slice
from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput
from mopipe.core.segments.outputs import SingleNumericValueOutput, UnivariateSeriesOutput
from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput, UnivariateSeriesInput
from mopipe.core.segments.outputs import MultivariateSeriesOutput, SingleNumericValueOutput, UnivariateSeriesOutput
from mopipe.core.segments.seg import Segment
from mopipe.core.segments.segmenttypes import SummaryType
from mopipe.core.segments.segmenttypes import AnalysisType, SummaryType, TransformType


class Mean(SummaryType, AnySeriesInput, SingleNumericValueOutput, Segment):
Expand All @@ -20,7 +21,10 @@ def process(self, x: t.Union[pd.Series, pd.DataFrame], **kwargs) -> float: # no

class ColMeans(SummaryType, MultivariateSeriesInput, UnivariateSeriesOutput, Segment):
def process(
self, x: pd.DataFrame, col: t.Union[str, int, slice, None] = None, **kwargs # noqa: ARG002
self,
x: pd.DataFrame,
col: t.Union[str, int, slice, None] = None,
**kwargs, # noqa: ARG002
) -> pd.Series:
slice_type = None
if x.empty:
Expand All @@ -35,3 +39,138 @@ def process(
return x.select_dtypes(include="number").mean()
msg = f"Invalid col type {type(col)} provided, Must be None, int, str, or a slice."
raise ValueError(msg)


class CalcShift(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Add ``<col>_shift`` columns holding the difference to the value ``shift`` rows earlier."""

    def process(
        self,
        x: pd.DataFrame,
        cols: pd.Index | None = None,
        shift: int = 1,
        **kwargs,  # noqa: ARG002
    ) -> pd.DataFrame:
        """Append lagged-difference columns to ``x`` and return it.

        For every selected column a new column named ``f"{col}_shift"`` is
        written into ``x`` itself (the input frame is modified in place). Its
        first ``shift`` rows are 0 and row ``i`` afterwards holds
        ``col[i] - col[i - shift]``.

        Parameters
        ----------
        x : pd.DataFrame
            Input data; receives the new columns.
        cols : pd.Index, optional
            Columns to process. Defaults to all columns of ``x``.
        shift : int
            Lag in rows; must be at least 1.

        Raises
        ------
        ValueError
            If ``shift`` is smaller than 1.
        """
        if shift < 1:
            # Previously surfaced as an opaque numpy broadcast / negative
            # dimension ValueError; fail early with a readable message instead.
            msg = f"shift must be a positive integer, got {shift}."
            raise ValueError(msg)
        # Snapshot the labels so columns added below are never revisited.
        selected = list(x.columns) if cols is None else list(cols)
        for col_name in selected:
            col_data = x[col_name].values
            lagged_diff = col_data[shift:] - col_data[:-shift]
            # f-string (rather than ``+``) so non-string column labels work too.
            x[f"{col_name}_shift"] = np.concatenate((np.zeros(shift), lagged_diff))
        return x


class SimpleGapFilling(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Fill gaps in a DataFrame by linear interpolation."""

    def process(
        self,
        x: pd.DataFrame,
        **kwargs,  # noqa: ARG002
    ) -> pd.DataFrame:
        """Return the result of ``x.interpolate(method="linear")``."""
        filled = x.interpolate(method="linear")
        return filled


class RQAStats(AnalysisType, UnivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Recurrence quantification statistics of a single series against itself."""

    def process(
        self,
        x: pd.Series,
        dim: int = 1,
        tau: int = 1,
        threshold: float = 0.1,
        lmin: int = 2,
        **kwargs,  # noqa: ARG002
    ) -> pd.DataFrame:
        """Run ``calc_rqa(x, x, ...)`` and return the statistics as a one-row frame.

        An empty series yields a frame with the statistic columns and no rows.
        """
        stat_names = [
            "recurrence_rate",
            "determinism",
            "laminarity",
            "avg_diag_length",
            "avg_vert_length",
            "d_entropy",
            "v_entropy",
        ]
        out = pd.DataFrame(columns=stat_names)
        if not x.empty:
            values = x.values
            # The same array on both sides gives auto-recurrence.
            stats = calc_rqa(values, values, dim, tau, threshold, lmin)
            out.loc[len(out)] = stats
        return out


class CrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Cross-recurrence quantification statistics between two columns of a frame."""

    def process(
        self,
        x: pd.DataFrame,
        col_a: t.Union[str, int] = 0,
        col_b: t.Union[str, int] = 0,
        dim: int = 1,
        tau: int = 1,
        threshold: float = 0.1,
        lmin: int = 2,
        **kwargs,  # noqa: ARG002
    ) -> pd.DataFrame:
        """Run ``calc_rqa`` on two columns of ``x`` and return a one-row frame.

        Parameters
        ----------
        x : pd.DataFrame
            Input data. An empty frame yields a result with no rows.
        col_a, col_b : str or int
            Column selectors: an ``int`` is positional (``iloc``), a ``str``
            is a label (``loc``). Both default to position 0.
        dim, tau, threshold, lmin
            Forwarded unchanged to ``calc_rqa``.

        Raises
        ------
        ValueError
            If a column selector is neither ``int`` nor ``str``.
        """
        out = pd.DataFrame(
            columns=[
                "recurrence_rate",
                "determinism",
                "laminarity",
                "avg_diag_length",
                "avg_vert_length",
                "d_entropy",
                "v_entropy",
            ]
        )
        if x.empty:
            return out

        selected = []
        for col in (col_a, col_b):
            if isinstance(col, int):
                selected.append(x.iloc[:, col].values)
            elif isinstance(col, str):
                selected.append(x.loc[:, col].values)
            else:
                # Previously fell through and crashed later with an
                # UnboundLocalError; report the real problem instead.
                msg = f"Invalid column selector type {type(col)} provided, must be int or str."
                raise ValueError(msg)
        xa, xb = selected

        out.loc[len(out)] = calc_rqa(xa, xb, dim, tau, threshold, lmin)
        return out


class WindowedCrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Cross-recurrence quantification statistics over sliding windows of two columns."""

    def process(
        self,
        x: pd.DataFrame,
        col_a: t.Union[str, int] = 0,
        col_b: t.Union[str, int] = 0,
        dim: int = 1,
        tau: int = 1,
        threshold: float = 0.1,
        lmin: int = 2,
        window: int = 100,
        step: int = 10,
        **kwargs,  # noqa: ARG002
    ) -> pd.DataFrame:
        """Run ``calc_rqa`` on successive windows of two columns of ``x``.

        Windows of ``window`` rows start at 0, ``step``, ``2 * step``, ... and
        only full windows are evaluated, so a frame shorter than ``window``
        yields a result with no rows. Each window contributes one row.

        Parameters
        ----------
        x : pd.DataFrame
            Input data. An empty frame yields a result with no rows.
        col_a, col_b : str or int
            Column selectors: an ``int`` is positional (``iloc``), a ``str``
            is a label (``loc``). Both default to position 0.
        dim, tau, threshold, lmin
            Forwarded unchanged to ``calc_rqa``.
        window : int
            Number of rows per window.
        step : int
            Offset in rows between consecutive window starts.

        Raises
        ------
        ValueError
            If a column selector is neither ``int`` nor ``str``.
        """
        out = pd.DataFrame(
            columns=[
                "recurrence_rate",
                "determinism",
                "laminarity",
                "avg_diag_length",
                "avg_vert_length",
                "d_entropy",
                "v_entropy",
            ]
        )
        if x.empty:
            return out

        selected = []
        for col in (col_a, col_b):
            if isinstance(col, int):
                selected.append(x.iloc[:, col].values)
            elif isinstance(col, str):
                selected.append(x.loc[:, col].values)
            else:
                # Previously fell through and crashed later with an
                # UnboundLocalError; report the real problem instead.
                msg = f"Invalid column selector type {type(col)} provided, must be int or str."
                raise ValueError(msg)
        xa, xb = selected

        for start in range(0, xa.shape[0] - window + 1, step):
            stop = start + window
            out.loc[len(out)] = calc_rqa(xa[start:stop], xb[start:stop], dim, tau, threshold, lmin)
        return out
30 changes: 30 additions & 0 deletions tests/core/analysis/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from mopipe.core.analysis import Pipeline


class TestPipeline:
    """Checks of ``Pipeline.run`` with zero, one and two segments."""

    def test_run_with_no_segments(self):
        """With no segments the supplied ``x`` comes back unchanged."""
        result = Pipeline([]).run(x=None)
        assert result is None

    def test_run_with_single_segment(self):
        """The pipeline result is the single segment's output."""
        only = MockSegment()
        result = Pipeline([only]).run(x=1)
        assert result == only.process_output

    def test_run_with_multiple_segments(self):
        """The pipeline result is the output of the last segment in the chain."""
        first = MockSegment()
        last = MockSegment()
        result = Pipeline([first, last]).run(x=1)
        assert result == last.process_output


class MockSegment:
    """Callable stand-in for a segment: returns ``x + 1`` and remembers the result."""

    def __init__(self):
        # Last value produced by a call; None until the segment has run.
        self.process_output = None

    def __call__(self, **kwargs):
        """Increment the ``x`` keyword argument, store the result and return it."""
        incremented = kwargs["x"] + 1
        self.process_output = incremented
        return incremented
10 changes: 10 additions & 0 deletions tests/core/data/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,13 @@ def test_reader():
# number of markers * 3 (x,y,z) + 1 (time)
# frame number becomes the index
assert len(timeseries.data.columns) == metadata[MocapMetadataEntries["marker_count"]] * 3 + 1


def test_reading_events():
    """The events fixture must expose three entries under the ``"event"`` metadata key."""
    fixture = Path("tests/fixtures/sample_dance_with_header_and_events.tsv")
    reader = MocapReader(source=fixture, name="test")
    metadata = reader.metadata
    events = metadata["event"]
    assert events is not None
    assert len(events) == 3
Loading

0 comments on commit 79523ed

Please sign in to comment.