Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SelectObjectContentRequest #236

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
295f223
feat: SelectObjectContentRequest
avinayak Nov 5, 2023
be7889e
Added parsing for AWS EventStream messages, Implemented EventStream m…
avinayak Nov 7, 2023
8df057e
Refactored parser function from S3 to EventStream
avinayak Nov 7, 2023
07636bd
Changed stream builder from event_stream to octet_stream
avinayak Nov 7, 2023
610b09e
Refactor ExAws.S3 module to use
avinayak Nov 8, 2023
13bf801
Refactor ExAws.S3.SelectObjectContents to use
avinayak Nov 8, 2023
98a41f9
Delete S3Lister module and unused function calls
avinayak Nov 8, 2023
a2c265f
Add support for S3 SelectObjectContent API and
avinayak Nov 9, 2023
649110e
Add type definition for SelectObjectContents
avinayak Nov 9, 2023
861bf58
Add module documentation for SelectObjectContents
avinayak Nov 9, 2023
14238a5
Add to_xml/1 function to Utils module
avinayak Nov 9, 2023
555b2f4
Refactor ExAws.S3.Parsers.EventStream and Prelude
avinayak Nov 9, 2023
bde09da
Refactor code for improved readability and
avinayak Nov 9, 2023
f7174f9
Refactor S3 event stream header test and delete S3
avinayak Nov 9, 2023
2a83400
Fix SelectObjectContents typo and remove unused
avinayak Nov 9, 2023
7cbb0f7
Add error logging to EventStream parser
avinayak Nov 9, 2023
0228265
Refactored S3 EventStream parser for improved error handling, Updated…
avinayak Nov 14, 2023
f1b1e82
Refactored buffer_stream function for better code readability
avinayak Nov 14, 2023
770fd04
Updated buffer_stream logic in ExAws.SParsers.EventStream
avinayak Nov 14, 2023
3931220
Refactored buffer_stream method in EventStream module
avinayak Nov 15, 2023
6a5d3c5
Updated comments and removed unneeded code in event stream parser
avinayak Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions lib/ex_aws/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,113 @@ defmodule ExAws.S3 do
}
end

@type select_object_content_opts :: [
{:input_serialization, input_serialization}
| {:output_serialization, output_serialization}
| {:scan_range, scan_range}
]

@type input_serialization ::
%{csv_input: csv_input} | %{json_input: json_input} | %{parquet_input: %{}}

@type csv_input :: %{
file_header_info: :none | :ignore | :use,
comments: binary,
quote_escape_character: binary,
record_delimiter: binary,
field_delimiter: binary,
quote_character: binary,
allow_quoted_record_delimiter: boolean
}
@type json_input :: %{
type: :document | :lines
}

@type output_serialization ::
%{csv_output: csv_output} | %{json_output: json_output}

@type csv_output :: %{
quote_fields: :always | :as_needed,
quote_escape_character: binary,
record_delimiter: binary,
field_delimiter: binary,
quote_character: binary
}

@type json_output :: %{
record_delimiter: binary
}

@type scan_range :: %{start: pos_integer, end: pos_integer}

@doc """
Filters and selects the contents of an Amazon S3 object based on an SQL statement.

## Options
* `:input_serialization` - Specifies JSON, CSV, or Parquet as the input serialization format.
each of which has corresponding parameters to describe the format of the object to be retrieved.
* `:output_serialization` - Specifies JSON or CSV as the output serialization format.
Each of which has corresponding parameters to describe the format of the output data.
* `:scan_range` - Specifies the byte range of the object to get the records from.

More information can be found in the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html).


## Example
```
S3.select_object_content(
"my-bucket",
"path/to/file.csv",
"SELECT * FROM S3Object s WHERE s._1 = 'some value'",
input_serialization: %{
csv_input: %{
file_header_info: :use,
record_delimiter: "\\n",
field_delimiter: ",",
quote_character: "\\""
}
},
output_serialization: %{
csv_output: %{
record_delimiter: "\\n",
field_delimiter: ",",
quote_character: "\\""
}
}
) |> ExAws.stream!()
```

Note that **this won't start fetching anything immediately** since it returns an Elixir `Stream`.

### Streaming into a file
```
S3.select_object_content(
"my-bucket",
"path/to/file.csv",
"SELECT * FROM S3Object s WHERE s._1 = 'some value'"
) |> ExAws.stream!() |> Stream.into(File.stream!("output.csv"))
```
"""
@spec select_object_content(
bucket :: binary,
path :: binary,
query :: binary,
opts :: select_object_content_opts
) :: __MODULE__.SelectObjectContent.t()
def select_object_content(
bucket,
path,
query,
opts \\ []
) do
%__MODULE__.SelectObjectContent{
bucket: bucket,
path: path,
query: query,
opts: opts
}
end

