From 295f2234e334afb21f56cde1db029f842277154d Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Sun, 5 Nov 2023 14:56:52 -0800 Subject: [PATCH 01/21] feat: SelectObjectContentRequest --- lib/ex_aws/s3.ex | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 7499f5f..1caa008 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -757,6 +757,34 @@ defmodule ExAws.S3 do } end + def select_object_content(bucket, object_key, query, _opts \\ []) do + payload = """ + + + #{query} + SQL + + + USE + \\n + , + + + + + \n + + + + """ + + params = %{"select" => "", "select-type" => "2"} + + request(:post, bucket, object_key, [body: payload, params: params], %{ + stream_builder: :event_stream + }) + end + @type upload_opt :: {:max_concurrency, pos_integer} | {:timeout, pos_integer} From be7889e2b7754c6243746db4b62201fa46808d56 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 13:50:01 -0800 Subject: [PATCH 02/21] Added parsing for AWS EventStream messages, Implemented EventStream message, headers, and payload parsing, Added new functions to parse EventStream messages --- lib/ex_aws/s3.ex | 15 +++- lib/ex_aws/s3/parsers/event_stream.ex | 29 +++++++ lib/ex_aws/s3/parsers/event_stream/header.ex | 51 +++++++++++ lib/ex_aws/s3/parsers/event_stream/message.ex | 65 ++++++++++++++ lib/ex_aws/s3/parsers/event_stream/prelude.ex | 87 +++++++++++++++++++ test/lib/s3/event_stream/header_test.exs | 45 ++++++++++ 6 files changed, 291 insertions(+), 1 deletion(-) create mode 100644 lib/ex_aws/s3/parsers/event_stream.ex create mode 100644 lib/ex_aws/s3/parsers/event_stream/header.ex create mode 100644 lib/ex_aws/s3/parsers/event_stream/message.ex create mode 100644 lib/ex_aws/s3/parsers/event_stream/prelude.ex create mode 100644 test/lib/s3/event_stream/header_test.exs diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 1caa008..832c013 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -781,10 +781,23 @@ defmodule ExAws.S3 do params = %{"select" => "", "select-type" => "2"} request(:post, bucket, object_key, [body: payload, params: params], %{ - stream_builder: :event_stream + stream_builder: :event_stream, + parser: &parse/1 }) end + def parse( + {:ok, + %{ + stream: stream + }} + ) do + stream + |> Stream.map(&Parsers.EventStream.parse_message/1) + |> Stream.filter(&Parsers.EventStream.Message.is_record?/1) + |> Stream.map(&Parsers.EventStream.Message.get_payload/1) + end + @type upload_opt :: {:max_concurrency, pos_integer} | {:timeout, pos_integer} diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex new file mode 100644 index 0000000..83f7f68 --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -0,0 +1,29 @@ +defmodule ExAws.S3.Parsers.EventStream do + @moduledoc """ + Parses EventStream messages. + + AWS encodes EventStream messages in binary as follows: + [ prelude ][ headers ][ payload ][ message-crc ] + |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| + + This module parses this information and returns a struct with the prelude, headers and payload. + + The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + + The headers are a map of header names to values. + + The payload is the actual message data. + + The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). + + Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. + """ + + alias ExAws.S3.Parsers.EventStream.Message + + def parse_message(chunk) do + with {:ok, message} <- Message.parse(chunk) do + message + end + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/header.ex b/lib/ex_aws/s3/parsers/event_stream/header.ex new file mode 100644 index 0000000..f1a9b4f --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/header.ex @@ -0,0 +1,51 @@ +defmodule ExAws.S3.Parsers.EventStream.Header do + @moduledoc """ + Parses EventStream headers. + + AWS encodes EventStream headers as follows: + + [header-name-size][header-name][header-data-type][header-value-size][header-value-data] + |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| + + This module parses this information and returns a map of header names - values. + header-data-type is always 0x07(String) for S3. + """ + alias ExAws.S3.Parsers.EventStream.Prelude + + def extract_headers(header_bytes) do + do_extract_headers(header_bytes, []) + end + + defp do_extract_headers(<<>>, headers), do: Map.new(headers) + + defp do_extract_headers(<>, headers) do + <>, rest::binary>> = rest + <> = rest + + value_size = :binary.decode_unsigned(value_size_binary, :big) + <> = rest + + do_extract_headers(rest, [{header_name, value} | headers]) + end + + defp extract_header_bytes( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_length), + _payload::binary>> = payload_bytes + + headers_bytes + end + + def parse( + %Prelude{} = prelude, + payload_bytes + ) do + headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers() + {:ok, headers} + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex new file mode 100644 index 0000000..07e5388 --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -0,0 +1,65 @@ +defmodule ExAws.S3.Parsers.EventStream.Message do + @moduledoc """ + Parses EventStream messages. This module parses this information and returns a struct with the prelude, headers and payload. Also verifies the message CRC. + """ + alias ExAws.S3.Parsers.EventStream.Prelude + alias ExAws.S3.Parsers.EventStream.Header + import Bitwise + + defstruct prelude: nil, + headers: nil, + payload: nil + + def verify_message_crc( + %Prelude{ + crc: prelude_crc, + payload_length: payload_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_::binary-size(8), message_bytes::binary-size(payload_length + headers_length + 4), + message_crc_bytes::binary-size(4)>> = payload_bytes + + message_crc = :binary.decode_unsigned(message_crc_bytes, :big) + computed_crc = prelude_crc |> :erlang.crc32(message_bytes) |> band(0xFFFFFFFF) + + if computed_crc == message_crc do + :ok + else + {:error, :message_checksum_mismatch} + end + end + + def parse_payload( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length, + payload_length: payload_length + }, + payload_bytes + ) do + <<_prelude_headers::binary-size(prelude_length + headers_length), + message_bytes::binary-size(payload_length), _::binary>> = payload_bytes + + {:ok, message_bytes} + end + + def is_record?(%__MODULE__{headers: headers}) do + Map.get(headers, ":event-type") == "Records" + end + + def get_payload(%__MODULE__{payload: payload}) do + payload + end + + def parse(chunk) do + with {:ok, prelude, payload_bytes} <- + Prelude.parse(chunk), + :ok <- verify_message_crc(prelude, payload_bytes), + {:ok, headers} <- Header.parse(prelude, payload_bytes), + {:ok, payload} <- parse_payload(prelude, payload_bytes) do + {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} + end + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/prelude.ex b/lib/ex_aws/s3/parsers/event_stream/prelude.ex new file mode 100644 index 0000000..9a5e96c --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/prelude.ex @@ -0,0 +1,87 @@ +defmodule ExAws.S3.Parsers.EventStream.Prelude do + @moduledoc """ + Parses EventStream preludes. + + AWS encodes EventStream preludes in binary as follows: + + [ total-length ][headers-length][ prelude crc ] + |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| + + This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + """ + import Bitwise + + defstruct total_length: nil, + headers_length: nil, + prelude_length: nil, + crc: nil, + payload_length: nil, + prelude_bytes: nil + + @prelude_length 12 + # 128 Kb + @max_header_length 128 * 1024 + # 16 Mb + @max_payload_length 16 * 1024 * 1024 + + defp unpack_prelude( + << + total_length_bytes::binary-size(4), + headers_length_bytes::binary-size(4), + crc::binary-size(4) + >> = prelude_bytes + ) do + total_length = :binary.decode_unsigned(total_length_bytes, :big) + headers_length = :binary.decode_unsigned(headers_length_bytes, :big) + crc = :binary.decode_unsigned(crc, :big) + + {:ok, + %__MODULE__{ + total_length: total_length, + headers_length: headers_length, + prelude_length: @prelude_length, + payload_length: total_length - @prelude_length - headers_length - 4, + crc: crc, + prelude_bytes: prelude_bytes + }} + end + + def validate_prelude( + %__MODULE__{ + headers_length: headers_length, + payload_length: payload_length + } = prelude + ) do + cond do + headers_length > @max_header_length -> + {:error, :invalid_headers_length} + + payload_length > @max_payload_length -> + {:error, :invalid_payload_length} + + true -> + {:ok, prelude} + end + end + + def validate_checksum( + <>, + prelude_checksum + ) do + computed_checksum = prelude_bytes_without_crc |> :erlang.crc32() |> band(0xFFFFFFFF) + + if computed_checksum == prelude_checksum do + :ok + else + {:error, :prelude_checksum_mismatch} + end + end + + def parse(<> = payload) do + with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), + {:ok, prelude} <- validate_prelude(unpacked_prelude), + :ok <- validate_checksum(prelude_bytes, unpacked_prelude.crc) do + {:ok, prelude, payload} + end + end +end diff --git a/test/lib/s3/event_stream/header_test.exs b/test/lib/s3/event_stream/header_test.exs new file mode 100644 index 0000000..a6a3727 --- /dev/null +++ b/test/lib/s3/event_stream/header_test.exs @@ -0,0 +1,45 @@ +defmodule ExAws.AuthTest do + use ExUnit.Case, async: true + + alias ExAws.S3.Parsers.EventStream.Header + + describe "Header.extract_headers/1" do + test "can extract Record type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 111, 99, + 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, 118, 101, 110, 116, + 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "application/octet-stream", + ":event-type" => "Records" + } + end + + test "can extract Stats type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, 11, 58, 101, 118, 101, 110, + 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "text/xml", + ":event-type" => "Stats" + } + end + + test "can extract End type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, + 0, 3, 69, 110, 100>> + ) == %{ + ":message-type" => "event", + ":event-type" => "End" + } + end + end +end From 8df057e0183a6bd0e668b9e731c04278cbdf253e Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 13:52:56 -0800 Subject: [PATCH 03/21] Refactored parser function from S3 to EventStream --- lib/ex_aws/s3.ex | 14 +------------- lib/ex_aws/s3/parsers/event_stream.ex | 14 +++++++++++++- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 832c013..ac28086 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -782,22 +782,10 @@ defmodule ExAws.S3 do request(:post, bucket, object_key, [body: payload, params: params], %{ stream_builder: :event_stream, - parser: &parse/1 + parser: &Parsers.EventStream.parse_raw_stream/1 }) end - def parse( - {:ok, - %{ - stream: stream - }} - ) do - stream - |> Stream.map(&Parsers.EventStream.parse_message/1) - |> Stream.filter(&Parsers.EventStream.Message.is_record?/1) - |> Stream.map(&Parsers.EventStream.Message.get_payload/1) - end - @type upload_opt :: {:max_concurrency, pos_integer} | {:timeout, pos_integer} diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 83f7f68..153d866 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -21,9 +21,21 @@ defmodule ExAws.S3.Parsers.EventStream do alias ExAws.S3.Parsers.EventStream.Message - def parse_message(chunk) do + defp parse_message(chunk) do with {:ok, message} <- Message.parse(chunk) do message end end + + def parse_raw_stream( + {:ok, + %{ + stream: stream + }} + ) do + stream + |> Stream.map(&parse_message/1) + |> Stream.filter(&Message.is_record?/1) + |> Stream.map(&Message.get_payload/1) + end end From 07636bd5ccf1d4071f9959df1ddea46175ce1f97 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 14:14:35 -0800 Subject: [PATCH 04/21] Changed stream builder from event_stream to octet_stream --- lib/ex_aws/s3.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index ac28086..25d159a 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -781,7 +781,7 @@ defmodule ExAws.S3 do params = %{"select" => "", "select-type" => "2"} request(:post, bucket, object_key, [body: payload, params: params], %{ - stream_builder: :event_stream, + stream_builder: :octet_stream, parser: &Parsers.EventStream.parse_raw_stream/1 }) end From 610b09ea101d8cb0dbf952f4492c2599249ec586 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 15:10:34 -0800 Subject: [PATCH 05/21] Refactor ExAws.S3 module to use SelectObjectContents struct and add XML serialization helpers --- lib/ex_aws/s3.ex | 48 +++--- lib/ex_aws/s3/parsers/event_stream/message.ex | 3 +- lib/ex_aws/s3/select_object_contents.ex | 161 ++++++++++++++++++ lib/ex_aws/s3/utils.ex | 19 +++ 4 files changed, 203 insertions(+), 28 deletions(-) create mode 100644 lib/ex_aws/s3/select_object_contents.ex diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 25d159a..65953fa 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -7,6 +7,7 @@ defmodule ExAws.S3 do import ExAws.S3.Utils alias ExAws.S3.Parsers + alias ExAws.S3.SelectObjectContents @type acl_opt :: {:acl, canned_acl} | grant @type acl_opts :: [acl_opt] @@ -757,33 +758,26 @@ defmodule ExAws.S3 do } end - def select_object_content(bucket, object_key, query, _opts \\ []) do - payload = """ - - - #{query} - SQL - - - USE - \\n - , - - - - - \n - - - - """ - - params = %{"select" => "", "select-type" => "2"} - - request(:post, bucket, object_key, [body: payload, params: params], %{ - stream_builder: :octet_stream, - parser: &Parsers.EventStream.parse_raw_stream/1 - }) + def select_object_content( + bucket, + path, + query, + input_serialization \\ %{ + csv: %{} + }, + output_serialization \\ %{ + csv: %{} + }, + scan_range \\ nil + ) do + %__MODULE__.SelectObjectContents{ + bucket: bucket, + path: path, + query: query, + input_serialization: input_serialization, + output_serialization: output_serialization, + scan_range: scan_range + } end @type upload_opt :: diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex index 07e5388..97efc17 100644 --- a/lib/ex_aws/s3/parsers/event_stream/message.ex +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -1,6 +1,7 @@ defmodule ExAws.S3.Parsers.EventStream.Message do @moduledoc """ - Parses EventStream messages. This module parses this information and returns a struct with the prelude, headers and payload. Also verifies the message CRC. + Parses EventStream messages. This module parses this information and returns + a struct with the prelude, headers and payload. Also verifies the message CRC. """ alias ExAws.S3.Parsers.EventStream.Prelude alias ExAws.S3.Parsers.EventStream.Header diff --git a/lib/ex_aws/s3/select_object_contents.ex b/lib/ex_aws/s3/select_object_contents.ex new file mode 100644 index 0000000..5b4e49a --- /dev/null +++ b/lib/ex_aws/s3/select_object_contents.ex @@ -0,0 +1,161 @@ +defmodule ExAws.S3.SelectObjectContents do + alias ExAws.S3.Utils + + @enforce_keys ~w(bucket path query)a + defstruct bucket: nil, + path: nil, + query: nil, + input_serialization: nil, + output_serialization: nil, + scan_range: nil, + opts: [], + service: :s3 + + # https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html + def input_params(%{ + csv: csv_input + }) do + csv_params = + %{ + file_header_info: "USE", + record_delimiter: "\n", + field_delimiter: "," + } + |> Map.merge(csv_input) + |> Utils.to_xml() + + "#{csv_params}" + end + + def input_params(%{ + json: json_input + }) do + json_params = + %{ + type: "DOCUMENT" + } + |> Map.merge(json_input) + |> Utils.to_xml() + + "#{json_params}" + end + + def input_params(%{ + parquet: _ + }) do + "" + end + + def output_params(%{ + csv: csv_output + }) do + csv_params = + %{ + record_delimiter: "\n", + field_delimiter: "," + } + |> Map.merge(csv_output) + |> Utils.to_xml() + + "#{csv_params}" + end + + def output_params(%{ + json: json_output + }) do + json_params = + %{ + record_delimiter: "\n" + } + |> Map.merge(json_output) + |> Utils.to_xml() + + "#{json_params}" + end + + def scan_range_params( + %{ + start: _range_start, + end: _range_end + } = scan_range_input + ) do + ExAws.S3.Utils.to_xml(%{ + scan_range_input: scan_range_input + }) + end + + def scan_range_params(_) do + "" + end + + def build_payload( + query, + input_serialization, + output_serialization, + scan_range + ) do + """ + + + #{query} + SQL + + #{input_params(input_serialization)} + + + #{output_params(output_serialization)} + + #{scan_range_params(scan_range)} + + """ + end + + defimpl ExAws.Operation do + alias ExAws.S3.SelectObjectContents + alias ExAws.S3.Parsers.EventStream + + def perform( + _, + _ + ) do + raise "Not implemented. Use stream! instead." + end + + def stream!( + %{ + bucket: bucket, + path: path, + query: query, + input_serialization: input_serialization, + output_serialization: output_serialization, + scan_range: scan_range + }, + config + ) do + payload = + SelectObjectContents.build_payload( + query, + input_serialization, + output_serialization, + scan_range + ) + + params = %{"select" => "", "select-type" => "2"} + + ExAws.stream!( + %ExAws.Operation.S3{ + http_method: :post, + bucket: bucket, + path: path, + body: payload, + headers: %{}, + resource: "", + params: params, + stream_builder: :octet_stream, + parser: &EventStream.parse_raw_stream/1 + }, + config + ) + end + end +end diff --git a/lib/ex_aws/s3/utils.ex b/lib/ex_aws/s3/utils.ex index 1981943..03fb172 100644 --- a/lib/ex_aws/s3/utils.ex +++ b/lib/ex_aws/s3/utils.ex @@ -345,4 +345,23 @@ defmodule ExAws.S3.Utils do {{datetime.year, datetime.month, datetime.day}, {datetime.hour, datetime.minute, datetime.second}} end + + def to_xml(map) when is_map(map) do + Enum.map(map, fn {key, value} -> + key = key |> Atom.to_string() |> Macro.camelize() + "<#{key}>#{to_xml(value)}" + end) + |> Enum.join() + end + + def to_xml(list) when is_list(list) do + Enum.map(list, fn value -> + "#{to_xml(value)}" + end) + |> Enum.join() + end + + def to_xml(value) when is_binary(value), do: value + def to_xml(value) when is_integer(value), do: Integer.to_string(value) + def to_xml(value) when is_float(value), do: Float.to_string(value) end From 13bf801533055736fe508c896ad747d95252c2b1 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 15:28:26 -0800 Subject: [PATCH 06/21] Refactor ExAws.S3.SelectObjectContents to use options for input_serialization, output_serialization, and scan_range --- lib/ex_aws/s3/select_object_contents.ex | 12 +++--- ss.exs | 53 +++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 ss.exs diff --git a/lib/ex_aws/s3/select_object_contents.ex b/lib/ex_aws/s3/select_object_contents.ex index 5b4e49a..65ab902 100644 --- a/lib/ex_aws/s3/select_object_contents.ex +++ b/lib/ex_aws/s3/select_object_contents.ex @@ -77,10 +77,10 @@ defmodule ExAws.S3.SelectObjectContents do %{ start: _range_start, end: _range_end - } = scan_range_input + } = scan_range ) do ExAws.S3.Utils.to_xml(%{ - scan_range_input: scan_range_input + scan_range: scan_range }) end @@ -126,12 +126,14 @@ defmodule ExAws.S3.SelectObjectContents do bucket: bucket, path: path, query: query, - input_serialization: input_serialization, - output_serialization: output_serialization, - scan_range: scan_range + opts: opts }, config ) do + input_serialization = opts[:input_serialization] || %{csv: %{}} + output_serialization = opts[:output_serialization] || %{csv: %{}} + scan_range = opts[:scan_range] || nil + payload = SelectObjectContents.build_payload( query, diff --git a/ss.exs b/ss.exs new file mode 100644 index 0000000..8a3d804 --- /dev/null +++ b/ss.exs @@ -0,0 +1,53 @@ +defmodule S3Lister do + @moduledoc """ + Module for listing contents of an S3 bucket. + """ + + def init do + config = + ExAws.Config.new(:s3, + access_key_id: "minio", + secret_access_key: "minio123", + scheme: "http://", + host: "localhost", + port: 9000, + debug_requests: true + ) + + Application.put_env(:ex_aws, :s3, config) + end + + def write_stream_to_files(stream, file_name \\ "output.txt") do + stream + |> Stream.map(fn payload -> + IO.puts(payload) + payload + end) + |> Stream.into(File.stream!(file_name)) + |> Stream.run() + end + + def select_dhhd_chunk(bucket, s3_key) do + ExAws.S3.select_object_content( + bucket, + s3_key, + "select ID,Name from S3Object", + input_serialization: %{csv: %{}}, + output_serialization: %{csv: %{}}, + scan_range: %{start: 0, end: 1000} + ) + |> ExAws.stream!() + |> write_stream_to_files() + end + + def stream_file(bucket, s3_key) do + ExAws.S3.download_file(bucket, s3_key, :memory) + |> ExAws.stream!() + |> Stream.into(File.stream!("local_path.csv")) + |> Stream.run() + end +end + +S3Lister.init() +S3Lister.select_dhhd_chunk("hiive-local-preqin-data", "flowers_data.csv") +# S3Lister.stream_file("hiive-local-preqin-data", "flowers_data.csv") From 98a41f99586cd0f1551ffa505e5852ea4fdd50dc Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 15:28:54 -0800 Subject: [PATCH 07/21] Delete S3Lister module and unused function calls --- ss.exs | 53 ----------------------------------------------------- 1 file changed, 53 deletions(-) delete mode 100644 ss.exs diff --git a/ss.exs b/ss.exs deleted file mode 100644 index 8a3d804..0000000 --- a/ss.exs +++ /dev/null @@ -1,53 +0,0 @@ -defmodule S3Lister do - @moduledoc """ - Module for listing contents of an S3 bucket. - """ - - def init do - config = - ExAws.Config.new(:s3, - access_key_id: "minio", - secret_access_key: "minio123", - scheme: "http://", - host: "localhost", - port: 9000, - debug_requests: true - ) - - Application.put_env(:ex_aws, :s3, config) - end - - def write_stream_to_files(stream, file_name \\ "output.txt") do - stream - |> Stream.map(fn payload -> - IO.puts(payload) - payload - end) - |> Stream.into(File.stream!(file_name)) - |> Stream.run() - end - - def select_dhhd_chunk(bucket, s3_key) do - ExAws.S3.select_object_content( - bucket, - s3_key, - "select ID,Name from S3Object", - input_serialization: %{csv: %{}}, - output_serialization: %{csv: %{}}, - scan_range: %{start: 0, end: 1000} - ) - |> ExAws.stream!() - |> write_stream_to_files() - end - - def stream_file(bucket, s3_key) do - ExAws.S3.download_file(bucket, s3_key, :memory) - |> ExAws.stream!() - |> Stream.into(File.stream!("local_path.csv")) - |> Stream.run() - end -end - -S3Lister.init() -S3Lister.select_dhhd_chunk("hiive-local-preqin-data", "flowers_data.csv") -# S3Lister.stream_file("hiive-local-preqin-data", "flowers_data.csv") From a2c265f2d4c96f6d3159ca3bb0ea99846367148d Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 16:25:01 -0800 Subject: [PATCH 08/21] Add support for S3 SelectObjectContent API and improve code organization --- lib/ex_aws/s3.ex | 106 ++++++++++++++++-- lib/ex_aws/s3/parsers/event_stream.ex | 24 ++-- lib/ex_aws/s3/parsers/event_stream/header.ex | 17 +-- lib/ex_aws/s3/parsers/event_stream/message.ex | 9 +- lib/ex_aws/s3/parsers/event_stream/prelude.ex | 13 +-- lib/ex_aws/s3/utils.ex | 1 + 6 files changed, 128 insertions(+), 42 deletions(-) diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 65953fa..33c6966 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -7,7 +7,6 @@ defmodule ExAws.S3 do import ExAws.S3.Utils alias ExAws.S3.Parsers - alias ExAws.S3.SelectObjectContents @type acl_opt :: {:acl, canned_acl} | grant @type acl_opts :: [acl_opt] @@ -758,25 +757,110 @@ defmodule ExAws.S3 do } end + @type select_object_content_opts :: [ + {:input_serialization, input_serialization} + | {:output_serialization, output_serialization} + | {:scan_range, scan_range} + ] + + @type input_serialization :: + %{csv_input: csv_input} | %{json_input: json_input} | %{parquet_input: %{}} + + @type csv_input :: %{ + file_header_info: :none | :ignore | :use, + comments: binary, + quote_escape_character: binary, + record_delimiter: binary, + field_delimiter: binary, + quote_character: binary, + allow_quoted_record_delimiter: boolean + } + @type json_input :: %{ + type: :document | :lines + } + + @type output_serialization :: + %{csv_output: csv_output} | %{json_output: json_output} + + @type csv_output :: %{ + quote_fields: :always | :as_needed, + quote_escape_character: binary, + record_delimiter: binary, + field_delimiter: binary, + quote_character: binary + } + + @type json_output :: %{ + record_delimiter: binary + } + + @type scan_range :: %{start: pos_integer, end: pos_integer} + + @doc """ + Filters and selects the contents of an Amazon S3 object based on an SQL statement. + + ## Options + * `:input_serialization` - Specifies JSON, CSV, or Parquet as the input serialization format. + each of which has corresponding parameters to describe the format of the object to be retrieved. + * `:output_serialization` - Specifies JSON or CSV as the output serialization format. + Each of which has corresponding parameters to describe the format of the output data. + * `:scan_range` - Specifies the byte range of the object to get the records from. + + More information can be found in the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html). + + + ## Example + ``` + S3.select_object_content( + "my-bucket", + "path/to/file.csv", + "SELECT * FROM S3Object s WHERE s._1 = 'some value'", + input_serialization: %{ + csv_input: %{ + file_header_info: :use, + record_delimiter: "\\n", + field_delimiter: ",", + quote_character: "\\"" + } + }, + output_serialization: %{ + csv_output: %{ + record_delimiter: "\\n", + field_delimiter: ",", + quote_character: "\\"" + } + } + ) |> ExAws.stream!() + ``` + + Note that **this won't start fetching anything immediately** since it returns an Elixir `Stream`. + + ### Streaming into a file + ``` + S3.select_object_content( + "my-bucket", + "path/to/file.csv", + "SELECT * FROM S3Object s WHERE s._1 = 'some value'" + ) |> ExAws.stream!() |> Stream.into(File.stream!("output.csv")) + ``` + """ + @spec select_object_content( + bucket :: binary, + path :: binary, + query :: binary, + opts :: select_object_content_opts + ) :: __MODULE__.SelectObjectContents.t() def select_object_content( bucket, path, query, - input_serialization \\ %{ - csv: %{} - }, - output_serialization \\ %{ - csv: %{} - }, - scan_range \\ nil + opts \\ [] ) do %__MODULE__.SelectObjectContents{ bucket: bucket, path: path, query: query, - input_serialization: input_serialization, - output_serialization: output_serialization, - scan_range: scan_range + opts: opts } end diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 153d866..60718ce 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -1,23 +1,23 @@ defmodule ExAws.S3.Parsers.EventStream do - @moduledoc """ - Parses EventStream messages. + @moduledoc false - AWS encodes EventStream messages in binary as follows: - [ prelude ][ headers ][ payload ][ message-crc ] - |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| + # Parses EventStream messages. - This module parses this information and returns a struct with the prelude, headers and payload. + # AWS encodes EventStream messages in binary as follows: + # [ prelude ][ headers ][ payload ][ message-crc ] + # |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| - The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + # This module parses this information and returns a struct with the prelude, headers and payload. - The headers are a map of header names to values. + # The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. - The payload is the actual message data. + # The headers are a map of header names to values. - The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). + # The payload is the actual message data. - Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. - """ + # The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). + + # Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. alias ExAws.S3.Parsers.EventStream.Message diff --git a/lib/ex_aws/s3/parsers/event_stream/header.ex b/lib/ex_aws/s3/parsers/event_stream/header.ex index f1a9b4f..0e43431 100644 --- a/lib/ex_aws/s3/parsers/event_stream/header.ex +++ b/lib/ex_aws/s3/parsers/event_stream/header.ex @@ -1,15 +1,16 @@ defmodule ExAws.S3.Parsers.EventStream.Header do - @moduledoc """ - Parses EventStream headers. + @moduledoc false - AWS encodes EventStream headers as follows: + # Parses EventStream headers. - [header-name-size][header-name][header-data-type][header-value-size][header-value-data] - |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| + # AWS encodes EventStream headers as follows: + + # [header-name-size][header-name][header-data-type][header-value-size][header-value-data] + # |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| + + # This module parses this information and returns a map of header names - values. + # header-data-type is always 0x07(String) for S3. - This module parses this information and returns a map of header names - values. - header-data-type is always 0x07(String) for S3. - """ alias ExAws.S3.Parsers.EventStream.Prelude def extract_headers(header_bytes) do diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex index 97efc17..757f80b 100644 --- a/lib/ex_aws/s3/parsers/event_stream/message.ex +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -1,8 +1,9 @@ defmodule ExAws.S3.Parsers.EventStream.Message do - @moduledoc """ - Parses EventStream messages. This module parses this information and returns - a struct with the prelude, headers and payload. Also verifies the message CRC. - """ + @moduledoc false + + # Parses EventStream messages. This module parses this information and returns + # a struct with the prelude, headers and payload. Also verifies the message CRC. + alias ExAws.S3.Parsers.EventStream.Prelude alias ExAws.S3.Parsers.EventStream.Header import Bitwise diff --git a/lib/ex_aws/s3/parsers/event_stream/prelude.ex b/lib/ex_aws/s3/parsers/event_stream/prelude.ex index 9a5e96c..928cbc8 100644 --- a/lib/ex_aws/s3/parsers/event_stream/prelude.ex +++ b/lib/ex_aws/s3/parsers/event_stream/prelude.ex @@ -1,14 +1,13 @@ defmodule ExAws.S3.Parsers.EventStream.Prelude do - @moduledoc """ - Parses EventStream preludes. + @moduledoc false + # Parses EventStream preludes. - AWS encodes EventStream preludes in binary as follows: + # AWS encodes EventStream preludes in binary as follows: - [ total-length ][headers-length][ prelude crc ] - |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| + # [ total-length ][headers-length][ prelude crc ] + # |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| - This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. - """ + # This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. import Bitwise defstruct total_length: nil, diff --git a/lib/ex_aws/s3/utils.ex b/lib/ex_aws/s3/utils.ex index 03fb172..30f352c 100644 --- a/lib/ex_aws/s3/utils.ex +++ b/lib/ex_aws/s3/utils.ex @@ -361,6 +361,7 @@ defmodule ExAws.S3.Utils do |> Enum.join() end + def to_xml(value) when is_atom(value), do: Atom.to_string(value) |> String.upcase() def to_xml(value) when is_binary(value), do: value def to_xml(value) when is_integer(value), do: Integer.to_string(value) def to_xml(value) when is_float(value), do: Float.to_string(value) From 649110e63414b44a1d8a827487aacc10f1787fa9 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 16:27:18 -0800 Subject: [PATCH 09/21] Add type definition for SelectObjectContents module --- lib/ex_aws/s3/select_object_contents.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/ex_aws/s3/select_object_contents.ex b/lib/ex_aws/s3/select_object_contents.ex index 65ab902..7c07724 100644 --- a/lib/ex_aws/s3/select_object_contents.ex +++ b/lib/ex_aws/s3/select_object_contents.ex @@ -11,6 +11,8 @@ defmodule ExAws.S3.SelectObjectContents do opts: [], service: :s3 + @type t :: %__MODULE__{} + # https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html def input_params(%{ csv: csv_input From 861bf58c0850201649ffeb40cbae4a93eb9c6354 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 16:35:33 -0800 Subject: [PATCH 10/21] Add module documentation for SelectObjectContents operation --- lib/ex_aws/s3/select_object_contents.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/ex_aws/s3/select_object_contents.ex b/lib/ex_aws/s3/select_object_contents.ex index 7c07724..bde99c2 100644 --- a/lib/ex_aws/s3/select_object_contents.ex +++ b/lib/ex_aws/s3/select_object_contents.ex @@ -1,4 +1,7 @@ defmodule ExAws.S3.SelectObjectContents do + @moduledoc """ + Represents the (SelectObjectContent)[https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html] operation. + """ alias ExAws.S3.Utils @enforce_keys ~w(bucket path query)a @@ -13,7 +16,6 @@ defmodule ExAws.S3.SelectObjectContents do @type t :: %__MODULE__{} - # https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html def input_params(%{ csv: csv_input }) do From 14238a54f29d4cfb315b854a9b4101a582ba63db Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 16:48:54 -0800 Subject: [PATCH 11/21] Add to_xml/1 function to Utils module --- test/lib/s3/utils_test.exs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/lib/s3/utils_test.exs b/test/lib/s3/utils_test.exs index 376c66b..d8f7bab 100644 --- a/test/lib/s3/utils_test.exs +++ b/test/lib/s3/utils_test.exs @@ -132,4 +132,23 @@ defmodule ExAws.S3.ImplTest do "0000true0prefix/Enabled123" end end + + describe "to_xml/1" do + test "renders a simple map" do + assert %{ + foo: %{ + bar: "baz", + qux: :quux, + nested: %{foo: "bar"}, + num: 10, + float: 10.0, + bool: true, + null: nil, + snake_case: "camelCase" + } + } + |> Utils.to_xml() == + "bazTRUE10.0barNIL10QUUXcamelCase" + end + end end From 555b2f4bb00f3fc225e3cc90f7c5941131aff0a9 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 16:51:32 -0800 Subject: [PATCH 12/21] Refactor ExAws.S3.Parsers.EventStream and Prelude modules to improve readability and consistency. --- lib/ex_aws/s3/parsers/event_stream.ex | 7 ++----- lib/ex_aws/s3/parsers/event_stream/prelude.ex | 4 +++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 60718ce..7cb91c3 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -8,15 +8,12 @@ defmodule ExAws.S3.Parsers.EventStream do # |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| # This module parses this information and returns a struct with the prelude, headers and payload. - - # The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + # The prelude contains the total length of the message, the length of the headers, + # the length of the prelude, the CRC of the message, and the length of the payload. # The headers are a map of header names to values. - # The payload is the actual message data. - # The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). - # Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. alias ExAws.S3.Parsers.EventStream.Message diff --git a/lib/ex_aws/s3/parsers/event_stream/prelude.ex b/lib/ex_aws/s3/parsers/event_stream/prelude.ex index 928cbc8..dac0aee 100644 --- a/lib/ex_aws/s3/parsers/event_stream/prelude.ex +++ b/lib/ex_aws/s3/parsers/event_stream/prelude.ex @@ -7,7 +7,9 @@ defmodule ExAws.S3.Parsers.EventStream.Prelude do # [ total-length ][headers-length][ prelude crc ] # |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| - # This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + # This module parses this information and returns a struct with the total length of the message, + # the length of the headers, the length of the prelude, the CRC of the message, + # and the length of the payload. import Bitwise defstruct total_length: nil, From bde09da20f8fed559b742224040dd5a873510994 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 17:02:46 -0800 Subject: [PATCH 13/21] Refactor code for improved readability and performance. --- test/lib/s3/select_object_contents_test.exs | 109 ++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 test/lib/s3/select_object_contents_test.exs diff --git a/test/lib/s3/select_object_contents_test.exs b/test/lib/s3/select_object_contents_test.exs new file mode 100644 index 0000000..852625e --- /dev/null +++ b/test/lib/s3/select_object_contents_test.exs @@ -0,0 +1,109 @@ +defmodule ExAws.S3.SelectObjectContentsTest do + use ExUnit.Case, async: true + + alias ExAws.S3.SelectObjectContents + + describe "SelectObjectContents.build_payload/4" do + test "default payload" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{csv: %{}}, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n ,\n\n \n \n\n" + end + + test "CSV input" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{ + csv: %{ + file_header_info: "IGNORE", + record_delimiter: "\n", + field_delimiter: ",", + quote_character: "\"", + quote_escape_character: "\"", + comments: "#", + allow_quoted_record_delimiter: false + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n FALSE#,IGNORE\"\"\n\n \n \n ,\n\n \n \n\n" + end + + test "JSON Input" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{ + json: %{ + type: :document, + record_delimiter: "\n" + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n \nDOCUMENT\n \n \n ,\n\n \n \n\n" + end + + test "Parquet Input" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{ + parquet: %{ + record_delimiter: "\n" + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n \n \n \n ,\n\n \n \n\n" + end + + test "CSV Output" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{csv: %{}}, + %{ + csv: %{ + record_delimiter: "\n", + field_delimiter: ",", + quote_character: "\"", + quote_escape_character: "\"", + quote_fields: :asneeded, + comments: "#" + } + }, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n #,\"\"ASNEEDED\n\n \n \n\n" + end + + test "JSON Output" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{csv: %{}}, + %{ + json: %{ + record_delimiter: "\n" + } + }, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n \n\n \n \n\n" + end + + test "Scan Range" do + assert SelectObjectContents.build_payload( + "select * from s3object", + %{csv: %{}}, + %{csv: %{}}, + %{start: 0, end: 100} + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n ,\n\n \n 1000\n\n" + end + end +end From f7174f94f425b9a21d58c274b588fe6612a9c1d7 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 17:40:18 -0800 Subject: [PATCH 14/21] Refactor S3 event stream header test and delete S3 select object contents test --- test/lib/s3/event_stream/header_test.exs | 2 +- test/lib/s3/event_stream/message_test.exs | 81 +++++++++++++++++++ .../request_test.exs} | 2 +- 3 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 test/lib/s3/event_stream/message_test.exs rename test/lib/s3/{select_object_contents_test.exs => select_object_contents/request_test.exs} (99%) diff --git a/test/lib/s3/event_stream/header_test.exs b/test/lib/s3/event_stream/header_test.exs index a6a3727..392f22f 100644 --- a/test/lib/s3/event_stream/header_test.exs +++ b/test/lib/s3/event_stream/header_test.exs @@ -1,4 +1,4 @@ -defmodule ExAws.AuthTest do +defmodule ExAws.S3.EventStream.HeaderTest do use ExUnit.Case, async: true alias ExAws.S3.Parsers.EventStream.Header diff --git a/test/lib/s3/event_stream/message_test.exs b/test/lib/s3/event_stream/message_test.exs new file mode 100644 index 0000000..7d5add3 --- /dev/null +++ b/test/lib/s3/event_stream/message_test.exs @@ -0,0 +1,81 @@ +defmodule ExAws.S3.EventStream.MessageTest do + use ExUnit.Case, async: true + + describe "Message.parse/1" do + test "parses a binary EventStream Records chunk" do + chunk = + <<0, 0, 1, 81, 0, 0, 0, 85, 176, 12, 79, 204, 13, 58, 109, 101, 115, 115, 97, 103, 101, + 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, + 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, + 105, 111, 110, 47, 111, 99, 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, + 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115, + 123, 34, 73, 68, 34, 58, 34, 49, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 80, 108, 97, + 121, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 50, 34, 44, 34, 78, 97, 109, 101, 34, 58, + 34, 68, 101, 116, 97, 105, 108, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 51, 34, 44, + 34, 78, 97, 109, 101, 34, 58, 34, 68, 105, 114, 101, 99, 116, 111, 114, 34, 125, 10, + 123, 34, 73, 68, 34, 58, 34, 52, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 89, 101, 97, + 114, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 53, 34, 44, 34, 78, 97, 109, 101, 34, 58, + 34, 80, 114, 101, 115, 101, 110, 116, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 54, 34, + 44, 34, 78, 97, 109, 101, 34, 58, 34, 83, 117, 114, 102, 97, 99, 101, 34, 125, 10, 123, + 34, 73, 68, 34, 58, 34, 55, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 65, 34, 125, 10, + 123, 34, 73, 68, 34, 58, 34, 56, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 83, 101, 97, + 115, 111, 110, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 57, 34, 44, 34, 78, 97, 109, + 101, 34, 58, 34, 70, 105, 114, 101, 34, 125, 10, 66, 109, 206, 248>> + + assert {:ok, + %ExAws.S3.Parsers.EventStream.Message{ + prelude: %ExAws.S3.Parsers.EventStream.Prelude{ + total_length: 337, + headers_length: 85, + prelude_length: 12, + crc: 2_953_596_876, + payload_length: 236, + prelude_bytes: <<0, 0, 1, 81, 0, 0, 0, 85, 176, 12, 79, 204>> + }, + headers: %{ + ":content-type" => "application/octet-stream", + ":event-type" => "Records", + ":message-type" => "event" + }, + payload: + "{\"ID\":\"1\",\"Name\":\"Play\"}\n{\"ID\":\"2\",\"Name\":\"Detail\"}\n{\"ID\":\"3\",\"Name\":\"Director\"}\n{\"ID\":\"4\",\"Name\":\"Year\"}\n{\"ID\":\"5\",\"Name\":\"Present\"}\n{\"ID\":\"6\",\"Name\":\"Surface\"}\n{\"ID\":\"7\",\"Name\":\"A\"}\n{\"ID\":\"8\",\"Name\":\"Season\"}\n{\"ID\":\"9\",\"Name\":\"Fire\"}\n" + }} = ExAws.S3.Parsers.EventStream.Message.parse(chunk) + end + + test "parses a binary EventStream Stats chunk" do + chunk = + <<0, 0, 0, 240, 0, 0, 0, 67, 194, 195, 159, 30, 13, 58, 109, 101, 115, 115, 97, 103, 101, + 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, + 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, + 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115, + 60, 63, 120, 109, 108, 32, 118, 101, 114, 115, 105, 111, 110, 61, 34, 49, 46, 48, 34, + 32, 101, 110, 99, 111, 100, 105, 110, 103, 61, 34, 85, 84, 70, 45, 56, 34, 63, 62, 60, + 83, 116, 97, 116, 115, 62, 60, 66, 121, 116, 101, 115, 83, 99, 97, 110, 110, 101, 100, + 62, 49, 48, 48, 49, 60, 47, 66, 121, 116, 101, 115, 83, 99, 97, 110, 110, 101, 100, 62, + 60, 66, 121, 116, 101, 115, 80, 114, 111, 99, 101, 115, 115, 101, 100, 62, 49, 48, 48, + 49, 60, 47, 66, 121, 116, 101, 115, 80, 114, 111, 99, 101, 115, 115, 101, 100, 62, 60, + 66, 121, 116, 101, 115, 82, 101, 116, 117, 114, 110, 101, 100, 62, 50, 51, 54, 60, 47, + 66, 121, 116, 101, 115, 82, 101, 116, 117, 114, 110, 101, 100, 62, 60, 47, 83, 116, 97, + 116, 115, 62, 8, 57, 236, 68>> + + assert {:ok, + %ExAws.S3.Parsers.EventStream.Message{ + prelude: %ExAws.S3.Parsers.EventStream.Prelude{ + total_length: 240, + headers_length: 67, + prelude_length: 12, + crc: 3_267_600_158, + payload_length: 157, + prelude_bytes: <<0, 0, 0, 240, 0, 0, 0, 67, 194, 195, 159, 30>> + }, + headers: %{ + ":content-type" => "text/xml", + ":event-type" => "Stats", + ":message-type" => "event" + }, + payload: + "10011001236" + }} = ExAws.S3.Parsers.EventStream.Message.parse(chunk) + end + end +end diff --git a/test/lib/s3/select_object_contents_test.exs b/test/lib/s3/select_object_contents/request_test.exs similarity index 99% rename from test/lib/s3/select_object_contents_test.exs rename to test/lib/s3/select_object_contents/request_test.exs index 852625e..6ded0da 100644 --- a/test/lib/s3/select_object_contents_test.exs +++ b/test/lib/s3/select_object_contents/request_test.exs @@ -1,4 +1,4 @@ -defmodule ExAws.S3.SelectObjectContentsTest do +defmodule ExAws.S3.SelectObjectContents.RequestTest do use ExUnit.Case, async: true alias ExAws.S3.SelectObjectContents From 2a83400e0475ab552c087a4c3eb8324a00d19276 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Wed, 8 Nov 2023 18:10:47 -0800 Subject: [PATCH 15/21] Fix SelectObjectContents typo and remove unused module --- lib/ex_aws/s3.ex | 4 +- ...t_contents.ex => select_object_content.ex} | 9 ++--- .../select_object_contents/request_test.exs | 20 +++++----- .../s3/select_object_contents/stream_test.exs | 35 +++++++++++++++++ test/lib/s3_test.exs | 38 +++++++++++++++++++ 5 files changed, 88 insertions(+), 18 deletions(-) rename lib/ex_aws/s3/{select_object_contents.ex => select_object_content.ex} (93%) create mode 100644 test/lib/s3/select_object_contents/stream_test.exs diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 33c6966..0464e39 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -849,14 +849,14 @@ defmodule ExAws.S3 do path :: binary, query :: binary, opts :: select_object_content_opts - ) :: __MODULE__.SelectObjectContents.t() + ) :: __MODULE__.SelectObjectContent.t() def select_object_content( bucket, path, query, opts \\ [] ) do - %__MODULE__.SelectObjectContents{ + %__MODULE__.SelectObjectContent{ bucket: bucket, path: path, query: query, diff --git a/lib/ex_aws/s3/select_object_contents.ex b/lib/ex_aws/s3/select_object_content.ex similarity index 93% rename from lib/ex_aws/s3/select_object_contents.ex rename to lib/ex_aws/s3/select_object_content.ex index bde99c2..59795b3 100644 --- a/lib/ex_aws/s3/select_object_contents.ex +++ b/lib/ex_aws/s3/select_object_content.ex @@ -1,4 +1,4 @@ -defmodule ExAws.S3.SelectObjectContents do +defmodule ExAws.S3.SelectObjectContent do @moduledoc """ Represents the (SelectObjectContent)[https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html] operation. """ @@ -8,9 +8,6 @@ defmodule ExAws.S3.SelectObjectContents do defstruct bucket: nil, path: nil, query: nil, - input_serialization: nil, - output_serialization: nil, - scan_range: nil, opts: [], service: :s3 @@ -115,7 +112,7 @@ defmodule ExAws.S3.SelectObjectContents do end defimpl ExAws.Operation do - alias ExAws.S3.SelectObjectContents + alias ExAws.S3.SelectObjectContent alias ExAws.S3.Parsers.EventStream def perform( @@ -139,7 +136,7 @@ defmodule ExAws.S3.SelectObjectContents do scan_range = opts[:scan_range] || nil payload = - SelectObjectContents.build_payload( + SelectObjectContent.build_payload( query, input_serialization, output_serialization, diff --git a/test/lib/s3/select_object_contents/request_test.exs b/test/lib/s3/select_object_contents/request_test.exs index 6ded0da..bf72b89 100644 --- a/test/lib/s3/select_object_contents/request_test.exs +++ b/test/lib/s3/select_object_contents/request_test.exs @@ -1,11 +1,11 @@ -defmodule ExAws.S3.SelectObjectContents.RequestTest do +defmodule ExAws.S3.SelectObjectContent.RequestTest do use ExUnit.Case, async: true - alias ExAws.S3.SelectObjectContents + alias ExAws.S3.SelectObjectContent - describe "SelectObjectContents.build_payload/4" do + describe "SelectObjectContent.build_payload/4" do test "default payload" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{csv: %{}}, %{csv: %{}}, @@ -15,7 +15,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "CSV input" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{ csv: %{ @@ -35,7 +35,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "JSON Input" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{ json: %{ @@ -50,7 +50,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "Parquet Input" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{ parquet: %{ @@ -64,7 +64,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "CSV Output" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{csv: %{}}, %{ @@ -83,7 +83,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "JSON Output" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{csv: %{}}, %{ @@ -97,7 +97,7 @@ defmodule ExAws.S3.SelectObjectContents.RequestTest do end test "Scan Range" do - assert SelectObjectContents.build_payload( + assert SelectObjectContent.build_payload( "select * from s3object", %{csv: %{}}, %{csv: %{}}, diff --git a/test/lib/s3/select_object_contents/stream_test.exs b/test/lib/s3/select_object_contents/stream_test.exs new file mode 100644 index 0000000..b5a91a4 --- /dev/null +++ b/test/lib/s3/select_object_contents/stream_test.exs @@ -0,0 +1,35 @@ +defmodule ExAws.S3.SelectObjectContent.StreamTest do + use ExUnit.Case, async: true + + import Support.BypassHelpers + alias ExAws.S3 + + describe "integration test" do + setup [:start_bypass] + + test "stream SelectObjectContent results", %{bypass: bypass} do + file = "test.csv" + bucket = "my-bucket" + setup_select_object_contents_backend(bypass, self(), bucket, file) + + bucket + |> S3.select_object_content(file, "select * from s3object") + |> ExAws.stream!(exaws_config_for_bypass(bypass)) + + assert_received :fetched_stream + end + end + + defp setup_select_object_contents_backend(bypass, test_pid, bucket_name, path) do + request_path = "/#{bucket_name}/#{path}" + + Bypass.expect(bypass, fn conn -> + case conn do + %{method: "POST", request_path: ^request_path, query_string: "select=&select-type=2"} -> + send(test_pid, :fetched_stream) + + Plug.Conn.send_resp(conn, 200, []) + end + end) + end +end diff --git a/test/lib/s3_test.exs b/test/lib/s3_test.exs index 005b019..ed97cf3 100644 --- a/test/lib/s3_test.exs +++ b/test/lib/s3_test.exs @@ -690,6 +690,44 @@ defmodule ExAws.S3Test do assert url == "https://bucket.custom-domain.com" end + test "#select_object_content with input_serialization, output_serialization, and scan_range" do + expected = %ExAws.S3.SelectObjectContent{ + bucket: "bucket", + opts: [ + {:input_serialization, + %{csv: %{field_delimiter: ",", file_header_info: :use, record_delimiter: "\n"}}}, + {:output_serialization, %{csv: %{field_delimiter: ",", record_delimiter: "\n"}}}, + {:scan_range, %{end: 100, start: 0}} + ], + path: "object", + query: "select * from s3object", + service: :s3 + } + + assert S3.select_object_content( + "bucket", + "object", + "select * from s3object", + input_serialization: %{ + csv: %{ + file_header_info: :use, + record_delimiter: "\n", + field_delimiter: "," + } + }, + output_serialization: %{ + csv: %{ + record_delimiter: "\n", + field_delimiter: "," + } + }, + scan_range: %{ + start: 0, + end: 100 + } + ) == expected + end + @spec assert_pre_signed_url( url, expected_scheme_host_path, From 7cbb0f7723cdf97f971c92f7f5d5df8a472fe850 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Thu, 9 Nov 2023 13:17:19 -0800 Subject: [PATCH 16/21] Add error logging to EventStream parser --- lib/ex_aws/s3/parsers/event_stream.ex | 6 ++++++ lib/ex_aws/s3/parsers/event_stream/message.ex | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 7cb91c3..e2bc085 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -17,6 +17,7 @@ defmodule ExAws.S3.Parsers.EventStream do # Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. alias ExAws.S3.Parsers.EventStream.Message + require Logger defp parse_message(chunk) do with {:ok, message} <- Message.parse(chunk) do @@ -32,7 +33,12 @@ defmodule ExAws.S3.Parsers.EventStream do ) do stream |> Stream.map(&parse_message/1) + |> Stream.each(&Message.log_errors/1) |> Stream.filter(&Message.is_record?/1) |> Stream.map(&Message.get_payload/1) end + + def parse_raw_stream({:error, {:http_error, _, %{headers: _, status_code: _, stream: stream}}}) do + stream |> Enum.into("") |> Logger.error() + end end diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex index 757f80b..c88b6a4 100644 --- a/lib/ex_aws/s3/parsers/event_stream/message.ex +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -7,6 +7,7 @@ defmodule ExAws.S3.Parsers.EventStream.Message do alias ExAws.S3.Parsers.EventStream.Prelude alias ExAws.S3.Parsers.EventStream.Header import Bitwise + require Logger defstruct prelude: nil, headers: nil, @@ -55,6 +56,12 @@ defmodule ExAws.S3.Parsers.EventStream.Message do payload end + def log_errors(%__MODULE__{headers: headers}) do + if Map.get(headers, ":message-type") == "error" do + Logger.error("Error in EventStream: #{inspect(headers)}") + end + end + def parse(chunk) do with {:ok, prelude, payload_bytes} <- Prelude.parse(chunk), From 02282651e2080abab41b7da361eefc90ab3b5cf9 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 14 Nov 2023 00:18:52 -0800 Subject: [PATCH 17/21] Refactored S3 EventStream parser for improved error handling, Updated ExAws.SParsers.EventStream to buffer streams --- lib/ex_aws/s3/parsers/event_stream.ex | 59 ++++++++++++++++--- lib/ex_aws/s3/parsers/event_stream/message.ex | 15 +---- lib/ex_aws/s3/parsers/event_stream/prelude.ex | 4 +- test/lib/s3/event_stream/message_test.exs | 19 ++++-- 4 files changed, 69 insertions(+), 28 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index e2bc085..eda9d8c 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -17,11 +17,51 @@ defmodule ExAws.S3.Parsers.EventStream do # Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. alias ExAws.S3.Parsers.EventStream.Message + alias ExAws.S3.Parsers.EventStream.Prelude + alias ExAws.S3.Parsers.EventStream.Header require Logger - defp parse_message(chunk) do - with {:ok, message} <- Message.parse(chunk) do - message + defp buffer_stream(stream) do + Stream.transform(stream, {nil, <<>>}, fn chunk, {prelude, buffer} -> + if(is_nil(prelude)) do + {:ok, prelude} = Prelude.parse(buffer <> chunk) + {[], {prelude, buffer <> chunk}} + else + remaining_bytes = prelude.total_length - byte_size(buffer) + + cond do + byte_size(chunk) < remaining_bytes -> + {[], {prelude, buffer <> chunk}} + + byte_size(chunk) >= remaining_bytes -> + <> = chunk + {:ok, parsed_message} = parse_message(prelude, buffer <> payload) + {[parsed_message], {nil, remaining_buffer}} + end + end + end) + end + + defp chunk_stream_by_linebreaks(stream) do + Stream.transform(stream, "", fn + chunk, buffer -> + case String.split(buffer <> chunk, "\n") do + lines when length(lines) > 1 -> + last = Enum.at(lines, -1) + rest = Enum.slice(lines, 0..-2) + {rest, last} + + [line] -> + {[], line} + end + end) + end + + def parse_message(prelude, payload_bytes) do + with :ok <- Message.verify_message_crc(prelude, payload_bytes), + {:ok, headers} <- Header.parse(prelude, payload_bytes), + {:ok, payload} <- Message.parse_payload(prelude, payload_bytes) do + {:ok, %Message{prelude: prelude, payload: payload, headers: headers}} end end @@ -31,14 +71,19 @@ defmodule ExAws.S3.Parsers.EventStream do stream: stream }} ) do - stream - |> Stream.map(&parse_message/1) - |> Stream.each(&Message.log_errors/1) + buffer_stream(stream) + |> Stream.each(&Message.raise_errors!/1) |> Stream.filter(&Message.is_record?/1) |> Stream.map(&Message.get_payload/1) + |> chunk_stream_by_linebreaks() end def parse_raw_stream({:error, {:http_error, _, %{headers: _, status_code: _, stream: stream}}}) do - stream |> Enum.into("") |> Logger.error() + stream_error = Enum.into(stream, "") + raise "Error parsing stream: #{inspect(stream_error)}" + end + + def parse_raw_stream({:error, error}) do + raise "Error parsing stream: #{inspect(error)}" end end diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex index c88b6a4..b64556d 100644 --- a/lib/ex_aws/s3/parsers/event_stream/message.ex +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -5,7 +5,6 @@ defmodule ExAws.S3.Parsers.EventStream.Message do # a struct with the prelude, headers and payload. Also verifies the message CRC. alias ExAws.S3.Parsers.EventStream.Prelude - alias ExAws.S3.Parsers.EventStream.Header import Bitwise require Logger @@ -56,19 +55,9 @@ defmodule ExAws.S3.Parsers.EventStream.Message do payload end - def log_errors(%__MODULE__{headers: headers}) do + def raise_errors!(%__MODULE__{headers: headers}) do if Map.get(headers, ":message-type") == "error" do - Logger.error("Error in EventStream: #{inspect(headers)}") - end - end - - def parse(chunk) do - with {:ok, prelude, payload_bytes} <- - Prelude.parse(chunk), - :ok <- verify_message_crc(prelude, payload_bytes), - {:ok, headers} <- Header.parse(prelude, payload_bytes), - {:ok, payload} <- parse_payload(prelude, payload_bytes) do - {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} + raise "Error message received: #{inspect(headers)}" end end end diff --git a/lib/ex_aws/s3/parsers/event_stream/prelude.ex b/lib/ex_aws/s3/parsers/event_stream/prelude.ex index dac0aee..1246f74 100644 --- a/lib/ex_aws/s3/parsers/event_stream/prelude.ex +++ b/lib/ex_aws/s3/parsers/event_stream/prelude.ex @@ -78,11 +78,11 @@ defmodule ExAws.S3.Parsers.EventStream.Prelude do end end - def parse(<> = payload) do + def parse(<>) do with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), {:ok, prelude} <- validate_prelude(unpacked_prelude), :ok <- validate_checksum(prelude_bytes, unpacked_prelude.crc) do - {:ok, prelude, payload} + {:ok, prelude} end end end diff --git a/test/lib/s3/event_stream/message_test.exs b/test/lib/s3/event_stream/message_test.exs index 7d5add3..799d3bd 100644 --- a/test/lib/s3/event_stream/message_test.exs +++ b/test/lib/s3/event_stream/message_test.exs @@ -1,6 +1,8 @@ defmodule ExAws.S3.EventStream.MessageTest do use ExUnit.Case, async: true + alias ExAws.S3.Parsers.EventStream + describe "Message.parse/1" do test "parses a binary EventStream Records chunk" do chunk = @@ -23,8 +25,8 @@ defmodule ExAws.S3.EventStream.MessageTest do 101, 34, 58, 34, 70, 105, 114, 101, 34, 125, 10, 66, 109, 206, 248>> assert {:ok, - %ExAws.S3.Parsers.EventStream.Message{ - prelude: %ExAws.S3.Parsers.EventStream.Prelude{ + %EventStream.Message{ + prelude: %EventStream.Prelude{ total_length: 337, headers_length: 85, prelude_length: 12, @@ -39,7 +41,7 @@ defmodule ExAws.S3.EventStream.MessageTest do }, payload: "{\"ID\":\"1\",\"Name\":\"Play\"}\n{\"ID\":\"2\",\"Name\":\"Detail\"}\n{\"ID\":\"3\",\"Name\":\"Director\"}\n{\"ID\":\"4\",\"Name\":\"Year\"}\n{\"ID\":\"5\",\"Name\":\"Present\"}\n{\"ID\":\"6\",\"Name\":\"Surface\"}\n{\"ID\":\"7\",\"Name\":\"A\"}\n{\"ID\":\"8\",\"Name\":\"Season\"}\n{\"ID\":\"9\",\"Name\":\"Fire\"}\n" - }} = ExAws.S3.Parsers.EventStream.Message.parse(chunk) + }} = parse(chunk) end test "parses a binary EventStream Stats chunk" do @@ -59,8 +61,8 @@ defmodule ExAws.S3.EventStream.MessageTest do 116, 115, 62, 8, 57, 236, 68>> assert {:ok, - %ExAws.S3.Parsers.EventStream.Message{ - prelude: %ExAws.S3.Parsers.EventStream.Prelude{ + %EventStream.Message{ + prelude: %EventStream.Prelude{ total_length: 240, headers_length: 67, prelude_length: 12, @@ -75,7 +77,12 @@ defmodule ExAws.S3.EventStream.MessageTest do }, payload: "10011001236" - }} = ExAws.S3.Parsers.EventStream.Message.parse(chunk) + }} = parse(chunk) end end + + defp parse(chunk) do + {:ok, prelude} = EventStream.Prelude.parse(chunk) + EventStream.parse_message(prelude, chunk) + end end From f1b1e828b9a1c926d3f0f12e9204c53eacc5a562 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 14 Nov 2023 09:13:27 -0800 Subject: [PATCH 18/21] Refactored buffer_stream function for better code readability --- lib/ex_aws/s3/parsers/event_stream.ex | 32 +++++++++++++-------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index eda9d8c..84ea9c5 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -22,24 +22,24 @@ defmodule ExAws.S3.Parsers.EventStream do require Logger defp buffer_stream(stream) do - Stream.transform(stream, {nil, <<>>}, fn chunk, {prelude, buffer} -> - if(is_nil(prelude)) do - {:ok, prelude} = Prelude.parse(buffer <> chunk) - {[], {prelude, buffer <> chunk}} - else - remaining_bytes = prelude.total_length - byte_size(buffer) + Stream.transform(stream, {nil, <<>>}, &buffer_stream/2) + end - cond do - byte_size(chunk) < remaining_bytes -> - {[], {prelude, buffer <> chunk}} + defp buffer_stream(chunk, {nil, buffer}) do + {:ok, prelude} = Prelude.parse(buffer <> chunk) + {[], {prelude, buffer <> chunk}} + end - byte_size(chunk) >= remaining_bytes -> - <> = chunk - {:ok, parsed_message} = parse_message(prelude, buffer <> payload) - {[parsed_message], {nil, remaining_buffer}} - end - end - end) + defp buffer_stream(chunk, {%Prelude{total_length: total_length} = prelude, buffer}) do + remaining_bytes = total_length - byte_size(buffer) + + if byte_size(chunk) < remaining_bytes do + {[], {prelude, buffer <> chunk}} + else + <> = chunk + {:ok, parsed_message} = parse_message(prelude, buffer <> payload) + {[parsed_message], {nil, remaining_buffer}} + end end defp chunk_stream_by_linebreaks(stream) do From 770fd04b8ebbb7a61e61c0e9a5c2a0b763ca009d Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 14 Nov 2023 15:11:50 -0800 Subject: [PATCH 19/21] Updated buffer_stream logic in ExAws.SParsers.EventStream --- lib/ex_aws/s3/parsers/event_stream.ex | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 84ea9c5..080de60 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -26,8 +26,15 @@ defmodule ExAws.S3.Parsers.EventStream do end defp buffer_stream(chunk, {nil, buffer}) do - {:ok, prelude} = Prelude.parse(buffer <> chunk) - {[], {prelude, buffer <> chunk}} + payload = buffer <> chunk + {:ok, %Prelude{total_length: total_length} = prelude} = Prelude.parse(payload) + + if total_length == byte_size(payload) do + {:ok, parsed_message} = parse_message(prelude, payload) + {[parsed_message], {nil, <<>>}} + else + buffer_stream(chunk, {prelude, buffer}) + end end defp buffer_stream(chunk, {%Prelude{total_length: total_length} = prelude, buffer}) do From 393122047c4d7c4d53bd6ea1fd7c628d86ed7f63 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 14 Nov 2023 19:54:10 -0800 Subject: [PATCH 20/21] Refactored buffer_stream method in EventStream module --- lib/ex_aws/s3/parsers/event_stream.ex | 45 +++++++++++++++++++-------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 080de60..46aac97 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -26,11 +26,11 @@ defmodule ExAws.S3.Parsers.EventStream do end defp buffer_stream(chunk, {nil, buffer}) do - payload = buffer <> chunk - {:ok, %Prelude{total_length: total_length} = prelude} = Prelude.parse(payload) + new_buffer = buffer <> chunk + {:ok, %Prelude{total_length: total_length} = prelude} = Prelude.parse(new_buffer) - if total_length == byte_size(payload) do - {:ok, parsed_message} = parse_message(prelude, payload) + if total_length == byte_size(new_buffer) do + {:ok, parsed_message} = parse_message(prelude, new_buffer) {[parsed_message], {nil, <<>>}} else buffer_stream(chunk, {prelude, buffer}) @@ -38,14 +38,27 @@ defmodule ExAws.S3.Parsers.EventStream do end defp buffer_stream(chunk, {%Prelude{total_length: total_length} = prelude, buffer}) do - remaining_bytes = total_length - byte_size(buffer) - - if byte_size(chunk) < remaining_bytes do - {[], {prelude, buffer <> chunk}} - else - <> = chunk - {:ok, parsed_message} = parse_message(prelude, buffer <> payload) - {[parsed_message], {nil, remaining_buffer}} + new_buffer = buffer <> chunk + new_buffer_length = byte_size(new_buffer) + + cond do + new_buffer_length < total_length -> + # needs more data. put it in the buffer and wait for more + {[], {prelude, new_buffer}} + + new_buffer_length > total_length -> + # we have more than one message in the buffer. parse the first one and keep the rest in the buffer + <> = new_buffer + {:ok, parsed_message} = parse_message(prelude, payload) + {[parsed_message], {nil, remaining_buffer}} + + new_buffer_length == total_length -> + # we have exactly one message in the buffer. parse it and clear the buffer + <>::binary>> = + new_buffer + + {:ok, parsed_message} = parse_message(prelude, payload) + {[parsed_message], {nil, <<>>}} end end @@ -78,7 +91,13 @@ defmodule ExAws.S3.Parsers.EventStream do stream: stream }} ) do - buffer_stream(stream) + stream + # |> Stream.with_index() + # |> Stream.map(fn {chunk, index} -> + # File.write!("ch/chunk_#{index}.bin", chunk) + # chunk + # end) + |> buffer_stream() |> Stream.each(&Message.raise_errors!/1) |> Stream.filter(&Message.is_record?/1) |> Stream.map(&Message.get_payload/1) From 6a5d3c55d555cce854d74f494e07b117cec8021c Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 14 Nov 2023 19:56:37 -0800 Subject: [PATCH 21/21] Updated comments and removed unneeded code in event stream parser --- lib/ex_aws/s3/parsers/event_stream.ex | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex index 46aac97..6fa3c42 100644 --- a/lib/ex_aws/s3/parsers/event_stream.ex +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -11,6 +11,9 @@ defmodule ExAws.S3.Parsers.EventStream do # The prelude contains the total length of the message, the length of the headers, # the length of the prelude, the CRC of the message, and the length of the payload. + # Additionally, this module buffers the stream and parses the messages as they come in. + # Also, stream is transformed such that each item is seperated by line breaks + # The headers are a map of header names to values. # The payload is the actual message data. # The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). @@ -92,11 +95,6 @@ defmodule ExAws.S3.Parsers.EventStream do }} ) do stream - # |> Stream.with_index() - # |> Stream.map(fn {chunk, index} -> - # File.write!("ch/chunk_#{index}.bin", chunk) - # chunk - # end) |> buffer_stream() |> Stream.each(&Message.raise_errors!/1) |> Stream.filter(&Message.is_record?/1)