Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Adding support for single threshold non-wear detection across watch platforms #105

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/wristpy/core/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Configuration module for wristpy."""

import logging
from typing import Optional

import pydantic_settings

Expand All @@ -11,6 +12,7 @@ class Settings(pydantic_settings.BaseSettings):
LIGHT_THRESHOLD: float = 0.03
MODERATE_THRESHOLD: float = 0.1
VIGOROUS_THRESHOLD: float = 0.3
RANGE_CRITERIA: Optional[float] = None
Asanto32 marked this conversation as resolved.
Show resolved Hide resolved

LOGGING_LEVEL: int = logging.INFO

Expand Down
18 changes: 5 additions & 13 deletions src/wristpy/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,12 @@ def run(
if epoch_length is not None:
enmo = computations.moving_mean(enmo, epoch_length=epoch_length)
anglez = computations.moving_mean(anglez, epoch_length=epoch_length)

# Watches require different criteria due to differences in the sensor values on the
# lower end of the distribution.

if input.suffix == ".bin":
range_criterion = 0.5
elif input.suffix == ".gt3x":
range_criterion = 0.05
if settings.RANGE_CRITERIA is not None:
non_wear_array = metrics.detect_nonwear(
calibrated_acceleration, range_criteria=settings.RANGE_CRITERIA
)
else:
raise exceptions.InvalidFileTypeError("Unknown input file type.")

non_wear_array = metrics.detect_nonwear(
calibrated_acceleration, range_criteria=range_criterion
)
non_wear_array = metrics.detect_nonwear(calibrated_acceleration)

sleep_detector = analytics.GGIRSleepDetection(anglez)
sleep_windows = sleep_detector.run_sleep_detection()
Expand Down
27 changes: 19 additions & 8 deletions src/wristpy/processing/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Calculate base metrics, anglez and enmo."""

from typing import Optional

import numpy as np
import polars as pl

Expand Down Expand Up @@ -63,7 +65,7 @@ def detect_nonwear(
short_epoch_length: int = 900,
n_short_epoch_in_long_epoch: int = 4,
std_criteria: float = 0.013,
range_criteria: float = 0.05,
range_criteria: Optional[float] = None,
) -> models.Measurement:
"""Set non_wear_flag based on accelerometer data.

Expand All @@ -90,7 +92,8 @@ def detect_nonwear(
short_epoch_length: The short window size, in seconds.
n_short_epoch_in_long_epoch: Number of short epochs that makeup one long epoch.
std_criteria: Threshold criteria for standard deviation.
range_criteria: Threshold criteria for range of acceleration.
range_criteria: Optional threshold criteria for range of acceleration.
Default value is None to use solely the standard deviation criteria.

Returns:
A new Measurment instance with the non-wear flag and corresponding timestamps.
Expand Down Expand Up @@ -152,7 +155,7 @@ def _compute_nonwear_value_array(
grouped_acceleration: pl.DataFrame,
n_short_epoch_in_long_epoch: int,
std_criteria: float,
range_criteria: float,
range_criteria: Optional[float] = None,
) -> np.ndarray:
"""Helper function to calculate the nonwear value array.