@type upload_opt ::
{:max_concurrency, pos_integer}
| {:timeout, pos_integer}
Expand Down
113 changes: 113 additions & 0 deletions lib/ex_aws/s3/parsers/event_stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule ExAws.S3.Parsers.EventStream do
@moduledoc false

# Parses EventStream messages.

# AWS encodes EventStream messages in binary as follows:
# [ prelude ][ headers ][ payload ][ message-crc ]
# |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->|

# This module parses this information and returns a struct with the prelude, headers and payload.
# The prelude contains the total length of the message, the length of the headers,
# the length of the prelude, the CRC of the message, and the length of the payload.

# Additionally, this module buffers the stream and parses the messages as they come in.
# Also, stream is transformed such that each item is seperated by line breaks

# The headers are a map of header names to values.
# The payload is the actual message data.
# The message-crc is a CRC32 checksum of the message (excluding the message-crc itself).
# Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information.

alias ExAws.S3.Parsers.EventStream.Message
alias ExAws.S3.Parsers.EventStream.Prelude
alias ExAws.S3.Parsers.EventStream.Header
require Logger

defp buffer_stream(stream) do
Stream.transform(stream, {nil, <<>>}, &buffer_stream/2)
end

defp buffer_stream(chunk, {nil, buffer}) do
new_buffer = buffer <> chunk
{:ok, %Prelude{total_length: total_length} = prelude} = Prelude.parse(new_buffer)

if total_length == byte_size(new_buffer) do
{:ok, parsed_message} = parse_message(prelude, new_buffer)
{[parsed_message], {nil, <<>>}}
else
buffer_stream(chunk, {prelude, buffer})
end
end

defp buffer_stream(chunk, {%Prelude{total_length: total_length} = prelude, buffer}) do
new_buffer = buffer <> chunk
new_buffer_length = byte_size(new_buffer)

cond do
new_buffer_length < total_length ->
# needs more data. put it in the buffer and wait for more
{[], {prelude, new_buffer}}

new_buffer_length > total_length ->
# we have more than one message in the buffer. parse the first one and keep the rest in the buffer
<<payload::binary-size(total_length), remaining_buffer::binary>> = new_buffer
{:ok, parsed_message} = parse_message(prelude, payload)
{[parsed_message], {nil, remaining_buffer}}

new_buffer_length == total_length ->
# we have exactly one message in the buffer. parse it and clear the buffer
<<payload::binary-size(total_length), <<>>::binary>> =
new_buffer

{:ok, parsed_message} = parse_message(prelude, payload)
{[parsed_message], {nil, <<>>}}
end
end

defp chunk_stream_by_linebreaks(stream) do
Stream.transform(stream, "", fn
chunk, buffer ->
case String.split(buffer <> chunk, "\n") do
lines when length(lines) > 1 ->
last = Enum.at(lines, -1)
rest = Enum.slice(lines, 0..-2)
{rest, last}

[line] ->
{[], line}
end
end)
end

def parse_message(prelude, payload_bytes) do
with :ok <- Message.verify_message_crc(prelude, payload_bytes),
{:ok, headers} <- Header.parse(prelude, payload_bytes),
{:ok, payload} <- Message.parse_payload(prelude, payload_bytes) do
{:ok, %Message{prelude: prelude, payload: payload, headers: headers}}
end
end

