Skip to content

Commit

Permalink
Bugfix: Change _find_long_blocks to catch leading edge (#91)
Browse files Browse the repository at this point in the history
* Bugfix: Change _find_long_blocks to catch leading edge

Modified _find_long_blocks to use single pointer implementation to find chunks of 1s.
Modified tests so that test_find_long_blocks now finds start of 1s in middle and at ending of below_threshold.
Modified test_spt_window to simpler implementation to remove odd cases where random num generator of anglez caused unpredictable sleep detection at leading edge

* Address PR comments

Make functionality of unit test more obvious
Fix redundant if statement logic

* Update analytics.py

Improve algorithm description.

* Refactor and bugfix

refactor the implementation of find_long_blocks and fill_short_blocks as these functions were duplicates. Now there is a single call, however we use 'above_threshold' and np.logical_not for find_long_blocks implementation
Also fix the issue with fill_short_blocks that arose from indentation typo

* Fix documentation

Small changes to docstrings
Define variable for the different epoch lengths explicitly
Renam `_fill_false_blocks`
  • Loading branch information
Asanto32 authored Aug 27, 2024
1 parent 33d43fd commit c4414c2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 74 deletions.
84 changes: 34 additions & 50 deletions src/wristpy/processing/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ def _spt_window(
This function finds the absolute difference of the anglez data over 5s windows.
We find the 5-minute rolling median of that difference.
Next, we find what that 5-minute median is below a specified threshold, taken as
new default value from the GGIR implementation of the HDCZA algorithm.
Next, we find when that 5-minute median is above a specified threshold, taken as
the new default value from the GGIR implementation of the HDCZA algorithm. This
represents non-sleep candidates. The logical not of this is the sleep candidate.
We then find long blocks (30 minutes) when the threshold criteria is met.
Any gaps in SPT windows that are less than a specified window length
(default 60 minutes) are filled.
Any gaps in SPT windows that are less than 60 minutes are filled.
Args:
anglez_data: the raw anglez data, calculated from calibrated acceleration.
Expand All @@ -120,13 +120,25 @@ def _spt_window(
using an accelerometer without sleep diary. Sci Rep 8, 12975 (2018).
https://doi.org/10.1038/s41598-018-31266-z
"""
long_epoch_median = 300
long_block = 360
short_block_gap = 720

anglez_abs_diff = self._compute_abs_diff_mean_anglez(anglez_data)
anglez_median_long_epoch = computations.moving_median(anglez_abs_diff, 300)
below_threshold = (anglez_median_long_epoch.measurements < threshold).flatten()
anglez_median_long_epoch = computations.moving_median(
anglez_abs_diff, long_epoch_median
)
non_sleep_candidates = (
anglez_median_long_epoch.measurements >= threshold
).flatten()

sleep_idx_array = self._find_long_blocks(below_threshold)
sleep_candidates = np.logical_not(
self._fill_false_blocks(non_sleep_candidates, long_block)
)

sleep_idx_array_filled = self._fill_short_blocks(sleep_idx_array)
sleep_idx_array_filled = self._fill_false_blocks(
sleep_candidates, short_block_gap
)

return models.Measurement(
measurements=sleep_idx_array_filled, time=anglez_median_long_epoch.time
Expand Down Expand Up @@ -220,70 +232,42 @@ def _find_onset_wakeup_times(

return sleep_windows

def _find_long_blocks(
self, below_threshold: np.ndarray, block_length: int = 360
) -> np.ndarray:
"""Helper function to find long blocks where SPT window is true.
This function uses the convolution of a kernel of 1s, of length block_length,
to find the continuous long blocks where SPT window is true. Where the
convolution value is == long_block_length that implies an overlap between the
kernel and the threshold signal of length long_block.
Args:
below_threshold: the 5-minute rolling median of the anglez difference
that is true when below the cutoff threshold.
block_length: the length of the long block that defines sleep, default is
30 minutes. (360 chunks of 5s)
Returns:
A numpy array with 1s indicating the identified SPT windows.
"""
kernel = np.ones(block_length, dtype=int)
convolved = np.convolve(below_threshold, kernel, mode="same")
long_blocks_idx = np.where(convolved == block_length)[0]
sleep_idx_array = np.zeros(len(below_threshold))
sleep_idx_array[long_blocks_idx] = 1

return sleep_idx_array

def _fill_short_blocks(
self, sleep_idx_array: np.ndarray, gap_block: int = 720
def _fill_false_blocks(
self, boolean_array: np.ndarray, gap_block: int
) -> np.ndarray:
"""Helper function to fill gaps in SPT window that are less than 60 minutes.
"""Helper function to fill gaps in SPT window that are less than gap_blocks.
We find the first non-zero in the sleep_idx_array, if there are none ,
we return the initial array.
We then iterate over the array and count every zero between ones
(skipping the first 1),
(skipping the leading zeros),
if that value is less than the gap_block, we fill in with ones.
Args:
sleep_idx_array: the array of SPT windows.
gap_block: the length of the gap that defines sleep, default is 60 minutes.
The units are chunks of 5s.
boolean_array: A generic boolean array, typically the SPT window.
gap_block: the length of the gap that needs to be filled.
Returns:
A numpy array with 1s indicating the identified SPT windows.
A numpy array with 1s, typically for identified SPT windows.
"""
n_zeros = 0
first_one_idx = next(
(index for index, value in enumerate(sleep_idx_array) if value), None
(index for index, value in enumerate(boolean_array) if value), None
)
if first_one_idx is None:
return sleep_idx_array
return boolean_array

for sleep_array_idx in range(first_one_idx, len(sleep_idx_array)):
sleep_value = sleep_idx_array[sleep_array_idx]
for sleep_array_idx in range(first_one_idx, len(boolean_array)):
sleep_value = boolean_array[sleep_array_idx]
if not sleep_value:
n_zeros += 1
continue

if n_zeros < gap_block:
sleep_idx_array[sleep_array_idx - n_zeros : sleep_array_idx] = 1
n_zeros = 0
boolean_array[sleep_array_idx - n_zeros : sleep_array_idx] = True
n_zeros = 0

return sleep_idx_array
return boolean_array

def _compute_abs_diff_mean_anglez(
self, anglez_data: models.Measurement, window_size_seconds: int = 5
Expand Down
34 changes: 10 additions & 24 deletions tests/unit/test_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,24 @@ def sleep_detection() -> analytics.GGIRSleepDetection:
dummy_date + datetime.timedelta(seconds=i) for i in range(3600)
]
test_time = pl.Series("time", dummy_datetime_list)
anglez = np.random.uniform(-90, 90, size=3600)
anglez = np.random.randint(-90, 90, size=3600)
anglez_measurement = models.Measurement(measurements=anglez, time=test_time)
return analytics.GGIRSleepDetection(anglez_measurement)


def test_find_long_blocks(
def test_fill_false_blocks(
sleep_detection: analytics.GGIRSleepDetection,
) -> None:
"""Test the _find_long_blocks method."""
block_length = 3
below_threshold = np.array([0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0])
expected_result = np.array([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0])

result = sleep_detection._find_long_blocks(below_threshold, block_length)

assert np.array_equal(
result, expected_result
), f"Expected {expected_result}, but got {result}"


def test_fill_short_blocks(
sleep_detection: analytics.GGIRSleepDetection,
) -> None:
"""Test the _fill_short_blocks method."""
"""Test the _fill_false_short_blocks method."""
gap_block = 3
sleep_idx_array = np.array([0, 0, 0, 1, 1, 0, 0, 1])
expected_result = np.array([0, 0, 0, 1, 1, 1, 1, 1])
sleep_idx_array = np.array(
[True, False, True, False, False, False, True, True, False, True, False, True]
)
expected_result = np.array(
[True, True, True, False, False, False, True, True, True, True, True, True]
)

result = sleep_detection._fill_short_blocks(sleep_idx_array, gap_block)
result = sleep_detection._fill_false_blocks(sleep_idx_array, gap_block)

assert np.array_equal(
result, expected_result
Expand Down Expand Up @@ -164,14 +153,11 @@ def test_remove_nonwear_periods_no_overlap() -> None:

def test_spt_window(sleep_detection: analytics.GGIRSleepDetection) -> None:
"""Test the _spt_window method."""
half_long_block = 180
sleep_detection.anglez.measurements = np.zeros(
len(sleep_detection.anglez.measurements)
)
expected_length = int(len(sleep_detection.anglez.measurements) / 5) - 1
expected_result = np.ones(expected_length)
expected_result[0:half_long_block] = 0
expected_result[-(half_long_block - 1) :] = 0

result = sleep_detection._spt_window(sleep_detection.anglez)

Expand Down

0 comments on commit c4414c2

Please sign in to comment.