This repository has been archived by the owner on Jan 17, 2025. It is now read-only.

Add dry run support #30

Merged
merged 1 commit on Apr 10, 2024
33 changes: 30 additions & 3 deletions lib/req_bigquery.ex
@@ -10,7 +10,7 @@ defmodule ReqBigQuery do
alias Req.Request
alias ReqBigQuery.Result

- @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms)a
+ @allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms dry_run)a
@base_url "https://bigquery.googleapis.com/bigquery/v2"
@max_results 10_000
@use_legacy_sql false
@@ -47,6 +47,8 @@ defmodule ReqBigQuery do
returns without any results and with the 'jobComplete' flag set to false. The default
value is 10000 milliseconds (10 seconds).

* `:dry_run` - Optional. Specifies whether to run the query in dry run mode. When `true`, BigQuery validates the query and estimates the number of bytes it would process without executing it. Defaults to `false`.

If you want to set any of these options when attaching the plugin, pass them as the second argument.

## Examples
@@ -72,6 +74,7 @@
columns: ["title", "views"],
job_id: "job_JDDZKquJWkY7x0LlDcmZ4nMQqshb",
num_rows: 10,
total_bytes_processed: 18161868216,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
@@ -108,6 +111,7 @@
columns: ["year", "views"],
job_id: "job_GXiJvALNsTAoAOJ39Eg3Mw94XMUQ",
num_rows: 7,
total_bytes_processed: 15686357820,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
@@ -143,6 +147,7 @@
|> Map.put(:maxResults, options[:max_results])
|> Map.put(:useLegacySql, options[:use_legacy_sql])
|> Map.put(:timeoutMs, options[:timeout_ms])
|> Map.put(:dryRun, options[:dry_run] || false)

%{request | url: uri}
|> Request.merge_options(auth: {:bearer, token}, json: json)
@@ -192,13 +197,15 @@
"kind" => "bigquery#queryResponse",
"rows" => _rows,
"schema" => %{"fields" => fields},
- "totalRows" => num_rows
+ "totalRows" => num_rows,
+ "totalBytesProcessed" => total_bytes
} = initial_response,
request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: initial_response |> rows_stream(request_options) |> decode_rows(fields),
columns: decode_columns(fields)
}
@@ -209,13 +216,33 @@
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
- "totalRows" => num_rows
+ "totalRows" => num_rows,
+ "totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
end

defp decode_body(
%{
"jobReference" => %{},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: nil,
num_rows: 0,
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
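
Taken together, these changes let a caller estimate a query's cost without executing it. A minimal usage sketch (the Goth process name, project ID, and query below are illustrative placeholders, not part of this diff):

    iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: "my_project", dry_run: true)
    iex> res = Req.post!(req, bigquery: "SELECT title FROM `my_dataset.my_table`").body
    iex> res.total_bytes_processed
    18161868216
    iex> {res.job_id, res.num_rows, res.rows}
    {nil, 0, []}
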
8 changes: 5 additions & 3 deletions lib/req_bigquery/result.ex
@@ -8,17 +8,19 @@ defmodule ReqBigQuery.Result do
* `rows` - The result set. A list of lists, each inner list corresponding to a
row, each element in the inner list corresponds to a column;
* `num_rows` - The number of fetched or affected rows;
- * `job_id` - The ID of the Google BigQuery's executed job.
+ * `total_bytes_processed` - The total number of bytes processed for the query;
+ * `job_id` - The ID of the executed Google BigQuery job. Set to `nil` for dry runs.
"""

@type t :: %__MODULE__{
columns: [String.t()],
rows: [[term()] | binary()],
num_rows: non_neg_integer(),
- job_id: binary()
+ total_bytes_processed: non_neg_integer(),
+ job_id: binary() | nil
}

- defstruct [:job_id, num_rows: 0, rows: [], columns: []]
+ defstruct [:job_id, :total_bytes_processed, num_rows: 0, rows: [], columns: []]
end

if Code.ensure_loaded?(Table.Reader) do
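
With these struct changes, a dry-run response decodes into a result of this shape (field values illustrative):

    %ReqBigQuery.Result{
      columns: ["en_description"],
      rows: [],
      num_rows: 0,
      total_bytes_processed: 285_337,
      job_id: nil
    }
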
38 changes: 38 additions & 0 deletions test/integration_test.exs
@@ -223,11 +223,49 @@ defmodule IntegrationTest do

assert result.columns == ["en_description"]
assert result.num_rows == 1
assert result.total_bytes_processed == 285_337

rows = Enum.to_list(result.rows)
assert rows == [["fruit of the apple tree"]]
end

test "returns Google BigQuery's response with total bytes processed for a dry run request",
%{
test: goth
} do
credentials =
System.get_env("GOOGLE_APPLICATION_CREDENTIALS", "credentials.json")
|> File.read!()
|> Jason.decode!()

project_id = System.get_env("PROJECT_ID", credentials["project_id"])

source = {:service_account, credentials, []}
start_supervised!({Goth, name: goth, source: source, http_client: &Req.request/1})

query = """
SELECT en_description
FROM `bigquery-public-data.wikipedia.wikidata`
WHERE id = ?
AND numeric_id = ?
"""

response =
Req.new()
|> ReqBigQuery.attach(project_id: project_id, goth: goth)
|> Req.post!(bigquery: {query, ["Q89", 89]}, dry_run: true)

assert response.status == 200

result = response.body

assert result.total_bytes_processed == 285_337
assert result.columns == ["en_description"]
assert result.num_rows == 0
assert result.job_id == nil
assert result.rows == []
end

test "encodes and decodes types received from Google BigQuery's response", %{
test: goth
} do
84 changes: 79 additions & 5 deletions test/req_bigquery_test.exs
@@ -20,7 +20,8 @@ defmodule ReqBigQueryTest do
"query" => "select * from iris",
"maxResults" => 10000,
"useLegacySql" => true,
- "timeoutMs" => 20000
+ "timeoutMs" => 20000,
+ "dryRun" => false
}

assert URI.to_string(request.url) ==
@@ -42,7 +43,8 @@
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
- totalRows: "2"
+ totalRows: "2",
+ totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
@@ -99,7 +101,8 @@
],
"useLegacySql" => false,
"maxResults" => 10000,
- "timeoutMs" => 10000
+ "timeoutMs" => 10000,
+ "dryRun" => true
}

assert URI.to_string(request.url) ==
@@ -121,7 +124,8 @@
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
- totalRows: "2"
+ totalRows: "2",
+ totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
@@ -130,7 +134,8 @@
opts = [
goth: ctx.test,
project_id: "my_awesome_project_id",
- default_dataset_id: "my_awesome_dataset"
+ default_dataset_id: "my_awesome_dataset",
+ dry_run: true
]

assert response =
Expand All @@ -150,6 +155,75 @@ defmodule ReqBigQueryTest do
assert Enum.to_list(response.body.rows) == [[1, "Ale"], [2, "Wojtek"]]
end

test "executes a dry run query", ctx do
fake_goth = fn request ->
data = %{access_token: "dummy", expires_in: 3599, token_type: "Bearer"}
{request, Req.Response.json(data)}
end

start_supervised!(
{Goth,
name: ctx.test,
source: {:service_account, goth_credentials(), []},
http_client: {&Req.request/1, adapter: fake_goth}}
)

fake_bigquery = fn request ->
assert Jason.decode!(request.body) == %{
"defaultDataset" => %{"datasetId" => "my_awesome_dataset"},
"query" => "select * from iris",
"maxResults" => 10000,
"useLegacySql" => true,
"timeoutMs" => 20000,
"dryRun" => true
}

assert URI.to_string(request.url) ==
"https://bigquery.googleapis.com/bigquery/v2/projects/my_awesome_project_id/queries"

assert Req.Request.get_header(request, "content-type") == ["application/json"]
assert Req.Request.get_header(request, "authorization") == ["Bearer dummy"]

data = %{
jobReference: %{},
kind: "bigquery#queryResponse",
schema: %{
fields: [
%{mode: "NULLABLE", name: "id", type: "INTEGER"},
%{mode: "NULLABLE", name: "name", type: "STRING"}
]
},
totalBytesProcessed: "1547899"
}

{request, Req.Response.json(data)}
end

opts = [
goth: ctx.test,
project_id: "my_awesome_project_id",
default_dataset_id: "my_awesome_dataset",
use_legacy_sql: true,
timeout_ms: 20_000,
dry_run: true
]

assert response =
Req.new(adapter: fake_bigquery)
|> ReqBigQuery.attach(opts)
|> Req.post!(bigquery: "select * from iris")

assert response.status == 200

assert %ReqBigQuery.Result{
columns: ["id", "name"],
job_id: nil,
num_rows: 0,
rows: [],
total_bytes_processed: 1_547_899
} = response.body
end

defp goth_credentials do
private_key = :public_key.generate_key({:rsa, 2048, 65_537})
