Skip to content

Commit

Permalink
feat: streaming for s3select
Browse files Browse the repository at this point in the history
temp changes.
ex-aws/ex_aws_s3#23
  • Loading branch information
avinayak committed Nov 13, 2023
1 parent 476e3f0 commit 3217eaf
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 20 deletions.
15 changes: 14 additions & 1 deletion lib/ex_aws.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,20 @@ defmodule ExAws do
@impl ExAws.Behaviour
@spec stream!(ExAws.Operation.t(), keyword) :: Enumerable.t()
def stream!(op, config_overrides \\ []) do
ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides))
case ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) do
{:ok, result} ->
result

%Stream{} = result ->
result

error ->
raise ExAws.Error, """
ExAws Stream Request Error!
#{inspect(error)}
"""
end
end

@doc false
Expand Down
4 changes: 2 additions & 2 deletions lib/ex_aws/behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule ExAws.Behaviour do
@callback request!(ExAws.Operation.t(), Keyword.t()) :: term | no_return

@doc "See `ExAws.stream!/2`."
@callback stream!(ExAws.Operation.t()) :: Enumerable.t()
@callback stream!(ExAws.Operation.t()) :: Enumerable.t() | {:ok, term} | {:error, term}

@doc "See `ExAws.stream!/2`."
@callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t()
@callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() | {:ok, term} | {:error, term}
end
26 changes: 21 additions & 5 deletions lib/ex_aws/operation/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ defmodule ExAws.Operation.S3 do

defimpl ExAws.Operation do
def perform(operation, config) do
{operation, config, url, body, headers, http_method} =
build_request_params(operation, config)

ExAws.Request.request(http_method, url, body, headers, config, operation.service)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
end

def stream!(%{stream_builder: :octet_stream} = operation, config) do
{operation, config, url, body, headers, http_method} =
build_request_params(operation, config)

ExAws.Request.request(http_method, url, body, headers, config, operation.service, true)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
end

def stream!(%{stream_builder: fun}, config), do: fun.(config)

def build_request_params(operation, config) do
body = operation.body
headers = operation.headers
http_method = operation.http_method
Expand All @@ -37,13 +57,9 @@ defmodule ExAws.Operation.S3 do
|> put_content_length_header(body, http_method)
|> Map.to_list()

ExAws.Request.request(http_method, url, body, headers, config, operation.service)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
{operation, config, url, body, headers, http_method}
end

def stream!(%{stream_builder: fun}, config), do: fun.(config)

defp put_content_length_header(headers, "", :get), do: headers

defp put_content_length_header(headers, body, _) do
Expand Down
44 changes: 35 additions & 9 deletions lib/ex_aws/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,39 @@ defmodule ExAws.Request do
@type error_t :: {:error, {:http_error, http_status, binary}}
@type response_t :: success_t | error_t

def request(http_method, url, data, headers, config, service) do
def request(http_method, url, data, headers, config, service, stream \\ false) do
body =
case data do
[] -> "{}"
d when is_binary(d) -> d
_ -> config[:json_codec].encode!(data)
end

request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1})
request_and_retry(http_method, url, service, config, headers, body, stream, {:attempt, 1})
end

def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}),
do: {:error, reason}

def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do
def request_and_retry(
_method,
_url,
_service,
_config,
_headers,
_req_body,
_stream,
{:error, reason}
),
do: {:error, reason}

def request_and_retry(
method,
url,
service,
config,
headers,
req_body,
stream,
{:attempt, attempt}
) do
full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body)

with {:ok, full_headers} <- full_headers do
Expand All @@ -35,7 +53,7 @@ defmodule ExAws.Request do
)
end

case do_request(config, method, safe_url, req_body, full_headers, attempt, service) do
case do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do
{:ok, %{status_code: status} = resp} when status in 200..299 or status == 304 ->
{:ok, resp}

Expand All @@ -53,6 +71,7 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)

Expand All @@ -71,6 +90,7 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)

Expand All @@ -86,13 +106,14 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)
end
end
end

defp do_request(config, method, safe_url, req_body, full_headers, attempt, service) do
defp do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do
telemetry_event = Map.get(config, :telemetry_event, [:ex_aws, :request])
telemetry_options = Map.get(config, :telemetry_options, [])

Expand All @@ -111,7 +132,8 @@ defmodule ExAws.Request do
safe_url,
req_body,
full_headers,
Map.get(config, :http_opts, [])
Map.get(config, :http_opts, []),
stream
)
|> maybe_transform_response()

Expand Down Expand Up @@ -211,6 +233,10 @@ defmodule ExAws.Request do
|> :timer.sleep()
end

def maybe_transform_response({:ok, %{status: status, stream: stream, headers: headers}}) do
{:ok, %{status_code: status, stream: stream, headers: headers}}
end

def maybe_transform_response({:ok, %{status: status, body: body, headers: headers}}) do
# Req and Finch use status (rather than status_code) as a key.
{:ok, %{status_code: status, body: body, headers: headers}}
Expand Down
39 changes: 36 additions & 3 deletions lib/ex_aws/request/hackney.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,52 @@ defmodule ExAws.Request.Hackney do

@default_opts [recv_timeout: 30_000]

def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts)
opts = http_opts ++ [:with_body | opts]
def request(method, url, body \\ "", headers \\ [], http_opts \\ [], stream \\ false) do
opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) ++ http_opts

opts =
if stream do
opts
else
[:with_body | opts]
end

case :hackney.request(method, url, headers, body, opts) do
{:ok, status, headers} ->
{:ok, %{status_code: status, headers: headers}}

{:ok, status, headers, client} when is_reference(client) ->
stream =
Stream.resource(
fn -> client end,
&continue_stream/1,
&finish_stream/1
)

{:ok, %{status_code: status, headers: headers, stream: stream}}

{:ok, status, headers, body} ->
{:ok, %{status_code: status, headers: headers, body: body}}

{:error, reason} ->
{:error, %{reason: reason}}
end
end

defp continue_stream(client) do
case :hackney.stream_body(client) do
{:ok, data} ->
{[data], client}

:done ->
{:halt, client}

{:error, reason} ->
raise reason
end
end

defp finish_stream(client) do
:hackney.close(client)
end
end
1 change: 1 addition & 0 deletions lib/ex_aws/request/http_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ defmodule ExAws.Request.HttpClient do
) ::
{:ok, %{status_code: pos_integer, headers: any}}
| {:ok, %{status_code: pos_integer, headers: any, body: binary}}
| {:ok, %{status_code: pos_integer, headers: any, stream: Enumerable.t()}}
| {:error, %{reason: any}}
end

0 comments on commit 3217eaf

Please sign in to comment.