Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read parquet files from AWS S3 #652

Merged
merged 30 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
df6791f
Add token attr to FSS.S3.Config
philss Jul 13, 2023
385dba5
Add necessary deps for using S3 in our crate project
philss Jul 13, 2023
a609e4e
Add FSS.S3.Entry.parse/2 to parse URLs
philss Jul 17, 2023
95bdfb8
WIP: implement integration between the Elixir and Rust code
philss Jul 18, 2023
a509f18
Apply suggestions from code review
philss Jul 18, 2023
982b450
Remove support for S3 URLs that are http(s)
philss Jul 18, 2023
382c4ea
Handle config with better defaults and errors
philss Jul 19, 2023
184a71a
Change endpoint to be optional
philss Jul 19, 2023
27c0cdf
Use streaming and disable CSE to make polars fetch from S3
philss Jul 19, 2023
21ce7b1
Fix test case
philss Jul 19, 2023
57702d6
Fix test case
philss Jul 19, 2023
d4314f9
Remove bypass and simplify wip test
philss Jul 19, 2023
b8b157f
Create a ExUnit tag for cloud integration
philss Jul 19, 2023
d689eef
Apply suggestion from PR review
philss Jul 20, 2023
3e81696
Use "bucket" instead of "host"
philss Jul 20, 2023
3d8eb4e
Rebuild Cargo.lock
philss Jul 20, 2023
049a3a8
Add integration test for "from_parquet/2" lazy from S3
philss Jul 20, 2023
c09f690
Try to run directly from the script
philss Jul 20, 2023
b64cb86
Wait before trying to access localstack
philss Jul 20, 2023
b747c91
Add region as suggested in https://florian.ec/blog/github-actions-aws…
philss Jul 20, 2023
9df08b2
Add the ec2 mock container
philss Jul 20, 2023
5a393b0
Using export
philss Jul 20, 2023
299af28
Improve usage of script
philss Jul 20, 2023
da33589
Implement "from_parquet/2" reading from s3 for eager DF
philss Jul 20, 2023
1a3ca29
Select columns in "from_parquet"
philss Jul 20, 2023
a16e958
Minor docs improvement
philss Jul 20, 2023
7ae158f
Add one more test case
philss Jul 20, 2023
a3969cf
Apply suggestions from code review [ci skip]
philss Jul 21, 2023
f8830eb
Improve errors on FSS
philss Jul 21, 2023
f1b3711
Add instructions to the README
philss Jul 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ jobs:
- name: Compile once again but without optional deps
run: mix compile --force --warnings-as-errors --no-optional-deps

- name: Run cloud integration tests
run: |
mix localstack.setup
mix test --only cloud_integration

format:
runs-on: ubuntu-latest
name: mix format
Expand Down
24 changes: 12 additions & 12 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,18 @@ defmodule Explorer.DataFrame do
# path to a file in disk
Explorer.DataFrame.from_parquet("/path/to/file.parquet")

# path to a URL schema (with optional configuration)
Explorer.DataFrame.from_parquet("s3://bucket/file.parquet", config: FSS.S3.Entry.config_from_system_env())

# it's possible to configure using keyword lists
Explorer.DataFrame.from_parquet("s3://bucket/file.parquet", config: [access_key_id: "my-key", secret_access_key: "my-secret"])

# a FSS entry with its config inside
Explorer.DataFrame.from_parquet(%FSS.S3.Entry{config: %FSS.S3.Config{}})

The `:config` option of `from_*` functions is only required if the filename is a path
to a remote resource. If an FSS entry is given instead, its config must already be set
inside the entry struct.

## Selecting columns and access

Expand Down Expand Up @@ -719,8 +726,7 @@ defmodule Explorer.DataFrame do
# An entry that already carries its %S3.Config{} is used as-is; the :config
# option must be nil in that case (enforced by the pattern match).
defp normalise_entry(%S3.Entry{config: %S3.Config{}} = entry, nil), do: {:ok, entry}