def parse_raw_stream(
{:ok,
%{
stream: stream
}}
) do
stream
|> buffer_stream()
|> Stream.each(&Message.raise_errors!/1)
|> Stream.filter(&Message.is_record?/1)
|> Stream.map(&Message.get_payload/1)
|> chunk_stream_by_linebreaks()
end

def parse_raw_stream({:error, {:http_error, _, %{headers: _, status_code: _, stream: stream}}}) do
stream_error = Enum.into(stream, "")
raise "Error parsing stream: #{inspect(stream_error)}"
end

def parse_raw_stream({:error, error}) do
raise "Error parsing stream: #{inspect(error)}"
end
end
52 changes: 52 additions & 0 deletions lib/ex_aws/s3/parsers/event_stream/header.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule ExAws.S3.Parsers.EventStream.Header do
@moduledoc false

# Parses EventStream headers.

# AWS encodes EventStream headers as follows:

# [header-name-size][header-name][header-data-type][header-value-size][header-value-data]
# |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->|

# This module parses this information and returns a map of header names - values.
# header-data-type is always 0x07(String) for S3.

alias ExAws.S3.Parsers.EventStream.Prelude

def extract_headers(header_bytes) do
do_extract_headers(header_bytes, [])
end

defp do_extract_headers(<<>>, headers), do: Map.new(headers)

defp do_extract_headers(<<header_name_size, rest::binary>>, headers) do
<<header_name::binary-size(header_name_size), <<7>>, rest::binary>> = rest
<<value_size_binary::binary-size(2), rest::binary>> = rest

value_size = :binary.decode_unsigned(value_size_binary, :big)
<<value::binary-size(value_size), rest::binary>> = rest

do_extract_headers(rest, [{header_name, value} | headers])
end

defp extract_header_bytes(
%Prelude{
prelude_length: prelude_length,
headers_length: headers_length
},
payload_bytes
) do
<<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_length),
_payload::binary>> = payload_bytes

headers_bytes
end

def parse(
%Prelude{} = prelude,
payload_bytes
) do
headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers()
{:ok, headers}
end
end
63 changes: 63 additions & 0 deletions lib/ex_aws/s3/parsers/event_stream/message.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule ExAws.S3.Parsers.EventStream.Message do
@moduledoc false

# Parses EventStream messages. This module parses this information and returns
# a struct with the prelude, headers and payload. Also verifies the message CRC.

alias ExAws.S3.Parsers.EventStream.Prelude
import Bitwise
require Logger

defstruct prelude: nil,
headers: nil,
payload: nil

def verify_message_crc(
%Prelude{
crc: prelude_crc,
payload_length: payload_length,
headers_length: headers_length
},
payload_bytes
) do
<<_::binary-size(8), message_bytes::binary-size(payload_length + headers_length + 4),
message_crc_bytes::binary-size(4)>> = payload_bytes

message_crc = :binary.decode_unsigned(message_crc_bytes, :big)
computed_crc = prelude_crc |> :erlang.crc32(message_bytes) |> band(0xFFFFFFFF)

if computed_crc == message_crc do
:ok
else
{:error, :message_checksum_mismatch}
end
end

def parse_payload(
%Prelude{
prelude_length: prelude_length,
headers_length: headers_length,
payload_length: payload_length
},
payload_bytes
) do
<<_prelude_headers::binary-size(prelude_length + headers_length),
message_bytes::binary-size(payload_length), _::binary>> = payload_bytes

{:ok, message_bytes}
end

def is_record?(%__MODULE__{headers: headers}) do
Map.get(headers, ":event-type") == "Records"
end

def get_payload(%__MODULE__{payload: payload}) do
payload
end

def raise_errors!(%__MODULE__{headers: headers}) do
if Map.get(headers, ":message-type") == "error" do
raise "Error message received: #{inspect(headers)}"
end
end
end
Loading