From 85a5e389af31ee7ae34addc82dfed77c503db51b Mon Sep 17 00:00:00 2001 From: Marco Edward Gorelli Date: Thu, 28 Mar 2024 14:54:20 +0000 Subject: [PATCH] fix: ensure first datapoint is always included in group_by_dynamic (#15312) --- crates/polars-time/src/windows/bounds.rs | 10 +- crates/polars-time/src/windows/group_by.rs | 3 +- crates/polars-time/src/windows/test.rs | 40 +++-- crates/polars-time/src/windows/window.rs | 143 ++++++++++++++---- py-polars/polars/dataframe/frame.py | 12 +- py-polars/polars/lazyframe/frame.py | 14 +- .../unit/operations/test_group_by_dynamic.py | 35 ++++- 7 files changed, 205 insertions(+), 52 deletions(-) diff --git a/crates/polars-time/src/windows/bounds.rs b/crates/polars-time/src/windows/bounds.rs index eba76ac7fb72..07757620cfe1 100644 --- a/crates/polars-time/src/windows/bounds.rs +++ b/crates/polars-time/src/windows/bounds.rs @@ -63,7 +63,15 @@ impl Bounds { pub(crate) fn is_future(&self, t: i64, closed: ClosedWindow) -> bool { match closed { ClosedWindow::Left | ClosedWindow::None => self.stop <= t, - ClosedWindow::Both | ClosedWindow::Right => t > self.stop, + ClosedWindow::Both | ClosedWindow::Right => self.stop < t, + } + } + + #[inline] + pub(crate) fn is_past(&self, t: i64, closed: ClosedWindow) -> bool { + match closed { + ClosedWindow::Left | ClosedWindow::Both => self.start > t, + ClosedWindow::None | ClosedWindow::Right => self.start >= t, } } } diff --git a/crates/polars-time/src/windows/group_by.rs b/crates/polars-time/src/windows/group_by.rs index 0da725707eb7..c7cb2429fa22 100644 --- a/crates/polars-time/src/windows/group_by.rs +++ b/crates/polars-time/src/windows/group_by.rs @@ -180,6 +180,7 @@ pub fn group_by_windows( window .get_overlapping_bounds_iter( boundary, + closed_window, tu, tz.parse::().ok().as_ref(), start_by, @@ -198,7 +199,7 @@ pub fn group_by_windows( _ => { update_groups_and_bounds( window - .get_overlapping_bounds_iter(boundary, tu, None, start_by) + .get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by) .unwrap(), start_offset, time, diff --git a/crates/polars-time/src/windows/test.rs b/crates/polars-time/src/windows/test.rs index 7b573c14d49f..d0b8dbd67b67 100644 --- a/crates/polars-time/src/windows/test.rs +++ b/crates/polars-time/src/windows/test.rs @@ -148,8 +148,8 @@ fn test_groups_large_interval() { false, Default::default(), ); - assert_eq!(groups.len(), 2); - assert_eq!(groups[1], [2, 2]); + assert_eq!(groups.len(), 3); + assert_eq!(groups[1], [1, 1]); } #[test] @@ -167,7 +167,9 @@ fn test_offset() { Duration::parse("-2m"), ); - let b = w.get_earliest_bounds_ns(t, None).unwrap(); + let b = w + .get_earliest_bounds_ns(t, ClosedWindow::Left, None) + .unwrap(); let start = NaiveDate::from_ymd_opt(2020, 1, 1) .unwrap() .and_hms_opt(23, 58, 0) @@ -209,7 +211,9 @@ fn test_boundaries() { ); // earliest bound is first datapoint: 2021-12-16 00:00:00 - let b = w.get_earliest_bounds_ns(ts[0], None).unwrap(); + let b = w + .get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None) + .unwrap(); assert_eq!(b.start, start.and_utc().timestamp_nanos_opt().unwrap()); // test closed: "both" (includes both ends of the interval) @@ -340,9 +344,10 @@ fn test_boundaries() { false, Default::default(), ); - assert_eq!(groups[0], [1, 2]); // 00:00:00 -> 00:30:00 - assert_eq!(groups[1], [3, 2]); // 01:00:00 -> 01:30:00 - assert_eq!(groups[2], [5, 2]); // 02:00:00 -> 02:30:00 + assert_eq!(groups[0], [0, 1]); // (2021-12-15 23:30, 2021-12-16 00:00] + assert_eq!(groups[1], [1, 2]); // (2021-12-16 00:00, 2021-12-16 00:30] + assert_eq!(groups[2], [3, 2]); // (2021-12-16 00:30, 2021-12-16 01:00] + assert_eq!(groups[3], [5, 2]); // (2021-12-16 01:00, 2021-12-16 01:30] // test closed: "none" (should not include left or right end of interval) let (groups, _, _) = group_by_windows( @@ -388,14 +393,18 @@ fn test_boundaries_2() { // period 1h // offset 30m let offset = Duration::parse("30m"); - let w = Window::new(Duration::parse("2h"), Duration::parse("1h"), offset); + let every = Duration::parse("2h"); + let w = Window::new(every, Duration::parse("1h"), offset); // earliest bound is first datapoint: 2021-12-16 00:00:00 + 30m offset: 2021-12-16 00:30:00 - let b = w.get_earliest_bounds_ns(ts[0], None).unwrap(); + // We then shift back by `every` (2h): 2021-12-15 22:30:00 + let b = w + .get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None) + .unwrap(); assert_eq!( b.start, - start.and_utc().timestamp_nanos_opt().unwrap() + offset.duration_ns() + start.and_utc().timestamp_nanos_opt().unwrap() + offset.duration_ns() - every.duration_ns() ); let (groups, lower, higher) = group_by_windows( @@ -520,7 +529,9 @@ fn test_boundaries_ms() { ); // earliest bound is first datapoint: 2021-12-16 00:00:00 - let b = w.get_earliest_bounds_ms(ts[0], None).unwrap(); + let b = w + .get_earliest_bounds_ms(ts[0], ClosedWindow::Both, None) + .unwrap(); assert_eq!(b.start, start.and_utc().timestamp_millis()); // test closed: "both" (includes both ends of the interval) @@ -651,9 +662,10 @@ fn test_boundaries_ms() { false, Default::default(), ); - assert_eq!(groups[0], [1, 2]); // 00:00:00 -> 00:30:00 - assert_eq!(groups[1], [3, 2]); // 01:00:00 -> 01:30:00 - assert_eq!(groups[2], [5, 2]); // 02:00:00 -> 02:30:00 + assert_eq!(groups[0], [0, 1]); // (2021-12-15 23:30, 2021-12-16 00:00] + assert_eq!(groups[1], [1, 2]); // (2021-12-16 00:00, 2021-12-16 00:30] + assert_eq!(groups[2], [3, 2]); // (2021-12-16 00:30, 2021-12-16 01:00] + assert_eq!(groups[3], [5, 2]); // (2021-12-16 01:00, 2021-12-16 01:30] // test closed: "none" (should not include left or right end of interval) let (groups, _, _) = group_by_windows( diff --git a/crates/polars-time/src/windows/window.rs b/crates/polars-time/src/windows/window.rs index 8adb7520ecfe..16d43c4da3d8 100644 --- a/crates/polars-time/src/windows/window.rs +++ b/crates/polars-time/src/windows/window.rs @@ -8,6 +8,37 @@ use polars_core::prelude::*; use crate::prelude::*; +/// Ensure that earliest datapoint (`t`) is in, or in front of, first window. +/// +/// For example, if we have: +/// +/// - first datapoint is `2020-01-01 01:00` +/// - `every` is `'1d'` +/// - `period` is `'2d'` +/// - `offset` is `'6h'` +/// +/// then truncating the earliest datapoint by `every` and adding `offset` results +/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint +/// a chance of being included, we then shift the window back by `every` to +/// `[2019-12-31 06:00, 2020-01-02 06:00)`. +pub(crate) fn ensure_t_in_or_in_front_of_window( + mut every: Duration, + t: i64, + offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult, + period: Duration, + mut start: i64, + closed_window: ClosedWindow, + tz: Option<&Tz>, +) -> PolarsResult { + every.negative = !every.negative; + let mut stop = offset_fn(&period, start, tz)?; + while Bounds::new(start, stop).is_past(t, closed_window) { + start = offset_fn(&every, start, tz)?; + stop = offset_fn(&period, start, tz)?; + } + Ok(Bounds::new_checked(start, stop)) +} + /// Represents a window in time #[derive(Copy, Clone)] pub struct Window { @@ -82,24 +113,58 @@ impl Window { /// returns the bounds for the earliest window bounds /// that contains the given time t. For underlapping windows that /// do not contain time t, the window directly after time t will be returned. - pub fn get_earliest_bounds_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult { + pub fn get_earliest_bounds_ns( + &self, + t: i64, + closed_window: ClosedWindow, + tz: Option<&Tz>, + ) -> PolarsResult { let start = self.truncate_ns(t, tz)?; - let stop = self.period.add_ns(start, tz)?; - - Ok(Bounds::new_checked(start, stop)) + ensure_t_in_or_in_front_of_window( + self.every, + t, + Duration::add_ns, + self.period, + start, + closed_window, + tz, + ) } - pub fn get_earliest_bounds_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult { + pub fn get_earliest_bounds_us( + &self, + t: i64, + closed_window: ClosedWindow, + tz: Option<&Tz>, + ) -> PolarsResult { let start = self.truncate_us(t, tz)?; - let stop = self.period.add_us(start, tz)?; - Ok(Bounds::new_checked(start, stop)) + ensure_t_in_or_in_front_of_window( + self.every, + t, + Duration::add_us, + self.period, + start, + closed_window, + tz, + ) } - pub fn get_earliest_bounds_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult { + pub fn get_earliest_bounds_ms( + &self, + t: i64, + closed_window: ClosedWindow, + tz: Option<&Tz>, + ) -> PolarsResult { let start = self.truncate_ms(t, tz)?; - let stop = self.period.add_ms(start, tz)?; - - Ok(Bounds::new_checked(start, stop)) + ensure_t_in_or_in_front_of_window( + self.every, + t, + Duration::add_ms, + self.period, + start, + closed_window, + tz, + ) } pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize { @@ -120,11 +185,12 @@ impl Window { pub fn get_overlapping_bounds_iter<'a>( &'a self, boundary: Bounds, + closed_window: ClosedWindow, tu: TimeUnit, tz: Option<&'a Tz>, start_by: StartBy, ) -> PolarsResult { - BoundsIter::new(*self, boundary, tu, tz, start_by) + BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by) } } @@ -140,6 +206,7 @@ pub struct BoundsIter<'a> { impl<'a> BoundsIter<'a> { fn new( window: Window, + closed_window: ClosedWindow, boundary: Bounds, tu: TimeUnit, tz: Option<&'a Tz>, @@ -157,14 +224,20 @@ impl<'a> BoundsIter<'a> { boundary }, StartBy::WindowBound => match tu { - TimeUnit::Nanoseconds => window.get_earliest_bounds_ns(boundary.start, tz)?, - TimeUnit::Microseconds => window.get_earliest_bounds_us(boundary.start, tz)?, - TimeUnit::Milliseconds => window.get_earliest_bounds_ms(boundary.start, tz)?, + TimeUnit::Nanoseconds => { + window.get_earliest_bounds_ns(boundary.start, closed_window, tz)? + }, + TimeUnit::Microseconds => { + window.get_earliest_bounds_us(boundary.start, closed_window, tz)? + }, + TimeUnit::Milliseconds => { + window.get_earliest_bounds_ms(boundary.start, closed_window, tz)? + }, }, _ => { { #[allow(clippy::type_complexity)] - let (from, to, offset): ( + let (from, to, offset_fn): ( fn(i64) -> NaiveDateTime, fn(NaiveDateTime) -> i64, fn(&Duration, i64, Option<&Tz>) -> PolarsResult, @@ -186,9 +259,8 @@ impl<'a> BoundsIter<'a> { ), }; // find beginning of the week. - let mut boundary = boundary; let dt = from(boundary.start); - (boundary.start, boundary.stop) = match tz { + match tz { #[cfg(feature = "timezones")] Some(tz) => { let dt = tz.from_utc_datetime(&dt); @@ -196,16 +268,24 @@ impl<'a> BoundsIter<'a> { let dt = dt.naive_utc(); let start = to(dt); // adjust start of the week based on given day of the week - let start = offset( + let start = offset_fn( &Duration::parse(&format!("{}d", start_by.weekday().unwrap())), start, Some(tz), )?; // apply the 'offset' - let start = offset(&window.offset, start, Some(tz))?; + let start = offset_fn(&window.offset, start, Some(tz))?; + // make sure the first datapoint has a chance to be included // and compute the end of the window defined by the 'period' - let stop = offset(&window.period, start, Some(tz))?; - (start, stop) + ensure_t_in_or_in_front_of_window( + window.every, + boundary.start, + offset_fn, + window.period, + start, + closed_window, + Some(tz), + )? }, _ => { let tz = chrono::Utc; @@ -214,20 +294,27 @@ impl<'a> BoundsIter<'a> { let dt = dt.naive_utc(); let start = to(dt); // adjust start of the week based on given day of the week - let start = offset( + let start = offset_fn( &Duration::parse(&format!("{}d", start_by.weekday().unwrap())), start, None, ) .unwrap(); // apply the 'offset' - let start = offset(&window.offset, start, None).unwrap(); + let start = offset_fn(&window.offset, start, None).unwrap(); + // make sure the first datapoint has a chance to be included // and compute the end of the window defined by the 'period' - let stop = offset(&window.period, start, None).unwrap(); - (start, stop) + ensure_t_in_or_in_front_of_window( + window.every, + boundary.start, + offset_fn, + window.period, + start, + closed_window, + None, + )? }, - }; - boundary + } } }, }; diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 16f21574cd63..2d0e9cde550e 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -5554,8 +5554,8 @@ def group_by_dynamic( - [start + 2*every, start + 2*every + period) - ... - where `start` is determined by `start_by`, `offset`, and `every` (see parameter - descriptions below). + where `start` is determined by `start_by`, `offset`, `every`, and the earliest + datapoint. See the `start_by` argument description for details. .. warning:: The index column must be sorted in ascending order. If `by` is passed, then @@ -5577,7 +5577,7 @@ def group_by_dynamic( period length of the window, if None it will equal 'every' offset - offset of the window, only takes effect if `start_by` is `'window'`. + offset of the window, does not take effect if `start_by` is 'datapoint'. Defaults to negative `every`. truncate truncate the time value to the window lower bound @@ -5613,6 +5613,9 @@ def group_by_dynamic( * 'tuesday': Start the window on the Tuesday before the first data point. * ... * 'sunday': Start the window on the Sunday before the first data point. + + The resulting window is then shifted back until the earliest datapoint + is in or in front of it. check_sorted Check whether `index_column` is sorted (or, if `group_by` is given, check whether it's sorted within each group). @@ -10694,6 +10697,9 @@ def groupby_dynamic( * 'tuesday': Start the window on the Tuesday before the first data point. * ... * 'sunday': Start the window on the Sunday before the first data point. + + The resulting window is then shifted back until the earliest datapoint + is in or in front of it. check_sorted Check whether `index_column` is sorted (or, if `by` is given, check whether it's sorted within each group). diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 05d5124bf11e..4ec0a5632e46 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -3403,8 +3403,8 @@ def group_by_dynamic( - [start + 2*every, start + 2*every + period) - ... - where `start` is determined by `start_by`, `offset`, and `every` (see parameter - descriptions below). + where `start` is determined by `start_by`, `offset`, `every`, and the earliest + datapoint. See the `start_by` argument description for details. .. warning:: The index column must be sorted in ascending order. If `by` is passed, then @@ -3426,7 +3426,7 @@ def group_by_dynamic( period length of the window, if None it will equal 'every' offset - offset of the window, only takes effect if `start_by` is `'window'`. + offset of the window, does not take effect if `start_by` is 'datapoint'. Defaults to negative `every`. truncate truncate the time value to the window lower bound @@ -3462,6 +3462,9 @@ def group_by_dynamic( * 'tuesday': Start the window on the Tuesday before the first data point. * ... * 'sunday': Start the window on the Sunday before the first data point. + + The resulting window is then shifted back until the earliest datapoint + is in or in front of it. check_sorted Check whether `index_column` is sorted (or, if `group_by` is given, check whether it's sorted within each group). @@ -6447,7 +6450,7 @@ def groupby_dynamic( period length of the window, if None it will equal 'every' offset - offset of the window, only takes effect if `start_by` is `'window'`. + offset of the window, does not take effect if `start_by` is 'datapoint'. Defaults to negative `every`. truncate truncate the time value to the window lower bound @@ -6472,6 +6475,9 @@ def groupby_dynamic( * 'tuesday': Start the window on the Tuesday before the first data point. * ... * 'sunday': Start the window on the Sunday before the first data point. + + The resulting window is then shifted back until the earliest datapoint + is in or in front of it. check_sorted Check whether `index_column` is sorted (or, if `by` is given, check whether it's sorted within each group). diff --git a/py-polars/tests/unit/operations/test_group_by_dynamic.py b/py-polars/tests/unit/operations/test_group_by_dynamic.py index 6ed1a3d6ab53..12b26a4b7ded 100644 --- a/py-polars/tests/unit/operations/test_group_by_dynamic.py +++ b/py-polars/tests/unit/operations/test_group_by_dynamic.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import date, datetime, timedelta +from datetime import date, datetime, timedelta, timezone from typing import TYPE_CHECKING, Any import numpy as np @@ -990,3 +990,36 @@ def test_group_by_dynamic_check_sorted_15225() -> None: assert_frame_equal(result, expected) with pytest.raises(pl.InvalidOperationError, match="not explicitly sorted"): result = df.group_by_dynamic("b", every="2d").agg(pl.sum("a")) + + +@pytest.mark.parametrize("start_by", ["window", "friday"]) +def test_earliest_point_included_when_offset_is_set_15241(start_by: StartBy) -> None: + df = pl.DataFrame( + data={ + "t": pl.Series( + [ + datetime(2024, 3, 22, 3, 0, tzinfo=timezone.utc), + datetime(2024, 3, 22, 4, 0, tzinfo=timezone.utc), + datetime(2024, 3, 22, 5, 0, tzinfo=timezone.utc), + datetime(2024, 3, 22, 6, 0, tzinfo=timezone.utc), + ] + ), + "v": [1, 10, 100, 1000], + } + ).set_sorted("t") + result = df.group_by_dynamic( + index_column="t", + every="1d", + offset=timedelta(hours=5), + start_by=start_by, + ).agg("v") + expected = pl.DataFrame( + { + "t": [ + datetime(2024, 3, 21, 5, 0, tzinfo=timezone.utc), + datetime(2024, 3, 22, 5, 0, tzinfo=timezone.utc), + ], + "v": [[1, 10], [100, 1000]], + } + ) + assert_frame_equal(result, expected)