diff --git a/src/wristpy/processing/analytics.py b/src/wristpy/processing/analytics.py index 24cd8edc..b29e6591 100644 --- a/src/wristpy/processing/analytics.py +++ b/src/wristpy/processing/analytics.py @@ -100,11 +100,11 @@ def _spt_window( This function finds the absolute difference of the anglez data over 5s windows. We find the 5-minute rolling median of that difference. - Next, we find what that 5-minute median is below a specified threshold, taken as - new default value from the GGIR implementation of the HDCZA algorithm. + Next, we find when that 5-minute median is above a specified threshold, taken as + the new default value from the GGIR implementation of the HDCZA algorithm. This + represents non-sleep candidates. The logical not of this is the sleep candidate. We then find long blocks (30 minutes) when the threshold criteria is met. - Any gaps in SPT windows that are less than a specified window length - (default 60 minutes) are filled. + Any gaps in SPT windows that are less than 60 minutes are filled. Args: anglez_data: the raw anglez data, calculated from calibrated acceleration. @@ -120,13 +120,25 @@ def _spt_window( using an accelerometer without sleep diary. Sci Rep 8, 12975 (2018). https://doi.org/10.1038/s41598-018-31266-z """ + long_epoch_median = 300 + long_block = 360 + short_block_gap = 720 + anglez_abs_diff = self._compute_abs_diff_mean_anglez(anglez_data) - anglez_median_long_epoch = computations.moving_median(anglez_abs_diff, 300) - below_threshold = (anglez_median_long_epoch.measurements < threshold).flatten() + anglez_median_long_epoch = computations.moving_median( + anglez_abs_diff, long_epoch_median + ) + non_sleep_candidates = ( + anglez_median_long_epoch.measurements >= threshold + ).flatten() - sleep_idx_array = self._find_long_blocks(below_threshold) + sleep_candidates = np.logical_not( + self._fill_false_blocks(non_sleep_candidates, long_block) + ) - sleep_idx_array_filled = self._fill_short_blocks(sleep_idx_array) + sleep_idx_array_filled = self._fill_false_blocks( + sleep_candidates, short_block_gap + ) return models.Measurement( measurements=sleep_idx_array_filled, time=anglez_median_long_epoch.time @@ -220,70 +232,42 @@ def _find_onset_wakeup_times( return sleep_windows - def _find_long_blocks( - self, below_threshold: np.ndarray, block_length: int = 360 - ) -> np.ndarray: - """Helper function to find long blocks where SPT window is true. - - This function uses the convolution of a kernel of 1s, of length block_length, - to find the continuous long blocks where SPT window is true. Where the - convolution value is == long_block_length that implies an overlap between the - kernel and the threshold signal of length long_block. - - Args: - below_threshold: the 5-minute rolling median of the anglez difference - that is true when below the cutoff threshold. - block_length: the length of the long block that defines sleep, default is - 30 minutes. (360 chunks of 5s) - - Returns: - A numpy array with 1s indicating the identified SPT windows. - """ - kernel = np.ones(block_length, dtype=int) - convolved = np.convolve(below_threshold, kernel, mode="same") - long_blocks_idx = np.where(convolved == block_length)[0] - sleep_idx_array = np.zeros(len(below_threshold)) - sleep_idx_array[long_blocks_idx] = 1 - - return sleep_idx_array - - def _fill_short_blocks( - self, sleep_idx_array: np.ndarray, gap_block: int = 720 + def _fill_false_blocks( + self, boolean_array: np.ndarray, gap_block: int ) -> np.ndarray: - """Helper function to fill gaps in SPT window that are less than 60 minutes. + """Helper function to fill gaps in SPT window that are less than gap_blocks. We find the first non-zero in the sleep_idx_array, if there are none , we return the initial array. We then iterate over the array and count every zero between ones - (skipping the first 1), + (skipping the leading zeros), if that value is less than the gap_block, we fill in with ones. Args: - sleep_idx_array: the array of SPT windows. - gap_block: the length of the gap that defines sleep, default is 60 minutes. - The units are chunks of 5s. + boolean_array: A generic boolean array, typically the SPT window. + gap_block: the length of the gap that needs to be filled. Returns: - A numpy array with 1s indicating the identified SPT windows. + A numpy array with 1s, typically for identified SPT windows. """ n_zeros = 0 first_one_idx = next( - (index for index, value in enumerate(sleep_idx_array) if value), None + (index for index, value in enumerate(boolean_array) if value), None ) if first_one_idx is None: - return sleep_idx_array + return boolean_array - for sleep_array_idx in range(first_one_idx, len(sleep_idx_array)): - sleep_value = sleep_idx_array[sleep_array_idx] + for sleep_array_idx in range(first_one_idx, len(boolean_array)): + sleep_value = boolean_array[sleep_array_idx] if not sleep_value: n_zeros += 1 continue if n_zeros < gap_block: - sleep_idx_array[sleep_array_idx - n_zeros : sleep_array_idx] = 1 - n_zeros = 0 + boolean_array[sleep_array_idx - n_zeros : sleep_array_idx] = True + n_zeros = 0 - return sleep_idx_array + return boolean_array def _compute_abs_diff_mean_anglez( self, anglez_data: models.Measurement, window_size_seconds: int = 5 diff --git a/tests/unit/test_analytics.py b/tests/unit/test_analytics.py index d3f539d9..ebee89a0 100644 --- a/tests/unit/test_analytics.py +++ b/tests/unit/test_analytics.py @@ -20,35 +20,24 @@ def sleep_detection() -> analytics.GGIRSleepDetection: dummy_date + datetime.timedelta(seconds=i) for i in range(3600) ] test_time = pl.Series("time", dummy_datetime_list) - anglez = np.random.uniform(-90, 90, size=3600) + anglez = np.random.randint(-90, 90, size=3600) anglez_measurement = models.Measurement(measurements=anglez, time=test_time) return analytics.GGIRSleepDetection(anglez_measurement) -def test_find_long_blocks( +def test_fill_false_blocks( sleep_detection: analytics.GGIRSleepDetection, ) -> None: - """Test the _find_long_blocks method.""" - block_length = 3 - below_threshold = np.array([0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0]) - expected_result = np.array([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0]) - - result = sleep_detection._find_long_blocks(below_threshold, block_length) - - assert np.array_equal( - result, expected_result - ), f"Expected {expected_result}, but got {result}" - - -def test_fill_short_blocks( - sleep_detection: analytics.GGIRSleepDetection, -) -> None: - """Test the _fill_short_blocks method.""" + """Test the _fill_false_short_blocks method.""" gap_block = 3 - sleep_idx_array = np.array([0, 0, 0, 1, 1, 0, 0, 1]) - expected_result = np.array([0, 0, 0, 1, 1, 1, 1, 1]) + sleep_idx_array = np.array( + [True, False, True, False, False, False, True, True, False, True, False, True] + ) + expected_result = np.array( + [True, True, True, False, False, False, True, True, True, True, True, True] + ) - result = sleep_detection._fill_short_blocks(sleep_idx_array, gap_block) + result = sleep_detection._fill_false_blocks(sleep_idx_array, gap_block) assert np.array_equal( result, expected_result @@ -164,14 +153,11 @@ def test_remove_nonwear_periods_no_overlap() -> None: def test_spt_window(sleep_detection: analytics.GGIRSleepDetection) -> None: """Test the _spt_window method.""" - half_long_block = 180 sleep_detection.anglez.measurements = np.zeros( len(sleep_detection.anglez.measurements) ) expected_length = int(len(sleep_detection.anglez.measurements) / 5) - 1 expected_result = np.ones(expected_length) - expected_result[0:half_long_block] = 0 - expected_result[-(half_long_block - 1) :] = 0 result = sleep_detection._spt_window(sleep_detection.anglez)