Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configuring default connection query language #80

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## v2.3.0-dev

- Enhancements
- The default query language can be configured at the connection level. If no per-query option is given it will be used instead of the default for your `:version` configuration (defaults are `:flux` for `:v2` and `:influxql` for `:v1`)

## v2.2.0 (2022-10-16)

- Enhancements
Expand Down
13 changes: 13 additions & 0 deletions lib/instream/connection/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ defmodule Instream.Connection.Config do
`:query` (InfluxDB v1). If nothing or an invalid value is given the connection
will be made using `:basic` authentication.

## Query Language

By default Instream uses the default query language based on the configured
connection version (`:influxql` for `:v1`, `:flux` for `:v2`).

You can change this default in your configuration:

config :my_app, MyConnection,
query_language: :influxql # or :flux

This configuration will then be used if you are not passing a specific
`:query_language` option for your query.

## Point Writer

If you are using the regular line protocol writer `Instream.Writer.Line`
Expand Down
14 changes: 7 additions & 7 deletions lib/instream/connection/query_runner_v1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ defmodule Instream.Connection.QueryRunnerV1 do
headers = Headers.assemble(config, opts)
http_opts = http_opts(config, opts)

body = read_body(query, opts)
method = read_method(opts)
body = read_body(query, config, opts)
method = read_method(config, opts)
url = read_url(conn, query, opts)

{query_time, response} =
Expand Down Expand Up @@ -208,15 +208,15 @@ defmodule Instream.Connection.QueryRunnerV1 do

defp log(_, _), do: :ok

defp read_body(query, opts) do
case opts[:query_language] do
defp read_body(query, config, opts) do
case opts[:query_language] || config[:query_language] do
:flux -> query
_ -> ""
end
end

defp read_method(opts) do
case opts[:query_language] do
defp read_method(config, opts) do
case opts[:query_language] || config[:query_language] do
:flux -> :post
_ -> opts[:method] || :get
end
Expand All @@ -226,7 +226,7 @@ defmodule Instream.Connection.QueryRunnerV1 do
config = conn.config()
url = URL.query(config, opts)

case opts[:query_language] do
case opts[:query_language] || config[:query_language] do
:flux ->
url

Expand Down
6 changes: 4 additions & 2 deletions lib/instream/connection/query_runner_v2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ defmodule Instream.Connection.QueryRunnerV2 do
defp log(_, _), do: :ok

defp read_body(conn, query, opts) do
case opts[:query_language] do
config = conn.config()

case opts[:query_language] || config[:query_language] do
:influxql ->
""

Expand All @@ -228,7 +230,7 @@ defmodule Instream.Connection.QueryRunnerV2 do
config = conn.config()
url = URL.query(config, opts)

case opts[:query_language] do
case opts[:query_language] || config[:query_language] do
:influxql ->
case opts[:params] do
params when is_map(params) ->
Expand Down
2 changes: 1 addition & 1 deletion lib/instream/query/headers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Instream.Query.Headers do
def assemble(config, options \\ []) do
assemble_auth(config[:auth]) ++
assemble_encoding(options[:result_as]) ++
assemble_language(config[:version], options[:query_language])
assemble_language(config[:version], options[:query_language] || config[:query_language])
end