# Parses an "s3://" URL into an S3 entry, delegating parsing and config
# validation to FSS. Returns {:ok, %S3.Entry{}} or {:error, exception}.
defp normalise_entry("s3://" <> _rest = entry, config) do
  S3.Entry.parse(entry, config: config)
end

defp normalise_entry("file://" <> path, _config), do: {:ok, %Local.Entry{path: path}}
Expand All @@ -729,12 +735,6 @@ defmodule Explorer.DataFrame do
{:ok, %Local.Entry{path: filepath}}
end

# Accepts only a ready-made %S3.Config{}; any other value is a caller error.
defp s3_config(config) do
  case config do
    %S3.Config{} = valid ->
      valid

    other ->
      raise ArgumentError, "expected a valid FSS.S3.config/1 in :config, got: #{inspect(other)}"
  end
end

@doc """
Similar to `from_parquet/2` but raises if there is a problem reading the Parquet file.
"""
Expand Down
8 changes: 6 additions & 2 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ defmodule Explorer.PolarsBackend.DataFrame do
end

@impl true
def from_parquet(%S3.Entry{}, _max_rows, _columns) do
raise "S3 is not supported yet"
# Eagerly reads a parquet file from S3: first build the lazy scan via the
# native layer, then collect it into an in-memory dataframe. Any non-ok
# result from either native call is returned unchanged.
def from_parquet(%S3.Entry{} = entry, max_rows, columns) do
  case Native.lf_from_parquet_cloud(entry, max_rows, columns) do
    {:ok, lazy_frame} ->
      case Native.lf_collect(lazy_frame) do
        {:ok, collected} -> {:ok, Shared.create_dataframe(collected)}
        error -> error
      end

    error ->
      error
  end
end

@impl true
Expand Down
15 changes: 6 additions & 9 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,16 @@ defmodule Explorer.PolarsBackend.LazyFrame do
defp char_byte(<<char::utf8>>), do: char

@impl true
def from_parquet(%S3.Entry{}, _max_rows, _columns) do
raise "S3 is not supported yet"
# Builds a lazy frame backed by a parquet file on S3. The heavy lifting
# happens in the native (Rust) layer via `lf_from_parquet_cloud`; this
# clause only wraps the result in the backend dataframe struct.
# NOTE(review): assumes `max_rows` caps rows read and `columns` restricts
# the projection — semantics live in the NIF; confirm in native code.
def from_parquet(%S3.Entry{} = entry, max_rows, columns) do
case Native.lf_from_parquet_cloud(entry, max_rows, columns) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:error, error} -> {:error, error}
end
end

@impl true
def from_parquet(%Local.Entry{} = entry, max_rows, columns) do
if columns do
raise ArgumentError,
"`columns` is not supported by Polars' lazy backend. " <>
"Consider using `select/2` after reading the parquet file"
end

case Native.lf_from_parquet(entry.path, max_rows) do
case Native.lf_from_parquet(entry.path, max_rows, columns) do
{:ok, df} -> {:ok, Shared.create_dataframe(df)}
{:error, error} -> {:error, error}
end
Expand Down
3 changes: 2 additions & 1 deletion lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ defmodule Explorer.PolarsBackend.Native do
# NIF stubs: each clause returns err() unless replaced by the native
# implementation at load time — presumably via Rustler; TODO confirm.
# NOTE(review): both lf_from_parquet/2 and /3 appear here — the /2 stub
# looks like diff residue; /3 (with _maybe_columns) is the current arity.
def lf_slice(_df, _offset, _length), do: err()
def lf_from_ipc(_filename), do: err()
def lf_from_ndjson(_filename, _infer_schema_length, _batch_size), do: err()
def lf_from_parquet(_filename, _stop_after_n_rows), do: err()
def lf_from_parquet(_filename, _stop_after_n_rows, _maybe_columns), do: err()
def lf_from_parquet_cloud(_ex_s3_entry, _stop_after_n_rows, _maybe_columns), do: err()

