Skip to content

Commit

Permalink
Fix FSS.S3 support when bucket is not provided
Browse files Browse the repository at this point in the history
This makes Explorer treat the "endpoint" as already containing the bucket
name, and forces the virtual-hosted style so that Polars ignores the
bucket name.

Since FSS always builds the correct endpoint when the user does not provide
one, we changed the endpoint to be required in the Rust struct.

This work is related to elixir-explorer/fss#2
  • Loading branch information
philss committed Aug 19, 2023
1 parent c7c253a commit ae768c0
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 20 deletions.
12 changes: 8 additions & 4 deletions lib/explorer/fss/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ defimpl Explorer.FSS, for: FSS.S3.Entry do
end

defp url(%FSS.S3.Entry{} = entry) do
config = entry.config

uri = URI.parse(config.endpoint)

uri =
if endpoint = entry.config.endpoint do
URI.parse(endpoint)
if not is_nil(config.bucket) and not String.contains?(config.endpoint, config.bucket) do
append_path(uri, "/" <> config.bucket)
else
URI.parse("https://s3.#{entry.config.region}.amazonaws.com")
uri
end

uri
|> append_path("/" <> entry.bucket <> "/" <> entry.key)
|> append_path("/" <> entry.key)
|> URI.to_string()
end

Expand Down
4 changes: 3 additions & 1 deletion lib/explorer/polars_backend/shared.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,10 @@ defmodule Explorer.PolarsBackend.Shared do
the `System.tmp_dir()`.
"""
def build_path_for_entry(%FSS.S3.Entry{} = entry) do
bucket = entry.config.bucket || "default-explorer-bucket"

hash =
:crypto.hash(:sha256, entry.bucket <> "/" <> entry.key) |> Base.url_encode64(padding: false)
:crypto.hash(:sha256, bucket <> "/" <> entry.key) |> Base.url_encode64(padding: false)

id = "s3-file-#{hash}"

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule Explorer.MixProject do
[
{:aws_signature, "~> 0.3"},
{:castore, "~> 1.0"},
{:fss, github: "elixir-explorer/fss"},
{:fss, github: "elixir-explorer/fss", branch: "ps-add-parse-bucket-url"},
{:rustler_precompiled, "~> 0.6"},
{:table, "~> 0.1.2"},
{:adbc, "~> 0.1", optional: true},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.33", "3c3fd9673bb5dcc9edc28dd90f50c87ce506d1f71b70e3de69aa8154bc695d44", [:mix], [], "hexpm", "2d526833729b59b9fdb85785078697c72ac5e5066350663e5be6a1182da61b8f"},
"elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"},
"ex_doc": {:hex, :ex_doc, "0.30.3", "bfca4d340e3b95f2eb26e72e4890da83e2b3a5c5b0e52607333bf5017284b063", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "fbc8702046c1d25edf79de376297e608ac78cdc3a29f075484773ad1718918b6"},
"fss": {:git, "https://github.com/elixir-explorer/fss.git", "417a1b6716c89d6a59cae01340ed0aff7e5b1164", []},
"fss": {:git, "https://github.com/elixir-explorer/fss.git", "9154e1d66a34057d228e88bd4483f4e527ce4919", [branch: "ps-add-parse-bucket-url"]},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
Expand Down
27 changes: 20 additions & 7 deletions native/explorer/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,26 @@ fn build_aws_s3_cloud_writer(
) -> Result<crate::cloud_writer::CloudWriter, ExplorerError> {
let config = ex_entry.config;
let mut aws_builder = object_store::aws::AmazonS3Builder::new()
.with_region(config.region)
.with_bucket_name(ex_entry.bucket)
.with_access_key_id(config.access_key_id)
.with_secret_access_key(config.secret_access_key);

if let Some(endpoint) = config.endpoint {
aws_builder = aws_builder.with_allow_http(true).with_endpoint(endpoint);
.with_region(&config.region)
.with_access_key_id(&config.access_key_id)
.with_secret_access_key(&config.secret_access_key)
.with_allow_http(true)
.with_endpoint(&config.endpoint);

if let Some(bucket_name) = &config.bucket {
aws_builder = aws_builder.with_bucket_name(bucket_name);

// If the bucket is already in the endpoint, we force the "virtual-hosted"
// style in order to make Polars ignore the bucket name.
if config.endpoint.contains(bucket_name) {
aws_builder = aws_builder.with_virtual_hosted_style_request(true);
}
} else {
// This bucket name is going to be ignored because it's assumed to be
// already in the endpoint URL.
aws_builder = aws_builder
.with_bucket_name("explorer-default-bucket-name")
.with_virtual_hosted_style_request(true);
}

if let Some(token) = config.token {
Expand Down
27 changes: 21 additions & 6 deletions native/explorer/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,21 +421,25 @@ pub struct ExS3Config {
pub access_key_id: String,
pub secret_access_key: String,
pub region: String,
pub endpoint: Option<String>,
pub endpoint: String,
pub bucket: Option<String>,
pub token: Option<String>,
}

#[derive(NifStruct, Clone, Debug)]
#[module = "FSS.S3.Entry"]
pub struct ExS3Entry {
pub bucket: String,
pub key: String,
pub config: ExS3Config,
}

impl fmt::Display for ExS3Entry {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "s3://{}/{}", self.bucket, self.key)
if let Some(bucket_name) = &self.config.bucket {
write!(f, "s3://{}/{}", bucket_name, self.key)
} else {
write!(f, "s3://default-explorer-bucket/{}", self.key)
}
}
}

Expand All @@ -450,13 +454,24 @@ impl ExS3Config {
(S3Key::from_str("aws_allow_http").unwrap(), &true_as_string),
];

if let Some(endpoint) = &self.endpoint {
aws_opts.push((S3Key::Endpoint, endpoint))
}
aws_opts.push((S3Key::Endpoint, &self.endpoint));

if let Some(token) = &self.token {
aws_opts.push((S3Key::Token, token))
}

// When the bucket is not present, we need to force the virtual-hosted style
// in order to ignore the bucket name.
// We also force the virtual-hosted style for AWS S3 endpoints, because in
// that case the bucket is already part of the endpoint.
if self.bucket.is_none() || self.is_aws_endpoint() {
aws_opts.push((S3Key::VirtualHostedStyleRequest, &true_as_string));
}

CloudOptions::default().with_aws(aws_opts)
}

/// Returns `true` when the configured endpoint string contains
/// "amazonaws.com" anywhere in it, and `false` otherwise.
///
/// This is a plain substring match on `self.endpoint`; the endpoint is not
/// parsed as a URL.
pub fn is_aws_endpoint(&self) -> bool {
    const AWS_DOMAIN: &str = "amazonaws.com";

    self.endpoint.contains(AWS_DOMAIN)
}
}
21 changes: 21 additions & 0 deletions test/explorer/data_frame/csv_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,27 @@ defmodule Explorer.DataFrame.CSVTest do

assert {:error, "no such file or directory"} = DF.from_csv(path, config: s3_config)
end

# Round-trips a data frame through an S3-compatible endpoint: writes it as CSV
# with `DF.to_csv/2`, reads it back with `DF.from_csv!/1`, and compares the
# columns with the wine dataset.
@tag :cloud_integration
test "writes a CSV file to endpoint ignoring bucket name", %{df: df} do
  # The bucket is explicitly nil; the endpoint URL already ends in
  # "/test-bucket", so no separate bucket name is supplied.
  entry = %FSS.S3.Entry{
    key: "wine-yolo-#{System.monotonic_time()}.csv",
    config: %FSS.S3.Config{
      access_key_id: "test",
      secret_access_key: "test",
      endpoint: "http://localhost:4566/test-bucket",
      bucket: nil,
      region: "us-east-1"
    }
  }

  assert :ok = DF.to_csv(df, entry)

  round_tripped = DF.from_csv!(entry)
  assert DF.to_columns(round_tripped) == DF.to_columns(Explorer.Datasets.wine())
end
end

describe "from_csv/2 - HTTP" do
Expand Down

0 comments on commit ae768c0

Please sign in to comment.