@doc """
Expand Down
57 changes: 33 additions & 24 deletions lib/instream/query/url.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,12 @@ defmodule Instream.Query.URL do
"""
@spec query(Keyword.t(), Keyword.t()) :: String.t()
def query(config, opts) do
case {config[:version], opts[:query_language]} do
{:v2, :influxql} ->
config
|> url("query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))

{:v2, _} ->
config
|> url("api/v2/query")
|> append_param("org", opts[:org] || config[:org])

{:v1, :flux} ->
config
|> url("api/v2/query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))

{:v1, _} ->
config
|> url("query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))
end
query_url(
config[:version],
opts[:query_language] || config[:query_language],
config,
opts
)
end

@doc """
Expand Down Expand Up @@ -113,6 +95,33 @@ defmodule Instream.Query.URL do
defp encode_precision(:nanosecond), do: "ns"
defp encode_precision(_), do: ""

defp query_url(:v2, :influxql, config, opts) do
config
|> url("query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))
end

defp query_url(:v2, _, config, opts) do
config
|> url("api/v2/query")
|> append_param("org", opts[:org] || config[:org])
end

defp query_url(:v1, :flux, config, opts) do
config
|> url("api/v2/query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))
end

defp query_url(:v1, _, config, opts) do
config
|> url("query")
|> append_param("db", opts[:database] || config[:database])
|> append_param("epoch", encode_precision(opts[:precision]))
end

defp url(config, endpoint) do
url =
[
Expand Down
72 changes: 72 additions & 0 deletions test/influxdb_v1/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ defmodule Instream.InfluxDBv1.ConnectionTest do
]
end

defmodule QueryLanguageConnection do
use Instream.Connection,
otp_app: :instream,
config: [
init: {__MODULE__.Initializer, :init}
]

defmodule Initializer do
def init(conn) do
config =
Keyword.merge(
Application.get_env(:instream, TestConnection),
query_language: :flux
)

Application.put_env(:instream, conn, config)
end
end
end

defmodule TestSeries do
use Instream.Series

Expand Down Expand Up @@ -160,4 +180,56 @@ defmodule Instream.InfluxDBv1.ConnectionTest do
assert @tags == values_tags
assert 0 < length(value_rows)
end

describe "query language from connection config" do
test "use default from config" do
start_supervised!(QueryLanguageConnection)

measurement = "default_query_language_config"

:ok =
QueryLanguageConnection.write([
%{
measurement: measurement,
fields: %{value: 42}
}
])

result =
QueryLanguageConnection.query("""
from(bucket:"test_database/autogen")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "#{measurement}")
|> last()
""")

assert [
%{
"_field" => "value",
"_measurement" => ^measurement,
"_value" => 42,
"result" => "_result"
}
] = result
end

test "override default config" do
start_supervised!(QueryLanguageConnection)

measurement = "default_query_language_override"

:ok =
QueryLanguageConnection.write([
%{
measurement: measurement,
fields: %{value: 42}
}
])

assert %{results: [%{series: [%{name: ^measurement, values: [[_, 42]]}]}]} =
QueryLanguageConnection.query("SELECT LAST(value) FROM #{measurement}",
query_language: :influxql
)
end
end
end
73 changes: 73 additions & 0 deletions test/influxdb_v2/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@ defmodule Instream.InfluxDBv2.ConnectionTest do
]
end

defmodule QueryLanguageConnection do
use Instream.Connection,
otp_app: :instream,
config: [
init: {__MODULE__.Initializer, :init}
]

defmodule Initializer do
def init(conn) do
config =
Keyword.merge(
Application.get_env(:instream, TestConnection),
query_language: :influxql
)

Application.put_env(:instream, conn, config)
end
end
end

defmodule TestSeries do
use Instream.Series

Expand Down Expand Up @@ -167,4 +187,57 @@ defmodule Instream.InfluxDBv2.ConnectionTest do
assert %{results: [%{series: [%{name: "params", values: [[_, ^test_field]]}]}]} =
TestConnection.query(query, query_language: :influxql, params: params)
end

describe "query language from connection config" do
test "use default from config" do
start_supervised!(QueryLanguageConnection)

measurement = "default_query_language_config"

:ok =
QueryLanguageConnection.write([
%{
measurement: measurement,
fields: %{value: 42}
}
])

assert %{results: [%{series: [%{name: ^measurement, values: [[_, 42]]}]}]} =
QueryLanguageConnection.query("SELECT LAST(value) FROM #{measurement}")
end

test "override default config" do
start_supervised!(QueryLanguageConnection)

measurement = "default_query_language_override"

:ok =
QueryLanguageConnection.write([
%{
measurement: measurement,
fields: %{value: 42}
}
])

result =
QueryLanguageConnection.query(
"""
from(bucket:"#{QueryLanguageConnection.config(:bucket)}")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "#{measurement}")
|> last()
""",
query_language: :flux
)

assert [
%{
"_field" => "value",
"_measurement" => ^measurement,
"_value" => 42,
"result" => "_result"
}
] = result
end
end
end