def lf_from_csv(
_filename,
Expand Down
82 changes: 77 additions & 5 deletions lib/fss.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,91 @@ defmodule FSS do

defmodule S3 do
  @moduledoc """
  Filesystem specification for entries stored on AWS S3 (or S3-compatible services).
  """

  defmodule Config do
    @moduledoc """
    Credentials and connection details used to access the S3 API.

    `:endpoint` and `:token` are optional; the remaining fields are
    required by `FSS.S3.Entry.parse/2`.
    """

    defstruct [
      :access_key_id,
      :region,
      :secret_access_key,
      :endpoint,
      :token
    ]

    @type t :: %__MODULE__{
            access_key_id: String.t(),
            region: String.t(),
            secret_access_key: String.t(),
            endpoint: String.t() | nil,
            token: String.t() | nil
          }
  end

  defmodule Entry do
    @moduledoc """
    A single S3 object, identified by bucket and key, together with the
    `Config` needed to reach it.
    """

    defstruct [:bucket, :key, :port, :config]

    @type t :: %__MODULE__{
            bucket: String.t(),
            key: String.t(),
            port: pos_integer(),
            config: Config.t()
          }

    @doc """
    Builds a `Config` from the conventional AWS environment variables.

    Reads `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` and
    `AWS_SESSION_TOKEN`, plus `AWS_REGION` (falling back to
    `AWS_DEFAULT_REGION` when `AWS_REGION` is unset).
    """
    def config_from_system_env() do
      %FSS.S3.Config{
        access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
        secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
        region: System.get_env("AWS_REGION", System.get_env("AWS_DEFAULT_REGION")),
        token: System.get_env("AWS_SESSION_TOKEN")
      }
    end

    @doc """
    Parses an `s3://<bucket>/<key>` URL into `{:ok, %Entry{}}`.

    The `:config` option accepts a `Config` struct, a keyword list or a
    map; when omitted (or `nil`), configuration is read from the system
    environment. Returns `{:error, %ArgumentError{}}` for URLs that do
    not match the expected scheme; raises if the config is malformed or
    missing required keys.
    """
    def parse(url, opts \\ []) do
      opts = Keyword.validate!(opts, config: nil)

      uri = URI.parse(url)

      case uri do
        %{scheme: "s3", host: bucket, path: "/" <> key} when is_binary(bucket) ->
          config =
            opts
            |> Keyword.fetch!(:config)
            |> normalize_config()
            |> validate_config!()

          {:ok, %__MODULE__{bucket: bucket, key: key, port: uri.port, config: config}}

        _ ->
          {:error,
           ArgumentError.exception(
             "expected s3://<bucket>/<key> URL, got: " <>
               URI.to_string(uri)
           )}
      end
    end

    # Normalizes the :config option into a %Config{} (not yet validated).
    # Keyword lists and maps are merged on top of the env-derived defaults.
    defp normalize_config(nil), do: config_from_system_env()
    defp normalize_config(%Config{} = config), do: config

    defp normalize_config(config) when is_list(config) or is_map(config) do
      struct!(config_from_system_env(), config)
    end

    defp normalize_config(other) do
      raise ArgumentError,
            "expect configuration to be a %FSS.S3.Config{} struct, a keyword list or a map. Instead got #{inspect(other)}"
    end

    # The S3 API cannot be reached without keys and a region: reject nil
    # or empty-string values for any of the three required fields.
    defp validate_config!(%Config{} = config) do
      required = [config.access_key_id, config.secret_access_key, config.region]

      if Enum.any?(required, fn value -> is_nil(value) or value == "" end) do
        raise "missing configuration keys or region to access the S3 API"
      end

      config
    end
  end
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ defmodule Explorer.MixProject do
package: package(),
deps: deps(),
docs: docs(),
preferred_cli_env: [ci: :test],
preferred_cli_env: [ci: :test, "localstack.setup": :test],
aliases: [
"rust.lint": ["cmd cargo clippy --manifest-path=native/explorer/Cargo.toml -- -Dwarnings"],
"rust.fmt": ["cmd cargo fmt --manifest-path=native/explorer/Cargo.toml --all"],
"localstack.setup": ["cmd ./test/support/setup-localstack.sh"],
ci: ["format", "rust.fmt", "rust.lint", "test"]
]
]
Expand Down
Loading