Merge remote-tracking branch 'origin/main' into fix_read_scan_write_csv_arguments

# Conflicts:
#	py-polars/tests/unit/namespaces/test_meta.py
svaningelgem committed Oct 8, 2023
2 parents b5404d5 + 9d40f0a commit b1d9a74
Showing 17 changed files with 170 additions and 57 deletions.
13 changes: 9 additions & 4 deletions crates/polars-core/src/frame/group_by/mod.rs
@@ -60,16 +60,21 @@ impl DataFrame {
             !by.is_empty(),
             ComputeError: "at least one key is required in a group_by operation"
         );
-        let by_len = by[0].len();
+        let minimal_by_len = by.iter().map(|s| s.len()).min().expect("at least 1 key");
+        let df_height = self.height();

         // we only throw this error if self.width > 0
         // so that we can still call this on a dummy dataframe where we provide the keys
-        if (by_len != self.height()) && (self.width() > 0) {
+        if (minimal_by_len != df_height) && (self.width() > 0) {
             polars_ensure!(
-                by_len == 1,
+                minimal_by_len == 1,
                 ShapeMismatch: "series used as keys should have the same length as the dataframe"
             );
-            by[0] = by[0].new_from_index(0, self.height())
+            for by_key in by.iter_mut() {
+                if by_key.len() == minimal_by_len {
+                    *by_key = by_key.new_from_index(0, df_height)
+                }
+            }
         };

         let n_partitions = _set_partition_size();
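In user-facing terms, any unit-length `group_by` key is now broadcast to the frame's height, rather than this being special-cased for a single key. A minimal Python sketch of the resulting behavior — the frame and the literal key are illustrative, assuming expressions that evaluate to unit-length series are accepted as keys here:

```python
import polars as pl

df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

# The unit-length literal key is broadcast to the frame's height,
# even when combined with a full-length key such as "a".
out = df.group_by(pl.lit("all").alias("g"), "a").agg(pl.col("b").sum())
print(out)
```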
21 changes: 9 additions & 12 deletions crates/polars-core/src/frame/group_by/proxy.rs
@@ -367,19 +367,16 @@ impl GroupsProxy {
         }
     }

-    pub fn take_group_lasts(self) -> Vec<IdxSize> {
+    /// # Safety
+    /// This will not do any bounds checks. The caller must ensure
+    /// all groups have members.
+    pub unsafe fn take_group_lasts(self) -> Vec<IdxSize> {
         match self {
-            GroupsProxy::Idx(groups) => {
-                groups
-                    .all
-                    .iter()
-                    .map(|idx| {
-                        // safety:
-                        // idx has at least one eletment, so -1 is always in bounds
-                        unsafe { *idx.get_unchecked(idx.len() - 1) }
-                    })
-                    .collect()
-            },
+            GroupsProxy::Idx(groups) => groups
+                .all
+                .iter()
+                .map(|idx| *idx.get_unchecked(idx.len() - 1))
+                .collect(),
             GroupsProxy::Slice { groups, .. } => groups
                 .into_iter()
                 .map(|[first, len]| first + len - 1)
6 changes: 4 additions & 2 deletions crates/polars-ops/src/series/ops/is_last_distinct.rs
@@ -149,7 +149,8 @@ where
 #[cfg(feature = "dtype-struct")]
 fn is_last_distinct_struct(s: &Series) -> PolarsResult<BooleanChunked> {
     let groups = s.group_tuples(true, false)?;
-    let last = groups.take_group_lasts();
+    // SAFETY: all groups have at least a single member
+    let last = unsafe { groups.take_group_lasts() };
     let mut out = MutableBitmap::with_capacity(s.len());
     out.extend_constant(s.len(), false);

@@ -165,7 +166,8 @@ fn is_last_distinct_struct(s: &Series) -> PolarsResult<BooleanChunked> {
 #[cfg(feature = "group_by_list")]
 fn is_last_distinct_list(ca: &ListChunked) -> PolarsResult<BooleanChunked> {
     let groups = ca.group_tuples(true, false)?;
-    let last = groups.take_group_lasts();
+    // SAFETY: all groups have at least a single member
+    let last = unsafe { groups.take_group_lasts() };
     let mut out = MutableBitmap::with_capacity(ca.len());
     out.extend_constant(ca.len(), false);
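These call sites back the user-facing `is_last_distinct`. A hedged sketch of the struct case served here — the series contents and the commented output are illustrative:

```python
import polars as pl

# A series of dicts is interpreted as a Struct series.
s = pl.Series("ratings", [{"a": 1}, {"a": 1}, {"a": 2}])

# Marks the last occurrence of each distinct struct value.
print(s.is_last_distinct())  # expected: [false, true, true]
```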
2 changes: 1 addition & 1 deletion crates/polars-plan/src/dsl/functions/mod.rs
@@ -21,7 +21,7 @@ pub use correlation::*;
 pub use horizontal::*;
 pub use index::*;
 #[cfg(feature = "temporal")]
-use polars_core::export::arrow::temporal_conversions::NANOSECONDS;
+use polars_core::export::arrow::temporal_conversions::{MICROSECONDS, MILLISECONDS, NANOSECONDS};
 #[cfg(feature = "temporal")]
 use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY;
 #[cfg(feature = "dtype-struct")]
69 changes: 48 additions & 21 deletions crates/polars-plan/src/dsl/functions/temporal.rs
@@ -177,6 +177,7 @@ pub struct DurationArgs {
     pub milliseconds: Expr,
     pub microseconds: Expr,
     pub nanoseconds: Expr,
+    pub time_unit: TimeUnit,
 }

 impl Default for DurationArgs {
@@ -190,6 +191,7 @@ impl Default for DurationArgs {
             milliseconds: lit(0),
             microseconds: lit(0),
             nanoseconds: lit(0),
+            time_unit: TimeUnit::Microseconds,
         }
     }
 }
@@ -258,15 +260,15 @@ pub fn duration(args: DurationArgs) -> Expr {
         if s.iter().any(|s| s.is_empty()) {
             return Ok(Some(Series::new_empty(
                 s[0].name(),
-                &DataType::Duration(TimeUnit::Nanoseconds),
+                &DataType::Duration(args.time_unit),
             )));
         }

         let days = s[0].cast(&DataType::Int64).unwrap();
         let seconds = s[1].cast(&DataType::Int64).unwrap();
         let mut nanoseconds = s[2].cast(&DataType::Int64).unwrap();
-        let microseconds = s[3].cast(&DataType::Int64).unwrap();
-        let milliseconds = s[4].cast(&DataType::Int64).unwrap();
+        let mut microseconds = s[3].cast(&DataType::Int64).unwrap();
+        let mut milliseconds = s[4].cast(&DataType::Int64).unwrap();
         let minutes = s[5].cast(&DataType::Int64).unwrap();
         let hours = s[6].cast(&DataType::Int64).unwrap();
         let weeks = s[7].cast(&DataType::Int64).unwrap();
@@ -278,34 +280,59 @@ pub fn duration(args: DurationArgs) -> Expr {
             (s.len() != max_len && s.get(0).unwrap() != AnyValue::Int64(0)) || s.len() == max_len
         };

-        if nanoseconds.len() != max_len {
-            nanoseconds = nanoseconds.new_from_index(0, max_len);
-        }
-        if condition(&microseconds) {
-            nanoseconds = nanoseconds + (microseconds * 1_000);
-        }
-        if condition(&milliseconds) {
-            nanoseconds = nanoseconds + (milliseconds * 1_000_000);
-        }
+        let multiplier = match args.time_unit {
+            TimeUnit::Nanoseconds => NANOSECONDS,
+            TimeUnit::Microseconds => MICROSECONDS,
+            TimeUnit::Milliseconds => MILLISECONDS,
+        };
+
+        let mut duration = match args.time_unit {
+            TimeUnit::Nanoseconds => {
+                if nanoseconds.len() != max_len {
+                    nanoseconds = nanoseconds.new_from_index(0, max_len);
+                }
+                if condition(&microseconds) {
+                    nanoseconds = nanoseconds + (microseconds * 1_000);
+                }
+                if condition(&milliseconds) {
+                    nanoseconds = nanoseconds + (milliseconds * 1_000_000);
+                }
+                nanoseconds
+            },
+            TimeUnit::Microseconds => {
+                if microseconds.len() != max_len {
+                    microseconds = microseconds.new_from_index(0, max_len);
+                }
+                if condition(&milliseconds) {
+                    microseconds = microseconds + (milliseconds * 1_000);
+                }
+                microseconds
+            },
+            TimeUnit::Milliseconds => {
+                if milliseconds.len() != max_len {
+                    milliseconds = milliseconds.new_from_index(0, max_len);
+                }
+                milliseconds
+            },
+        };
+
         if condition(&seconds) {
-            nanoseconds = nanoseconds + (seconds * NANOSECONDS);
+            duration = duration + (seconds * multiplier);
         }
         if condition(&days) {
-            nanoseconds = nanoseconds + (days * NANOSECONDS * SECONDS_IN_DAY);
+            duration = duration + (days * multiplier * SECONDS_IN_DAY);
         }
         if condition(&minutes) {
-            nanoseconds = nanoseconds + minutes * NANOSECONDS * 60;
+            duration = duration + minutes * multiplier * 60;
         }
         if condition(&hours) {
-            nanoseconds = nanoseconds + hours * NANOSECONDS * 60 * 60;
+            duration = duration + hours * multiplier * 60 * 60;
         }
         if condition(&weeks) {
-            nanoseconds = nanoseconds + weeks * NANOSECONDS * SECONDS_IN_DAY * 7;
+            duration = duration + weeks * multiplier * SECONDS_IN_DAY * 7;
         }

-        nanoseconds
-            .cast(&DataType::Duration(TimeUnit::Nanoseconds))
-            .map(Some)
+        duration.cast(&DataType::Duration(args.time_unit)).map(Some)
     }) as Arc<dyn SeriesUdf>);

     Expr::AnonymousFunction {
@@ -320,7 +347,7 @@ pub fn duration(args: DurationArgs) -> Expr {
             args.weeks,
         ],
         function,
-        output_type: GetOutput::from_type(DataType::Duration(TimeUnit::Nanoseconds)),
+        output_type: GetOutput::from_type(DataType::Duration(args.time_unit)),
         options: FunctionOptions {
             collect_groups: ApplyOptions::ApplyFlat,
             input_wildcard_expansion: true,
8 changes: 4 additions & 4 deletions docs/src/python/user-guide/expressions/structs.py
@@ -26,24 +26,24 @@
 # --8<-- [end:struct_unnest]

 # --8<-- [start:series_struct]
-rating_Series = pl.Series(
+rating_series = pl.Series(
     "ratings",
     [
         {"Movie": "Cars", "Theatre": "NE", "Avg_Rating": 4.5},
         {"Movie": "Toy Story", "Theatre": "ME", "Avg_Rating": 4.9},
     ],
 )
-print(rating_Series)
+print(rating_series)
 # --8<-- [end:series_struct]

 # --8<-- [start:series_struct_extract]
-out = rating_Series.struct.field("Movie")
+out = rating_series.struct.field("Movie")
 print(out)
 # --8<-- [end:series_struct_extract]

 # --8<-- [start:series_struct_rename]
 out = (
-    rating_Series.to_frame()
+    rating_series.to_frame()
     .select(pl.col("ratings").struct.rename_fields(["Film", "State", "Value"]))
     .unnest("ratings")
 )
6 changes: 3 additions & 3 deletions docs/user-guide/expressions/structs.md
@@ -45,7 +45,7 @@ Polars will interpret a `dict` sent to the `Series` constructor as a `Struct`:

 !!! note "Constructing `Series` objects"

-    Note that `Series` here was constructed with the `name` of the series in the begninng, followed by the `values`. Providing the latter first
+    Note that `Series` here was constructed with the `name` of the series in the beginning, followed by the `values`. Providing the latter first
     is considered an anti-pattern in Polars, and must be avoided.

 ### Extracting individual values of a `Struct`
@@ -60,7 +60,7 @@ Let's say that we needed to obtain just the `movie` value in the `Series` that w

 ### Renaming individual keys of a `Struct`

-What if we need to rename individual `field`s of a `Struct` column? We first convert the `rating_Series` object to a `DataFrame` so that we can view the changes easily, and then use the `rename_fields` method:
+What if we need to rename individual `field`s of a `Struct` column? We first convert the `rating_series` object to a `DataFrame` so that we can view the changes easily, and then use the `rename_fields` method:

 {{code_block('user-guide/expressions/structs','series_struct_rename',['struct.rename_fields'])}}

@@ -84,7 +84,7 @@ We can identify the unique cases at this level also with `is_unique`!

 ### Multi-column ranking

-Suppose, given that we know there are duplicates, we want to choose which rank gets a higher priority. We define _Count_ of ratings to be more important than the actual `Avg_Rating` themselves, and only use it to break a tie. We can then do:
+Suppose, given that we know there are duplicates, we want to choose which rank gets a higher priority. We define `Count` of ratings to be more important than the actual `Avg_Rating` themselves, and only use it to break a tie. We can then do:

 {{code_block('user-guide/expressions/structs','struct_ranking',['is_duplicated', 'struct'])}}
15 changes: 12 additions & 3 deletions py-polars/polars/dataframe/frame.py
@@ -7408,10 +7408,19 @@ def partition_by(
         ]

         if as_dict:
-            if len(by) == 1:
-                return {df[by][0, 0]: df for df in partitions}
+            df = self._from_pydf(self._df)
+            if include_key:
+                if len(by) == 1:
+                    names = [p[by[0]][0] for p in partitions]
+                else:
+                    names = [p.select(by).row(0) for p in partitions]
             else:
-                return {df[by].row(0): df for df in partitions}
+                if len(by) == 1:
+                    names = df[by[0]].unique(maintain_order=True).to_list()
+                else:
+                    names = df.select(by).unique(maintain_order=True).rows()
+
+            return dict(zip(names, partitions))

         return partitions
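A short sketch of what the reworked `as_dict` path enables: dictionary keys now stay correct even when `include_key=False` removes the key columns from each partition. The frame contents are illustrative:

```python
import polars as pl

df = pl.DataFrame({"g": ["x", "x", "y"], "v": [1, 2, 3]})

# Keys are taken from the source frame in order of first appearance,
# so the mapping holds even though "g" is dropped from each partition.
parts = df.partition_by("g", as_dict=True, include_key=False)
assert list(parts) == ["x", "y"]
assert parts["x"].columns == ["v"]
```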
2 changes: 2 additions & 0 deletions py-polars/polars/functions/as_datatype.py
@@ -185,6 +185,7 @@ def duration(
     minutes: Expr | str | int | None = None,
     hours: Expr | str | int | None = None,
     weeks: Expr | str | int | None = None,
+    time_unit: TimeUnit = "us",
 ) -> Expr:
     """
     Create polars `Duration` from distinct time components.
@@ -294,6 +295,7 @@ def duration(
             minutes,
             hours,
             weeks,
+            time_unit,
         )
     )
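A hedged usage sketch of the new parameter — the same components materialized at different precisions (the aliases and values are illustrative):

```python
import polars as pl

out = pl.select(
    pl.duration(days=1, milliseconds=4, time_unit="ms").alias("ms"),
    pl.duration(days=1, milliseconds=4, time_unit="ns").alias("ns"),
)
# Both columns encode the same span; only the storage precision differs.
print(out.schema)  # expected: {'ms': Duration('ms'), 'ns': Duration('ns')}
```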
3 changes: 3 additions & 0 deletions py-polars/src/functions/lazy.rs
@@ -288,6 +288,7 @@ pub fn dtype_cols(dtypes: Vec<Wrap<DataType>>) -> PyResult<PyExpr> {

 #[allow(clippy::too_many_arguments)]
 #[pyfunction]
+#[pyo3(signature = (days, seconds, nanoseconds, microseconds, milliseconds, minutes, hours, weeks, time_unit))]
 pub fn duration(
     days: Option<PyExpr>,
     seconds: Option<PyExpr>,
@@ -297,6 +298,7 @@ pub fn duration(
     minutes: Option<PyExpr>,
     hours: Option<PyExpr>,
     weeks: Option<PyExpr>,
+    time_unit: Wrap<TimeUnit>,
 ) -> PyExpr {
     set_unwrapped_or_0!(
         days,
@@ -317,6 +319,7 @@ pub fn duration(
         minutes,
         hours,
         weeks,
+        time_unit: time_unit.0,
     };
     dsl::duration(args).into()
 }
13 changes: 13 additions & 0 deletions py-polars/tests/unit/dataframe/test_df.py
@@ -2610,6 +2610,19 @@ def test_partition_by() -> None:
         "b": [1, 3],
     }

+    # test with both as_dict and include_key=False
+    df = pl.DataFrame(
+        {
+            "a": pl.int_range(0, 100, dtype=pl.UInt8, eager=True),
+            "b": pl.int_range(0, 100, dtype=pl.UInt8, eager=True),
+            "c": pl.int_range(0, 100, dtype=pl.UInt8, eager=True),
+            "d": pl.int_range(0, 100, dtype=pl.UInt8, eager=True),
+        }
+    ).sample(n=100_000, with_replacement=True, shuffle=True)
+
+    partitions = df.partition_by(["a", "b"], as_dict=True, include_key=False)
+    assert all(key == value.row(0) for key, value in partitions.items())
+

 def test_list_of_list_of_struct() -> None:
     expected = [{"list_of_list_of_struct": [[{"a": 1}, {"a": 2}]]}]
30 changes: 28 additions & 2 deletions py-polars/tests/unit/functions/test_as_datatype.py
@@ -1,6 +1,6 @@
 from __future__ import annotations

-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from typing import TYPE_CHECKING

 import pytest
@@ -96,10 +96,36 @@ def test_time() -> None:

 def test_empty_duration() -> None:
     s = pl.DataFrame([], {"days": pl.Int32}).select(pl.duration(days="days"))
-    assert s.dtypes == [pl.Duration("ns")]
+    assert s.dtypes == [pl.Duration("us")]
     assert s.shape == (0, 1)


+@pytest.mark.parametrize(
+    ("time_unit", "expected"),
+    [
+        ("ms", timedelta(days=1, minutes=2, seconds=3, milliseconds=4)),
+        ("us", timedelta(days=1, minutes=2, seconds=3, milliseconds=4, microseconds=5)),
+        ("ns", timedelta(days=1, minutes=2, seconds=3, milliseconds=4, microseconds=5)),
+    ],
+)
+def test_duration_time_units(time_unit: TimeUnit, expected: timedelta) -> None:
+    result = pl.LazyFrame().select(
+        pl.duration(
+            days=1,
+            minutes=2,
+            seconds=3,
+            milliseconds=4,
+            microseconds=5,
+            nanoseconds=6,
+            time_unit=time_unit,
+        )
+    )
+    assert result.schema["duration"] == pl.Duration(time_unit)
+    assert result.collect()["duration"].item() == expected
+    if time_unit == "ns":
+        assert result.collect()["duration"].dt.nanoseconds().item() == 86523004005006
+
+
 def test_list_concat() -> None:
     s0 = pl.Series("a", [[1, 2]])
     s1 = pl.Series("b", [[3, 4, 5]])
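As a sanity check on the expected `ns` total asserted above, the components fold to the same figure:

```python
# 1 day + 2 min + 3 s in seconds, scaled to nanoseconds,
# plus the sub-second components.
total_ns = (
    (1 * 86_400 + 2 * 60 + 3) * 1_000_000_000
    + 4 * 1_000_000  # milliseconds
    + 5 * 1_000  # microseconds
    + 6  # nanoseconds
)
assert total_ns == 86_523_004_005_006
```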