Bump Polars to v0.41.3 #917

Merged on Jul 24, 2024 (32 commits)

Commits
ae29cb6
WIP: making things run, but break some tests
philss May 27, 2024
fcd8678
fix compilation
lkarthee Jun 1, 2024
c702a2b
Merge remote-tracking branch 'upstream/main' into ps-bump-polars-to-v…
lkarthee Jun 24, 2024
9d415dc
bump polars to 0.41
lkarthee Jun 24, 2024
c0b9763
Merge remote-tracking branch 'upstream/main' into ps-bump-polars-to-v…
lkarthee Jul 14, 2024
66ee582
Working version 0.41.3
lkarthee Jul 14, 2024
e79c0f6
don't pass keys when cross-joining
billylanchantin Jul 14, 2024
6f463ff
Run "cargo update"
philss Jul 15, 2024
a7759b7
Changes that fix some tests
philss Jul 16, 2024
0edc5d8
Fix args for pivot_longer
philss Jul 17, 2024
bc234bf
Fix order of arguments for pivot_stable
philss Jul 17, 2024
c80e6dd
Changes in pivot_wider with repeated columns
philss Jul 18, 2024
048d46a
Explicitly use literal expressions with their types
philss Jul 18, 2024
bc4e2a3
Cast to float 64 series for window functions with weights
philss Jul 18, 2024
4274c88
Use aggregate function for polars query
philss Jul 18, 2024
a6d923b
Improve check of weights
philss Jul 18, 2024
2644d27
Attempt to fix "pow/2" after Polars changes
philss Jul 18, 2024
54991c0
Revert "Attempt to fix "pow/2" after Polars changes"
philss Jul 19, 2024
cb3886c
Adapt pow/2 to follow the same rules from Polars
philss Jul 19, 2024
7c2baeb
Capture errors when inspecting DFs at PolarsBackend
philss Jul 19, 2024
878b422
Cleaning comments and add missing option
philss Jul 19, 2024
0b1f39e
Rewrite CloudWriter to use BufWriter from object_store
philss Jul 20, 2024
15f8521
Update object_store to v0.10.2
philss Jul 22, 2024
be9a236
WIP: enable the "to_parquet/2" test case with new object_store
philss Jul 22, 2024
142155b
Revert "WIP: enable the "to_parquet/2" test case with new object_store"
philss Jul 22, 2024
739bc82
Ensure the CloudWriter finishes its job after writers
philss Jul 22, 2024
d64af7e
WIP: saving PoC that didn't work
philss Jul 23, 2024
c8aabfe
Revert "WIP: saving PoC that didn't work"
philss Jul 23, 2024
bb04f29
Raise for when saving lazy df as parquet to cloud using streaming
philss Jul 23, 2024
d7b6010
Add changes to the changelog
philss Jul 24, 2024
64370e2
Last round of "cargo update"
philss Jul 24, 2024
1ce99b9
Update CHANGELOG.md
philss Jul 24, 2024
CHANGELOG.md: 22 additions & 0 deletions
@@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)

## [Unreleased]

### Changed

- `Explorer.Series.pow/2` no longer casts to float when the exponent is a signed
integer. This follows current Polars behaviour: the operation is attempted as is,
and an exception is raised if the exponent is negative.
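  A sketch of the new behaviour (the inspect output is abbreviated, but the
  dtype and values follow from the rule above):

      iex> s = Explorer.Series.from_list([1, 2, 3])
      iex> Explorer.Series.pow(s, 2)
      #Explorer.Series<
        Polars[3]
        s64 [1, 4, 9]
      >

  With a negative exponent such as `Explorer.Series.pow(s, -1)`, an exception
  is raised instead of casting to float.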

- `Explorer.Series.pivot_wider/4` no longer includes the `names_from` column
name in the new columns when `values_from` is a list of columns. This is more
consistent with its behaviour when `values_from` is a single column.
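  For example, with `values_from: ["property_value", "another_value"]` and a
  `names_from` column containing `width_cm`, the new columns are now named like
  `property_value_width_cm` instead of `property_value_property_width_cm` (see
  the updated `pivot_wider/4` doctest in `lib/explorer/data_frame.ex` below).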

- `Explorer.Series.substring/3` no longer cycles to the end of the string if the
negative offset surpasses the beginning of that string. In that case, an empty
string is returned.
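  A minimal sketch (inspect output abbreviated): an offset of -10 reaches past
  the start of the 5-character string, so the result is now empty:

      iex> s = Explorer.Series.from_list(["hello"])
      iex> Explorer.Series.substring(s, -10, 3)
      #Explorer.Series<
        Polars[1]
        string [""]
      >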

- The `Explorer.Series.ewm_*` functions no longer replace `nil` values with the
value at the previous index. They now propagate `nil` values through to the
result series.
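  An illustrative sketch; the exact smoothed values depend on the `ewm_mean`
  options, so they are elided here:

      iex> s = Explorer.Series.from_list([1.0, nil, 3.0])
      iex> Explorer.Series.ewm_mean(s)
      #Explorer.Series<
        Polars[3]
        f64 [1.0, nil, ...]
      >

  Previously, the second element would have held `1.0`, carried over from the
  previous index.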

- Saving a dataframe as a Parquet file to S3 services no longer works when
streaming is enabled. This is a temporary limitation due to a bug in Polars,
and an exception is raised instead.
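  The failure mode looks roughly like this (the bucket path is a placeholder;
  the message comes from the new raise in `lazy_frame.ex` below):

      iex> ldf = Explorer.DataFrame.lazy(df)
      iex> Explorer.DataFrame.to_parquet(ldf, "s3://my-bucket/data.parquet", streaming: true)
      ** (RuntimeError) streaming of a lazy frame to the cloud using parquet is currently unavailable. Please try again disabling the `:streaming` option.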

## [v0.8.3] - 2024-06-10

### Added
lib/explorer/data_frame.ex: 11 additions & 12 deletions
@@ -2778,7 +2778,7 @@ defmodule Explorer.DataFrame do
#Explorer.DataFrame<
Polars[3 x 2]
a string ["a", "b", "c"]
- b f64 [1.0, 4.0, 9.0]
+ b s64 [1, 4, 9]
>

It's possible to "reuse" a variable for different computations:
@@ -4745,8 +4745,7 @@ defmodule Explorer.DataFrame do

Multiple columns are accepted for the `values_from` parameter, but the behaviour is slightly
different for the naming of new columns in the resultant dataframe. The new columns are going
- to be prefixed by the name of the original value column, followed by an underscore and the
- original column name, followed by the name of the variable.
+ to be prefixed by the name of the original value column, followed by the name of the variable.

iex> df = Explorer.DataFrame.new(
iex> product_id: [1, 1, 1, 1, 2, 2, 2, 2],
@@ -4758,14 +4757,14 @@
#Explorer.DataFrame<
Polars[2 x 9]
product_id s64 [1, 2]
- property_value_property_product_id s64 [1, 2]
- property_value_property_width_cm s64 [42, 35]
- property_value_property_height_cm s64 [40, 20]
- property_value_property_length_cm s64 [64, 40]
- another_value_property_product_id s64 [1, 2]
- another_value_property_width_cm s64 [43, 36]
- another_value_property_height_cm s64 [41, 21]
- another_value_property_length_cm s64 [65, 42]
+ property_value_product_id s64 [1, 2]
+ property_value_width_cm s64 [42, 35]
+ property_value_height_cm s64 [40, 20]
+ property_value_length_cm s64 [64, 40]
+ another_value_product_id s64 [1, 2]
+ another_value_width_cm s64 [43, 36]
+ another_value_height_cm s64 [41, 21]
+ another_value_length_cm s64 [65, 42]
>

## Grouped examples
@@ -6063,7 +6062,7 @@ defmodule Explorer.DataFrame do
Basic example:

iex> df = Explorer.DataFrame.new(a: [1, 2, 3], b: ["x", "y", "y"])
iex> Explorer.DataFrame.sql(df, "select a, b from df group by b order by b")
iex> Explorer.DataFrame.sql(df, "select ARRAY_AGG(a), b from df group by b order by b")
#Explorer.DataFrame<
Polars[2 x 2]
a list[s64] [[1], [2, 3]]
b string ["x", "y"]
>
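(Polars' SQL engine now requires columns outside the `GROUP BY` key to appear
inside an aggregate function such as `ARRAY_AGG`, matching standard SQL
semantics; hence the doctest update.)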
lib/explorer/polars_backend/data_frame.ex: 22 additions & 16 deletions
@@ -32,7 +32,7 @@ defmodule Explorer.PolarsBackend.DataFrame do

with {:ok, df_result} <- adbc_result,
{:ok, df} <- df_result,
- do: {:ok, Shared.create_dataframe(df)}
+ do: Shared.create_dataframe(df)
end
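(This pattern repeats throughout the backend: judging by these call sites,
`Shared.create_dataframe/1` now returns `{:ok, df}` or `{:error, ...}` itself,
so the extra `{:ok, ...}` wrapping is dropped, and a raising
`Shared.create_dataframe!/1` variant is used where a bare dataframe is
expected.)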

@impl true
@@ -122,7 +122,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
)

case df do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -195,6 +195,11 @@ defmodule Explorer.PolarsBackend.DataFrame do

{columns, with_projection} = column_names_or_projection(columns)

+ dtypes_list =
+   if not Enum.empty?(dtypes) do
+     Map.to_list(dtypes)
+   end

df =
Native.df_load_csv(
contents,
@@ -207,15 +212,15 @@
delimiter,
true,
columns,
- Map.to_list(dtypes),
+ dtypes_list,
encoding,
nil_values,
parse_dates,
char_byte(eol_delimiter)
)

case df do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
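(The new `dtypes_list` binding passes `nil` to the NIF when no dtypes are
given, since an `if` without an `else` returns `nil`, rather than always
passing a list.)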
@@ -242,7 +247,7 @@
@impl true
def from_ndjson(%Local.Entry{} = entry, infer_schema_length, batch_size) do
case Native.df_from_ndjson(entry.path, infer_schema_length, batch_size) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -274,7 +279,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def load_ndjson(contents, infer_schema_length, batch_size) when is_binary(contents) do
case Native.df_load_ndjson(contents, infer_schema_length, batch_size) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -284,7 +289,7 @@
# We first read using a lazy dataframe, then we collect.
with {:ok, ldf} <- Native.lf_from_parquet_cloud(entry, max_rows, columns),
{:ok, df} <- Native.lf_compute(ldf) do
- {:ok, Shared.create_dataframe(df)}
+ Shared.create_dataframe(df)
end
end

@@ -316,7 +321,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
)

case df do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -370,7 +375,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def load_parquet(contents) when is_binary(contents) do
case Native.df_load_parquet(contents) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -394,7 +399,7 @@
{columns, projection} = column_names_or_projection(columns)

case Native.df_from_ipc(entry.path, columns, projection) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -428,7 +433,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_load_ipc(contents, columns, projection) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -452,7 +457,7 @@
{columns, projection} = column_names_or_projection(columns)

case Native.df_from_ipc_stream(entry.path, columns, projection) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -486,7 +491,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
{columns, projection} = column_names_or_projection(columns)

case Native.df_load_ipc_stream(contents, columns, projection) do
- {:ok, df} -> {:ok, Shared.create_dataframe(df)}
+ {:ok, df} -> Shared.create_dataframe(df)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -556,7 +561,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
list = Enum.map(list, & &1.data)

Shared.apply(:df_from_series, [list])
- |> Shared.create_dataframe()
+ |> Shared.create_dataframe!()
end

defp to_column_name!(column_name) when is_binary(column_name), do: column_name
@@ -669,7 +674,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
@impl true
def nil_count(%DataFrame{} = df) do
Shared.apply(:df_nil_count, [df.data])
|> Shared.create_dataframe()
|> Shared.create_dataframe!()
end

@impl true
@@ -707,6 +712,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
expressions,
directions,
maintain_order?,
+ multithreaded?,
nulls_last?,
df.groups
])
@@ -811,7 +817,7 @@ defmodule Explorer.PolarsBackend.DataFrame do
values_from,
names_prefix_optional
])
- |> Shared.create_dataframe()
+ |> Shared.create_dataframe!()
end

@impl true
lib/explorer/polars_backend/lazy_frame.ex: 7 additions & 14 deletions
@@ -170,7 +170,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
)

case result do
- {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
+ {:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -204,15 +204,15 @@ defmodule Explorer.PolarsBackend.LazyFrame do
@impl true
def from_parquet(%S3.Entry{} = entry, max_rows, columns, _rechunk) do
case Native.lf_from_parquet_cloud(entry, max_rows, columns) do
- {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
+ {:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

@impl true
def from_parquet(%Local.Entry{} = entry, max_rows, columns, _rechunk) do
case Native.lf_from_parquet(entry.path, max_rows, columns) do
- {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
+ {:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -226,7 +226,7 @@
@impl true
def from_ndjson(%Local.Entry{} = entry, infer_schema_length, batch_size) do
case Native.lf_from_ndjson(entry.path, infer_schema_length, batch_size) do
- {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
+ {:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -240,7 +240,7 @@
@impl true
def from_ipc(%Local.Entry{} = entry, columns) when is_nil(columns) do
case Native.lf_from_ipc(entry.path) do
- {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)}
+ {:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end
@@ -364,15 +364,8 @@ defmodule Explorer.PolarsBackend.LazyFrame do
end

@impl true
- def to_parquet(%DF{} = ldf, %S3.Entry{} = entry, {compression, level}, _streaming = true) do
-   case Native.lf_to_parquet_cloud(
-          ldf.data,
-          entry,
-          Shared.parquet_compression(compression, level)
-        ) do
-     {:ok, _} -> :ok
-     {:error, error} -> {:error, RuntimeError.exception(error)}
-   end
+ def to_parquet(%DF{} = _ldf, %S3.Entry{} = _entry, {_compression, _level}, _streaming = true) do
+   raise "streaming of a lazy frame to the cloud using parquet is currently unavailable. Please try again disabling the `:streaming` option."
end
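  A caller hitting this raise can fall back to the non-streaming path, e.g.
  (the bucket path is a placeholder):

      Explorer.DataFrame.to_parquet(ldf, "s3://my-bucket/data.parquet", streaming: false)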

@impl true
Expand Down
lib/explorer/polars_backend/native.ex: 1 addition & 0 deletions
@@ -67,6 +67,7 @@ defmodule Explorer.PolarsBackend.Native do
_expressions,
_directions,
_maintain_order?,
+ _multithreaded?,
_nulls_last?,
_groups
),
lib/explorer/polars_backend/series.ex: 10 additions & 3 deletions
@@ -518,7 +518,7 @@ defmodule Explorer.PolarsBackend.Series do

def frequencies(%Series{} = series) do
Shared.apply(:s_frequencies, [series.data])
- |> Shared.create_dataframe()
+ |> Shared.create_dataframe!()
|> DataFrame.rename(["values", "counts"])
end

@@ -534,7 +534,7 @@
category_label
) do
{:ok, polars_df} ->
- Shared.create_dataframe(polars_df)
+ Shared.create_dataframe!(polars_df)

{:error, "Polars Error: lengths don't match: " <> _rest} ->
raise ArgumentError, "lengths don't match: labels count must equal bins count"
@@ -553,7 +553,7 @@
break_point_label,
category_label
])
- |> Shared.create_dataframe()
+ |> Shared.create_dataframe!()
end

# Window
Expand Down Expand Up @@ -596,6 +596,13 @@ defmodule Explorer.PolarsBackend.Series do
end

defp window_function(operation, series, window_size, weights, min_periods, center) do
+ series =
+   if List.wrap(weights) == [] do
+     series
+   else
+     cast(series, {:f, 64})
+   end

Shared.apply_series(series, operation, [window_size, weights, min_periods, center])
end
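  At the user level, weighted window aggregations now always produce `f64`
  results, even over integer series. A sketch (the computed values are elided):

      iex> s = Explorer.Series.from_list([1, 2, 3, 4])
      iex> Explorer.Series.window_mean(s, 2, weights: [0.25, 0.75])
      #Explorer.Series<
        Polars[4]
        f64 [...]
      >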