Expand Down Expand Up @@ -183,7 +186,9 @@ def _compute_nonwear_value_array(
calculated_nonwear_value = acceleration_selected_long_window.select(
pl.col("X", "Y", "Z").map_batches(
lambda df: _compute_nonwear_value_per_axis(
df, std_criteria, range_criteria
df,
std_criteria,
range_criteria,
)
)
).sum_horizontal()
Expand All @@ -200,7 +205,9 @@ def _compute_nonwear_value_array(


def _compute_nonwear_value_per_axis(
axis_acceleration_data: pl.Series, std_criteria: float, range_criteria: float
axis_acceleration_data: pl.Series,
std_criteria: float,
range_criteria: Optional[float] = None,
) -> bool:
"""Helper function to calculate the nonwear criteria per axis.

Expand All @@ -210,16 +217,20 @@ def _compute_nonwear_value_per_axis(
acceleration data of one axis (length of each list is the number of samples
that make up short_epoch_length in seconds).
std_criteria: Threshold criteria for standard deviation
range_criteria: Threshold criteria for range of acceleration
range_criteria: Threshold criteria for range of acceleration. If None, only
standard deviation criteria is used.

Returns:
Non-wear value for the axis.
"""
axis_long_window_data = pl.concat(axis_acceleration_data, how="vertical")
axis_std = axis_long_window_data.std()
axis_range = axis_long_window_data.max() - axis_long_window_data.min()
if range_criteria is not None:
axis_range = axis_long_window_data.max() - axis_long_window_data.min()
criteria_boolean = (axis_std < std_criteria) & (axis_range < range_criteria)
else:
criteria_boolean = axis_std < std_criteria

criteria_boolean = (axis_std < std_criteria) & (axis_range < range_criteria)
return criteria_boolean


Expand Down
25 changes: 24 additions & 1 deletion tests/smoke/test_orchestrator_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from wristpy.core import orchestrator
from wristpy.core import config, orchestrator


@pytest.mark.parametrize(
Expand Down Expand Up @@ -40,3 +40,26 @@ def test_orchestrator_different_epoch(
assert results.nonwear_epoch is not None
assert results.sleep_windows_epoch is not None
assert results.physical_activity_levels is not None


@pytest.mark.parametrize(
"file_name", [pathlib.Path("test_output.csv"), pathlib.Path("test_output.parquet")]
)
def test_happy_path_range_criteria(
file_name: pathlib.Path, tmp_path: pathlib.Path, sample_data_gt3x: pathlib.Path
) -> None:
"""Happy path for orchestrator."""
settings_range_criteria = config.Settings(RANGE_CRITERIA=0.1)
results = orchestrator.run(
Asanto32 marked this conversation as resolved.
Show resolved Hide resolved
input=sample_data_gt3x,
output=tmp_path / file_name,
settings=settings_range_criteria,
)

assert (tmp_path / file_name).exists()
assert isinstance(results, orchestrator.Results)
assert results.enmo is not None
assert results.anglez is not None
assert results.nonwear_epoch is not None
assert results.sleep_windows_epoch is not None
assert results.physical_activity_levels is not None
Asanto32 marked this conversation as resolved.
Show resolved Hide resolved
86 changes: 86 additions & 0 deletions tests/unit/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,32 @@ def test_group_acceleration_data_by_time() -> None:
)
def test_compute_nonwear_value_per_axis(
create_acceleration: pl.DataFrame, modifier: int, expected_result: int
) -> None:
"""Test the nonwear value per axis function."""
std_criteria = modifier
Asanto32 marked this conversation as resolved.
Show resolved Hide resolved
acceleration = create_acceleration.with_columns(pl.col("time").set_sorted())
acceleration_grouped = acceleration.group_by_dynamic(
index_column="time", every="5s"
).agg([pl.all().exclude(["time"])])

test_resultx = metrics._compute_nonwear_value_per_axis(
acceleration_grouped["X"], std_criteria
)

assert (
test_resultx == expected_result
), f"Expected {expected_result}, got: {test_resultx}"


@pytest.mark.parametrize(
"modifier, expected_result",
[
(1, 1),
(0, 0),
],
)
def test_compute_nonwear_value_per_axis_range(
create_acceleration: pl.DataFrame, modifier: int, expected_result: int
) -> None:
"""Test the nonwear value per axis function."""
std_criteria = modifier
Expand Down Expand Up @@ -181,6 +207,30 @@ def test_compute_nonwear_value_array(create_acceleration: pl.DataFrame) -> None:
expected_time_length = len(acceleration_grouped)
expected_result = 3

test_result = metrics._compute_nonwear_value_array(
acceleration_grouped,
n_short_epoch_in_long_epoch,
std_criteria=1,
)

assert np.all(
test_result == expected_result
), f"Expected {expected_result}, got: {test_result}"
assert (
len(test_result) == expected_time_length
), f"Expected time to be {expected_time_length}, got: {len(test_result)}"


def test_compute_nonwear_value_array_range(create_acceleration: pl.DataFrame) -> None:
"""Test the compute nonwear value array function."""
n_short_epoch_in_long_epoch = int(4)
create_acceleration = create_acceleration.with_columns(pl.col("time").set_sorted())
acceleration_grouped = create_acceleration.group_by_dynamic(
index_column="time", every="5s"
).agg([pl.all().exclude(["time"])])
expected_time_length = len(acceleration_grouped)
expected_result = 3

test_result = metrics._compute_nonwear_value_array(
acceleration_grouped,
n_short_epoch_in_long_epoch,
Expand All @@ -205,6 +255,42 @@ def test_compute_nonwear_value_array(create_acceleration: pl.DataFrame) -> None:
)
def test_detect_nonwear(
create_acceleration: pl.DataFrame, modifier: int, expected_result: int
) -> None:
"""Test the detect nonwear function."""
short_epoch_length = 5
n_short_epoch_in_long_epoch = int(4)
std_criteria = modifier
acceleration_df = create_acceleration
acceleration = models.Measurement(
measurements=acceleration_df.select(["X", "Y", "Z"]).to_numpy(),
time=acceleration_df["time"],
)
expected_time_length = math.ceil(len(acceleration.time) / short_epoch_length)

test_result = metrics.detect_nonwear(
acceleration,
short_epoch_length,
n_short_epoch_in_long_epoch,
std_criteria,
)

assert np.all(
test_result.measurements == modifier
), f"Expected non-wear flag value to be {expected_result}, got: {test_result}"
assert (
len(test_result.time) == expected_time_length
), f"Expected time to be {expected_time_length}, got: {len(test_result.time)}"


@pytest.mark.parametrize(
"modifier, expected_result",
[
(1, 1),
(0, 0),
],
)
def test_detect_nonwear_range(
create_acceleration: pl.DataFrame, modifier: int, expected_result: int
) -> None:
"""Test the detect nonwear function."""
short_epoch_length = 5
Expand Down