diff --git a/lib/ex_aws/s3.ex b/lib/ex_aws/s3.ex index 7499f5f..0464e39 100644 --- a/lib/ex_aws/s3.ex +++ b/lib/ex_aws/s3.ex @@ -757,6 +757,113 @@ defmodule ExAws.S3 do } end + @type select_object_content_opts :: [ + {:input_serialization, input_serialization} + | {:output_serialization, output_serialization} + | {:scan_range, scan_range} + ] + + @type input_serialization :: + %{csv_input: csv_input} | %{json_input: json_input} | %{parquet_input: %{}} + + @type csv_input :: %{ + file_header_info: :none | :ignore | :use, + comments: binary, + quote_escape_character: binary, + record_delimiter: binary, + field_delimiter: binary, + quote_character: binary, + allow_quoted_record_delimiter: boolean + } + @type json_input :: %{ + type: :document | :lines + } + + @type output_serialization :: + %{csv_output: csv_output} | %{json_output: json_output} + + @type csv_output :: %{ + quote_fields: :always | :as_needed, + quote_escape_character: binary, + record_delimiter: binary, + field_delimiter: binary, + quote_character: binary + } + + @type json_output :: %{ + record_delimiter: binary + } + + @type scan_range :: %{start: pos_integer, end: pos_integer} + + @doc """ + Filters and selects the contents of an Amazon S3 object based on an SQL statement. + + ## Options + * `:input_serialization` - Specifies JSON, CSV, or Parquet as the input serialization format. + each of which has corresponding parameters to describe the format of the object to be retrieved. + * `:output_serialization` - Specifies JSON or CSV as the output serialization format. + Each of which has corresponding parameters to describe the format of the output data. + * `:scan_range` - Specifies the byte range of the object to get the records from. + + More information can be found in the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html). + + + ## Example + ``` + S3.select_object_content( + "my-bucket", + "path/to/file.csv", + "SELECT * FROM S3Object s WHERE s._1 = 'some value'", + input_serialization: %{ + csv_input: %{ + file_header_info: :use, + record_delimiter: "\\n", + field_delimiter: ",", + quote_character: "\\"" + } + }, + output_serialization: %{ + csv_output: %{ + record_delimiter: "\\n", + field_delimiter: ",", + quote_character: "\\"" + } + } + ) |> ExAws.stream!() + ``` + + Note that **this won't start fetching anything immediately** since it returns an Elixir `Stream`. + + ### Streaming into a file + ``` + S3.select_object_content( + "my-bucket", + "path/to/file.csv", + "SELECT * FROM S3Object s WHERE s._1 = 'some value'" + ) |> ExAws.stream!() |> Stream.into(File.stream!("output.csv")) + ``` + """ + @spec select_object_content( + bucket :: binary, + path :: binary, + query :: binary, + opts :: select_object_content_opts + ) :: __MODULE__.SelectObjectContent.t() + def select_object_content( + bucket, + path, + query, + opts \\ [] + ) do + %__MODULE__.SelectObjectContent{ + bucket: bucket, + path: path, + query: query, + opts: opts + } + end + @type upload_opt :: {:max_concurrency, pos_integer} | {:timeout, pos_integer} diff --git a/lib/ex_aws/s3/parsers/event_stream.ex b/lib/ex_aws/s3/parsers/event_stream.ex new file mode 100644 index 0000000..6fa3c42 --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream.ex @@ -0,0 +1,113 @@ +defmodule ExAws.S3.Parsers.EventStream do + @moduledoc false + + # Parses EventStream messages. + + # AWS encodes EventStream messages in binary as follows: + # [ prelude ][ headers ][ payload ][ message-crc ] + # |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| + + # This module parses this information and returns a struct with the prelude, headers and payload. + # The prelude contains the total length of the message, the length of the headers, + # the length of the prelude, the CRC of the message, and the length of the payload. + + # Additionally, this module buffers the stream and parses the messages as they come in. + # Also, stream is transformed such that each item is seperated by line breaks + + # The headers are a map of header names to values. + # The payload is the actual message data. + # The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). + # Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. + + alias ExAws.S3.Parsers.EventStream.Message + alias ExAws.S3.Parsers.EventStream.Prelude + alias ExAws.S3.Parsers.EventStream.Header + require Logger + + defp buffer_stream(stream) do + Stream.transform(stream, {nil, <<>>}, &buffer_stream/2) + end + + defp buffer_stream(chunk, {nil, buffer}) do + new_buffer = buffer <> chunk + {:ok, %Prelude{total_length: total_length} = prelude} = Prelude.parse(new_buffer) + + if total_length == byte_size(new_buffer) do + {:ok, parsed_message} = parse_message(prelude, new_buffer) + {[parsed_message], {nil, <<>>}} + else + buffer_stream(chunk, {prelude, buffer}) + end + end + + defp buffer_stream(chunk, {%Prelude{total_length: total_length} = prelude, buffer}) do + new_buffer = buffer <> chunk + new_buffer_length = byte_size(new_buffer) + + cond do + new_buffer_length < total_length -> + # needs more data. put it in the buffer and wait for more + {[], {prelude, new_buffer}} + + new_buffer_length > total_length -> + # we have more than one message in the buffer. parse the first one and keep the rest in the buffer + <> = new_buffer + {:ok, parsed_message} = parse_message(prelude, payload) + {[parsed_message], {nil, remaining_buffer}} + + new_buffer_length == total_length -> + # we have exactly one message in the buffer. parse it and clear the buffer + <>::binary>> = + new_buffer + + {:ok, parsed_message} = parse_message(prelude, payload) + {[parsed_message], {nil, <<>>}} + end + end + + defp chunk_stream_by_linebreaks(stream) do + Stream.transform(stream, "", fn + chunk, buffer -> + case String.split(buffer <> chunk, "\n") do + lines when length(lines) > 1 -> + last = Enum.at(lines, -1) + rest = Enum.slice(lines, 0..-2) + {rest, last} + + [line] -> + {[], line} + end + end) + end + + def parse_message(prelude, payload_bytes) do + with :ok <- Message.verify_message_crc(prelude, payload_bytes), + {:ok, headers} <- Header.parse(prelude, payload_bytes), + {:ok, payload} <- Message.parse_payload(prelude, payload_bytes) do + {:ok, %Message{prelude: prelude, payload: payload, headers: headers}} + end + end + + def parse_raw_stream( + {:ok, + %{ + stream: stream + }} + ) do + stream + |> buffer_stream() + |> Stream.each(&Message.raise_errors!/1) + |> Stream.filter(&Message.is_record?/1) + |> Stream.map(&Message.get_payload/1) + |> chunk_stream_by_linebreaks() + end + + def parse_raw_stream({:error, {:http_error, _, %{headers: _, status_code: _, stream: stream}}}) do + stream_error = Enum.into(stream, "") + raise "Error parsing stream: #{inspect(stream_error)}" + end + + def parse_raw_stream({:error, error}) do + raise "Error parsing stream: #{inspect(error)}" + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/header.ex b/lib/ex_aws/s3/parsers/event_stream/header.ex new file mode 100644 index 0000000..0e43431 --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/header.ex @@ -0,0 +1,52 @@ +defmodule ExAws.S3.Parsers.EventStream.Header do + @moduledoc false + + # Parses EventStream headers. + + # AWS encodes EventStream headers as follows: + + # [header-name-size][header-name][header-data-type][header-value-size][header-value-data] + # |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| + + # This module parses this information and returns a map of header names - values. + # header-data-type is always 0x07(String) for S3. + + alias ExAws.S3.Parsers.EventStream.Prelude + + def extract_headers(header_bytes) do + do_extract_headers(header_bytes, []) + end + + defp do_extract_headers(<<>>, headers), do: Map.new(headers) + + defp do_extract_headers(<>, headers) do + <>, rest::binary>> = rest + <> = rest + + value_size = :binary.decode_unsigned(value_size_binary, :big) + <> = rest + + do_extract_headers(rest, [{header_name, value} | headers]) + end + + defp extract_header_bytes( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_length), + _payload::binary>> = payload_bytes + + headers_bytes + end + + def parse( + %Prelude{} = prelude, + payload_bytes + ) do + headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers() + {:ok, headers} + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/message.ex b/lib/ex_aws/s3/parsers/event_stream/message.ex new file mode 100644 index 0000000..b64556d --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/message.ex @@ -0,0 +1,63 @@ +defmodule ExAws.S3.Parsers.EventStream.Message do + @moduledoc false + + # Parses EventStream messages. This module parses this information and returns + # a struct with the prelude, headers and payload. Also verifies the message CRC. + + alias ExAws.S3.Parsers.EventStream.Prelude + import Bitwise + require Logger + + defstruct prelude: nil, + headers: nil, + payload: nil + + def verify_message_crc( + %Prelude{ + crc: prelude_crc, + payload_length: payload_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_::binary-size(8), message_bytes::binary-size(payload_length + headers_length + 4), + message_crc_bytes::binary-size(4)>> = payload_bytes + + message_crc = :binary.decode_unsigned(message_crc_bytes, :big) + computed_crc = prelude_crc |> :erlang.crc32(message_bytes) |> band(0xFFFFFFFF) + + if computed_crc == message_crc do + :ok + else + {:error, :message_checksum_mismatch} + end + end + + def parse_payload( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length, + payload_length: payload_length + }, + payload_bytes + ) do + <<_prelude_headers::binary-size(prelude_length + headers_length), + message_bytes::binary-size(payload_length), _::binary>> = payload_bytes + + {:ok, message_bytes} + end + + def is_record?(%__MODULE__{headers: headers}) do + Map.get(headers, ":event-type") == "Records" + end + + def get_payload(%__MODULE__{payload: payload}) do + payload + end + + def raise_errors!(%__MODULE__{headers: headers}) do + if Map.get(headers, ":message-type") == "error" do + raise "Error message received: #{inspect(headers)}" + end + end +end diff --git a/lib/ex_aws/s3/parsers/event_stream/prelude.ex b/lib/ex_aws/s3/parsers/event_stream/prelude.ex new file mode 100644 index 0000000..1246f74 --- /dev/null +++ b/lib/ex_aws/s3/parsers/event_stream/prelude.ex @@ -0,0 +1,88 @@ +defmodule ExAws.S3.Parsers.EventStream.Prelude do + @moduledoc false + # Parses EventStream preludes. + + # AWS encodes EventStream preludes in binary as follows: + + # [ total-length ][headers-length][ prelude crc ] + # |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| + + # This module parses this information and returns a struct with the total length of the message, + # the length of the headers, the length of the prelude, the CRC of the message, + # and the length of the payload. + import Bitwise + + defstruct total_length: nil, + headers_length: nil, + prelude_length: nil, + crc: nil, + payload_length: nil, + prelude_bytes: nil + + @prelude_length 12 + # 128 Kb + @max_header_length 128 * 1024 + # 16 Mb + @max_payload_length 16 * 1024 * 1024 + + defp unpack_prelude( + << + total_length_bytes::binary-size(4), + headers_length_bytes::binary-size(4), + crc::binary-size(4) + >> = prelude_bytes + ) do + total_length = :binary.decode_unsigned(total_length_bytes, :big) + headers_length = :binary.decode_unsigned(headers_length_bytes, :big) + crc = :binary.decode_unsigned(crc, :big) + + {:ok, + %__MODULE__{ + total_length: total_length, + headers_length: headers_length, + prelude_length: @prelude_length, + payload_length: total_length - @prelude_length - headers_length - 4, + crc: crc, + prelude_bytes: prelude_bytes + }} + end + + def validate_prelude( + %__MODULE__{ + headers_length: headers_length, + payload_length: payload_length + } = prelude + ) do + cond do + headers_length > @max_header_length -> + {:error, :invalid_headers_length} + + payload_length > @max_payload_length -> + {:error, :invalid_payload_length} + + true -> + {:ok, prelude} + end + end + + def validate_checksum( + <>, + prelude_checksum + ) do + computed_checksum = prelude_bytes_without_crc |> :erlang.crc32() |> band(0xFFFFFFFF) + + if computed_checksum == prelude_checksum do + :ok + else + {:error, :prelude_checksum_mismatch} + end + end + + def parse(<>) do + with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), + {:ok, prelude} <- validate_prelude(unpacked_prelude), + :ok <- validate_checksum(prelude_bytes, unpacked_prelude.crc) do + {:ok, prelude} + end + end +end diff --git a/lib/ex_aws/s3/select_object_content.ex b/lib/ex_aws/s3/select_object_content.ex new file mode 100644 index 0000000..59795b3 --- /dev/null +++ b/lib/ex_aws/s3/select_object_content.ex @@ -0,0 +1,164 @@ +defmodule ExAws.S3.SelectObjectContent do + @moduledoc """ + Represents the (SelectObjectContent)[https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html] operation. + """ + alias ExAws.S3.Utils + + @enforce_keys ~w(bucket path query)a + defstruct bucket: nil, + path: nil, + query: nil, + opts: [], + service: :s3 + + @type t :: %__MODULE__{} + + def input_params(%{ + csv: csv_input + }) do + csv_params = + %{ + file_header_info: "USE", + record_delimiter: "\n", + field_delimiter: "," + } + |> Map.merge(csv_input) + |> Utils.to_xml() + + "#{csv_params}" + end + + def input_params(%{ + json: json_input + }) do + json_params = + %{ + type: "DOCUMENT" + } + |> Map.merge(json_input) + |> Utils.to_xml() + + "#{json_params}" + end + + def input_params(%{ + parquet: _ + }) do + "" + end + + def output_params(%{ + csv: csv_output + }) do + csv_params = + %{ + record_delimiter: "\n", + field_delimiter: "," + } + |> Map.merge(csv_output) + |> Utils.to_xml() + + "#{csv_params}" + end + + def output_params(%{ + json: json_output + }) do + json_params = + %{ + record_delimiter: "\n" + } + |> Map.merge(json_output) + |> Utils.to_xml() + + "#{json_params}" + end + + def scan_range_params( + %{ + start: _range_start, + end: _range_end + } = scan_range + ) do + ExAws.S3.Utils.to_xml(%{ + scan_range: scan_range + }) + end + + def scan_range_params(_) do + "" + end + + def build_payload( + query, + input_serialization, + output_serialization, + scan_range + ) do + """ + + + #{query} + SQL + + #{input_params(input_serialization)} + + + #{output_params(output_serialization)} + + #{scan_range_params(scan_range)} + + """ + end + + defimpl ExAws.Operation do + alias ExAws.S3.SelectObjectContent + alias ExAws.S3.Parsers.EventStream + + def perform( + _, + _ + ) do + raise "Not implemented. Use stream! instead." + end + + def stream!( + %{ + bucket: bucket, + path: path, + query: query, + opts: opts + }, + config + ) do + input_serialization = opts[:input_serialization] || %{csv: %{}} + output_serialization = opts[:output_serialization] || %{csv: %{}} + scan_range = opts[:scan_range] || nil + + payload = + SelectObjectContent.build_payload( + query, + input_serialization, + output_serialization, + scan_range + ) + + params = %{"select" => "", "select-type" => "2"} + + ExAws.stream!( + %ExAws.Operation.S3{ + http_method: :post, + bucket: bucket, + path: path, + body: payload, + headers: %{}, + resource: "", + params: params, + stream_builder: :octet_stream, + parser: &EventStream.parse_raw_stream/1 + }, + config + ) + end + end +end diff --git a/lib/ex_aws/s3/utils.ex b/lib/ex_aws/s3/utils.ex index 1981943..30f352c 100644 --- a/lib/ex_aws/s3/utils.ex +++ b/lib/ex_aws/s3/utils.ex @@ -345,4 +345,24 @@ defmodule ExAws.S3.Utils do {{datetime.year, datetime.month, datetime.day}, {datetime.hour, datetime.minute, datetime.second}} end + + def to_xml(map) when is_map(map) do + Enum.map(map, fn {key, value} -> + key = key |> Atom.to_string() |> Macro.camelize() + "<#{key}>#{to_xml(value)}" + end) + |> Enum.join() + end + + def to_xml(list) when is_list(list) do + Enum.map(list, fn value -> + "#{to_xml(value)}" + end) + |> Enum.join() + end + + def to_xml(value) when is_atom(value), do: Atom.to_string(value) |> String.upcase() + def to_xml(value) when is_binary(value), do: value + def to_xml(value) when is_integer(value), do: Integer.to_string(value) + def to_xml(value) when is_float(value), do: Float.to_string(value) end diff --git a/test/lib/s3/event_stream/header_test.exs b/test/lib/s3/event_stream/header_test.exs new file mode 100644 index 0000000..392f22f --- /dev/null +++ b/test/lib/s3/event_stream/header_test.exs @@ -0,0 +1,45 @@ +defmodule ExAws.S3.EventStream.HeaderTest do + use ExUnit.Case, async: true + + alias ExAws.S3.Parsers.EventStream.Header + + describe "Header.extract_headers/1" do + test "can extract Record type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 111, 99, + 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, 118, 101, 110, 116, + 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "application/octet-stream", + ":event-type" => "Records" + } + end + + test "can extract Stats type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, 11, 58, 101, 118, 101, 110, + 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "text/xml", + ":event-type" => "Stats" + } + end + + test "can extract End type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, + 0, 3, 69, 110, 100>> + ) == %{ + ":message-type" => "event", + ":event-type" => "End" + } + end + end +end diff --git a/test/lib/s3/event_stream/message_test.exs b/test/lib/s3/event_stream/message_test.exs new file mode 100644 index 0000000..799d3bd --- /dev/null +++ b/test/lib/s3/event_stream/message_test.exs @@ -0,0 +1,88 @@ +defmodule ExAws.S3.EventStream.MessageTest do + use ExUnit.Case, async: true + + alias ExAws.S3.Parsers.EventStream + + describe "Message.parse/1" do + test "parses a binary EventStream Records chunk" do + chunk = + <<0, 0, 1, 81, 0, 0, 0, 85, 176, 12, 79, 204, 13, 58, 109, 101, 115, 115, 97, 103, 101, + 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, + 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, + 105, 111, 110, 47, 111, 99, 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, + 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115, + 123, 34, 73, 68, 34, 58, 34, 49, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 80, 108, 97, + 121, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 50, 34, 44, 34, 78, 97, 109, 101, 34, 58, + 34, 68, 101, 116, 97, 105, 108, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 51, 34, 44, + 34, 78, 97, 109, 101, 34, 58, 34, 68, 105, 114, 101, 99, 116, 111, 114, 34, 125, 10, + 123, 34, 73, 68, 34, 58, 34, 52, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 89, 101, 97, + 114, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 53, 34, 44, 34, 78, 97, 109, 101, 34, 58, + 34, 80, 114, 101, 115, 101, 110, 116, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 54, 34, + 44, 34, 78, 97, 109, 101, 34, 58, 34, 83, 117, 114, 102, 97, 99, 101, 34, 125, 10, 123, + 34, 73, 68, 34, 58, 34, 55, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 65, 34, 125, 10, + 123, 34, 73, 68, 34, 58, 34, 56, 34, 44, 34, 78, 97, 109, 101, 34, 58, 34, 83, 101, 97, + 115, 111, 110, 34, 125, 10, 123, 34, 73, 68, 34, 58, 34, 57, 34, 44, 34, 78, 97, 109, + 101, 34, 58, 34, 70, 105, 114, 101, 34, 125, 10, 66, 109, 206, 248>> + + assert {:ok, + %EventStream.Message{ + prelude: %EventStream.Prelude{ + total_length: 337, + headers_length: 85, + prelude_length: 12, + crc: 2_953_596_876, + payload_length: 236, + prelude_bytes: <<0, 0, 1, 81, 0, 0, 0, 85, 176, 12, 79, 204>> + }, + headers: %{ + ":content-type" => "application/octet-stream", + ":event-type" => "Records", + ":message-type" => "event" + }, + payload: + "{\"ID\":\"1\",\"Name\":\"Play\"}\n{\"ID\":\"2\",\"Name\":\"Detail\"}\n{\"ID\":\"3\",\"Name\":\"Director\"}\n{\"ID\":\"4\",\"Name\":\"Year\"}\n{\"ID\":\"5\",\"Name\":\"Present\"}\n{\"ID\":\"6\",\"Name\":\"Surface\"}\n{\"ID\":\"7\",\"Name\":\"A\"}\n{\"ID\":\"8\",\"Name\":\"Season\"}\n{\"ID\":\"9\",\"Name\":\"Fire\"}\n" + }} = parse(chunk) + end + + test "parses a binary EventStream Stats chunk" do + chunk = + <<0, 0, 0, 240, 0, 0, 0, 67, 194, 195, 159, 30, 13, 58, 109, 101, 115, 115, 97, 103, 101, + 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, + 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, + 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115, + 60, 63, 120, 109, 108, 32, 118, 101, 114, 115, 105, 111, 110, 61, 34, 49, 46, 48, 34, + 32, 101, 110, 99, 111, 100, 105, 110, 103, 61, 34, 85, 84, 70, 45, 56, 34, 63, 62, 60, + 83, 116, 97, 116, 115, 62, 60, 66, 121, 116, 101, 115, 83, 99, 97, 110, 110, 101, 100, + 62, 49, 48, 48, 49, 60, 47, 66, 121, 116, 101, 115, 83, 99, 97, 110, 110, 101, 100, 62, + 60, 66, 121, 116, 101, 115, 80, 114, 111, 99, 101, 115, 115, 101, 100, 62, 49, 48, 48, + 49, 60, 47, 66, 121, 116, 101, 115, 80, 114, 111, 99, 101, 115, 115, 101, 100, 62, 60, + 66, 121, 116, 101, 115, 82, 101, 116, 117, 114, 110, 101, 100, 62, 50, 51, 54, 60, 47, + 66, 121, 116, 101, 115, 82, 101, 116, 117, 114, 110, 101, 100, 62, 60, 47, 83, 116, 97, + 116, 115, 62, 8, 57, 236, 68>> + + assert {:ok, + %EventStream.Message{ + prelude: %EventStream.Prelude{ + total_length: 240, + headers_length: 67, + prelude_length: 12, + crc: 3_267_600_158, + payload_length: 157, + prelude_bytes: <<0, 0, 0, 240, 0, 0, 0, 67, 194, 195, 159, 30>> + }, + headers: %{ + ":content-type" => "text/xml", + ":event-type" => "Stats", + ":message-type" => "event" + }, + payload: + "10011001236" + }} = parse(chunk) + end + end + + defp parse(chunk) do + {:ok, prelude} = EventStream.Prelude.parse(chunk) + EventStream.parse_message(prelude, chunk) + end +end diff --git a/test/lib/s3/select_object_contents/request_test.exs b/test/lib/s3/select_object_contents/request_test.exs new file mode 100644 index 0000000..bf72b89 --- /dev/null +++ b/test/lib/s3/select_object_contents/request_test.exs @@ -0,0 +1,109 @@ +defmodule ExAws.S3.SelectObjectContent.RequestTest do + use ExUnit.Case, async: true + + alias ExAws.S3.SelectObjectContent + + describe "SelectObjectContent.build_payload/4" do + test "default payload" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{csv: %{}}, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n ,\n\n \n \n\n" + end + + test "CSV input" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{ + csv: %{ + file_header_info: "IGNORE", + record_delimiter: "\n", + field_delimiter: ",", + quote_character: "\"", + quote_escape_character: "\"", + comments: "#", + allow_quoted_record_delimiter: false + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n FALSE#,IGNORE\"\"\n\n \n \n ,\n\n \n \n\n" + end + + test "JSON Input" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{ + json: %{ + type: :document, + record_delimiter: "\n" + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n \nDOCUMENT\n \n \n ,\n\n \n \n\n" + end + + test "Parquet Input" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{ + parquet: %{ + record_delimiter: "\n" + } + }, + %{csv: %{}}, + nil + ) == + "\n\n select * from s3object\n SQL\n \n \n \n \n ,\n\n \n \n\n" + end + + test "CSV Output" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{csv: %{}}, + %{ + csv: %{ + record_delimiter: "\n", + field_delimiter: ",", + quote_character: "\"", + quote_escape_character: "\"", + quote_fields: :asneeded, + comments: "#" + } + }, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n #,\"\"ASNEEDED\n\n \n \n\n" + end + + test "JSON Output" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{csv: %{}}, + %{ + json: %{ + record_delimiter: "\n" + } + }, + nil + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n \n\n \n \n\n" + end + + test "Scan Range" do + assert SelectObjectContent.build_payload( + "select * from s3object", + %{csv: %{}}, + %{csv: %{}}, + %{start: 0, end: 100} + ) == + "\n\n select * from s3object\n SQL\n \n ,USE\n\n \n \n ,\n\n \n 1000\n\n" + end + end +end diff --git a/test/lib/s3/select_object_contents/stream_test.exs b/test/lib/s3/select_object_contents/stream_test.exs new file mode 100644 index 0000000..b5a91a4 --- /dev/null +++ b/test/lib/s3/select_object_contents/stream_test.exs @@ -0,0 +1,35 @@ +defmodule ExAws.S3.SelectObjectContent.StreamTest do + use ExUnit.Case, async: true + + import Support.BypassHelpers + alias ExAws.S3 + + describe "integration test" do + setup [:start_bypass] + + test "stream SelectObjectContent results", %{bypass: bypass} do + file = "test.csv" + bucket = "my-bucket" + setup_select_object_contents_backend(bypass, self(), bucket, file) + + bucket + |> S3.select_object_content(file, "select * from s3object") + |> ExAws.stream!(exaws_config_for_bypass(bypass)) + + assert_received :fetched_stream + end + end + + defp setup_select_object_contents_backend(bypass, test_pid, bucket_name, path) do + request_path = "/#{bucket_name}/#{path}" + + Bypass.expect(bypass, fn conn -> + case conn do + %{method: "POST", request_path: ^request_path, query_string: "select=&select-type=2"} -> + send(test_pid, :fetched_stream) + + Plug.Conn.send_resp(conn, 200, []) + end + end) + end +end diff --git a/test/lib/s3/utils_test.exs b/test/lib/s3/utils_test.exs index 376c66b..d8f7bab 100644 --- a/test/lib/s3/utils_test.exs +++ b/test/lib/s3/utils_test.exs @@ -132,4 +132,23 @@ defmodule ExAws.S3.ImplTest do "0000true0prefix/Enabled123" end end + + describe "to_xml/1" do + test "renders a simple map" do + assert %{ + foo: %{ + bar: "baz", + qux: :quux, + nested: %{foo: "bar"}, + num: 10, + float: 10.0, + bool: true, + null: nil, + snake_case: "camelCase" + } + } + |> Utils.to_xml() == + "bazTRUE10.0barNIL10QUUXcamelCase" + end + end end diff --git a/test/lib/s3_test.exs b/test/lib/s3_test.exs index 005b019..ed97cf3 100644 --- a/test/lib/s3_test.exs +++ b/test/lib/s3_test.exs @@ -690,6 +690,44 @@ defmodule ExAws.S3Test do assert url == "https://bucket.custom-domain.com" end + test "#select_object_content with input_serialization, output_serialization, and scan_range" do + expected = %ExAws.S3.SelectObjectContent{ + bucket: "bucket", + opts: [ + {:input_serialization, + %{csv: %{field_delimiter: ",", file_header_info: :use, record_delimiter: "\n"}}}, + {:output_serialization, %{csv: %{field_delimiter: ",", record_delimiter: "\n"}}}, + {:scan_range, %{end: 100, start: 0}} + ], + path: "object", + query: "select * from s3object", + service: :s3 + } + + assert S3.select_object_content( + "bucket", + "object", + "select * from s3object", + input_serialization: %{ + csv: %{ + file_header_info: :use, + record_delimiter: "\n", + field_delimiter: "," + } + }, + output_serialization: %{ + csv: %{ + record_delimiter: "\n", + field_delimiter: "," + } + }, + scan_range: %{ + start: 0, + end: 100 + } + ) == expected + end + @spec assert_pre_signed_url( url, expected_scheme_host_path,