Skip to content

Commit

Permalink
fix: remap selected fields, don't subquery in aggregate joins
Browse files Browse the repository at this point in the history
  • Loading branch information
zachdaniel committed Jan 20, 2024
1 parent f20a07f commit 445de28
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 44 deletions.
51 changes: 38 additions & 13 deletions lib/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ defmodule AshPostgres.Aggregate do
{:ok, aggregates} ->
query = AshPostgres.DataLayer.default_bindings(query, resource)

{query, aggregates, aggregate_name_mapping} =
{query, aggregates} =
Enum.reduce(
aggregates,
{query, [], %{}},
fn aggregate, {query, aggregates, aggregate_name_mapping} ->
{query, []},
fn aggregate, {query, aggregates} ->
if is_atom(aggregate.name) do
{query, [aggregate | aggregates], aggregate_name_mapping}
{query, [aggregate | aggregates]}
else
{query, name} = use_aggregate_name(query, aggregate.name)

{query, [%{aggregate | name: name} | aggregates],
Map.put(aggregate_name_mapping, name, aggregate.name)}
{query, [%{aggregate | name: name} | aggregates]}
end
end
)
Expand Down Expand Up @@ -75,7 +74,15 @@ defmodule AshPostgres.Aggregate do
{:ok, query, []},
fn {{[first_relationship | relationship_path], join_filters}, aggregates},
{:ok, query, dynamics} ->
first_relationship = Ash.Resource.Info.relationship(resource, first_relationship)
first_relationship =
case Ash.Resource.Info.relationship(resource, first_relationship) do
nil ->
raise "No such relationship for #{inspect(first_relationship)} aggregates #{inspect aggregates}"

first_relationship ->
first_relationship
end

is_single? = match?([_], aggregates)

cond do
Expand Down Expand Up @@ -154,10 +161,9 @@ defmodule AshPostgres.Aggregate do
join_filters
),
agg_root_query <-
Map.update!(
set_in_group(
agg_root_query,
:__ash_bindings__,
&Map.put(&1, :in_group?, true)
resource
),
{:ok, joined} <-
join_all_relationships(
Expand Down Expand Up @@ -221,7 +227,7 @@ defmodule AshPostgres.Aggregate do

case result do
{:ok, query, dynamics} ->
{:ok, add_aggregate_selects(query, dynamics, aggregate_name_mapping)}
{:ok, add_aggregate_selects(query, dynamics)}

{:error, error} ->
{:error, error}
Expand All @@ -232,6 +238,25 @@ defmodule AshPostgres.Aggregate do
end
end

defp set_in_group(%{__ash_bindings__: _} = query, _resource) do
Map.update!(
query,
:__ash_bindings__,
&Map.put(&1, :in_group?, true)
)
end

defp set_in_group(%Ecto.SubQuery{} = subquery, resource) do
subquery = from(row in subquery, [])

subquery
|> AshPostgres.DataLayer.default_bindings(resource)
|> Map.update!(
:__ash_bindings__,
&Map.put(&1, :in_group?, true)
)
end

defp apply_first_relationship_join_filters(
agg_root_query,
query,
Expand Down Expand Up @@ -797,7 +822,7 @@ defmodule AshPostgres.Aggregate do
end)
end

defp add_aggregate_selects(query, dynamics, name_mapping) do
defp add_aggregate_selects(query, dynamics) do
{in_aggregates, in_body} =
Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)

Expand All @@ -815,7 +840,7 @@ defmodule AshPostgres.Aggregate do
aggs,
:aggregates,
Map.new(in_aggregates, fn {_, name, dynamic} ->
{name_mapping[name] || name, dynamic}
{name, dynamic}
end)
)
end
Expand Down
46 changes: 46 additions & 0 deletions lib/calculation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ defmodule AshPostgres.Calculation do

require Ecto.Query

@next_calculation_names Enum.reduce(0..999, %{}, fn i, acc ->
Map.put(acc, :"calculation_#{i}", :"calculation_#{i + 1}")
end)

def add_calculations(query, [], _, _, _select?), do: {:ok, query}

def add_calculations(query, calculations, resource, source_binding, select?) do
Expand All @@ -27,6 +31,21 @@ defmodule AshPostgres.Calculation do
end)
|> Enum.uniq()

{query, calculations} =
Enum.reduce(
calculations,
{query, []},
fn {calculation, expression}, {query, calculations} ->
if is_atom(calculation.name) do
{query, [{calculation, expression} | calculations]}
else
{query, name} = use_calculation_name(query, calculation.name)

{query, [{%{calculation | name: name}, expression} | calculations]}
end
end
)

case AshPostgres.Aggregate.add_aggregates(
query,
aggregates,
Expand Down Expand Up @@ -90,6 +109,33 @@ defmodule AshPostgres.Calculation do
end
end

def next_calculation_name(i) do
@next_calculation_names[i] ||
raise Ash.Error.Framework.AssumptionFailed,
message: """
All 1000 static names for calculations have been used in a single query.
Congratulations, this means that you have gone so wildly beyond our imagination
of how much can fit into a single quer. Please file an issue and we will raise the limit.
"""
end

defp use_calculation_name(query, aggregate_name) do
{%{
query
| __ash_bindings__: %{
query.__ash_bindings__
| current_calculation_name:
next_calculation_name(query.__ash_bindings__.current_calculation_name),
calculation_names:
Map.put(
query.__ash_bindings__.calculation_names,
aggregate_name,
query.__ash_bindings__.current_calculation_name
)
}
}, query.__ash_bindings__.current_calculation_name}
end

defp add_calculation_selects(query, dynamics) do
{in_calculations, in_body} =
Enum.split_with(dynamics, fn {load, _name, _dynamic} -> is_nil(load) end)
Expand Down
44 changes: 39 additions & 5 deletions lib/data_layer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,15 @@ defmodule AshPostgres.DataLayer do

@impl true
def run_query(query, resource) do
query = default_bindings(query, resource)

if AshPostgres.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
raise_table_error!(resource, :read)
else
repo = dynamic_repo(resource, query)

with_savepoint(repo, query, fn ->
{:ok, repo.all(query, repo_opts(nil, nil, resource))}
{:ok, repo.all(query, repo_opts(nil, nil, resource)) |> remap_mapped_fields(query)}
end)
end
rescue
Expand Down Expand Up @@ -916,24 +918,54 @@ defmodule AshPostgres.DataLayer do
root_data,
path
) do
{:ok, query} ->
{:ok, lateral_join_query} ->
source_resource =
path
|> Enum.at(0)
|> elem(0)
|> Map.get(:resource)

{:ok,
dynamic_repo(source_resource, query).all(
query,
dynamic_repo(source_resource, lateral_join_query).all(
lateral_join_query,
repo_opts(nil, nil, source_resource)
)}
)
|> remap_mapped_fields(query)}

{:error, error} ->
{:error, error}
end
end

defp remap_mapped_fields(results, query) do
calculation_names = query.__ash_bindings__.calculation_names
aggregate_names = query.__ash_bindings__.aggregate_names

if Enum.empty?(calculation_names) and Enum.empty?(aggregate_names) do
results
else
Enum.map(results, fn result ->
result
|> remap(:calculations, calculation_names)
|> remap(:aggregates, aggregate_names)
end)
end
end

defp remap(record, _subfield, mapping) when mapping == %{} do
record
end

defp remap(record, subfield, mapping) do
Map.update!(record, subfield, fn subfield_values ->
Enum.reduce(mapping, subfield_values, fn {dest, source}, subfield_values ->
subfield_values
|> Map.put(dest, Map.get(subfield_values, source))
|> Map.delete(source)
end)
end)
end

defp lateral_join_query(
query,
root_data,
Expand Down Expand Up @@ -2778,7 +2810,9 @@ defmodule AshPostgres.DataLayer do
parent_resources: [],
aggregate_defs: %{},
current_aggregate_name: :aggregate_0,
current_calculation_name: :calculation_0,
aggregate_names: %{},
calculation_names: %{},
context: context,
bindings: %{start_bindings => %{path: [], type: :root, source: resource}}
})
Expand Down
30 changes: 28 additions & 2 deletions lib/expr.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule AshPostgres.Expr do
Lazy,
Length,
Now,
Round,
StringJoin,
StringLength,
StringSplit,
Expand Down Expand Up @@ -1170,6 +1171,31 @@ defmodule AshPostgres.Expr do
end
end

defp do_dynamic_expr(
query,
%Round{arguments: [num | rest], embedded?: pred_embedded?},
bindings,
embedded?,
acc,
_type
) do
precision = Enum.at(rest, 0) || 1

frag =
%Fragment{
embedded?: pred_embedded?,
arguments: [
raw: "ROUND(",
expr: num,
raw: ", ",
expr: precision,
raw: ")"
]
}

do_dynamic_expr(query, frag, bindings, pred_embedded? || embedded?, acc)
end

defp do_dynamic_expr(
query,
%Type{arguments: [arg1, arg2, constraints]},
Expand Down Expand Up @@ -1646,7 +1672,7 @@ defmodule AshPostgres.Expr do
if is_list(other) do
list_expr(query, other, bindings, true, acc, type)
else
raise "Unsupported expression in AshPostgres query: #{inspect(other)}"
raise "Unsupported expression in AshPostgres query: #{inspect(other, structs: false)}"
end
else
maybe_sanitize_list(query, other, bindings, true, acc, type)
Expand Down Expand Up @@ -1677,7 +1703,7 @@ defmodule AshPostgres.Expr do
if is_list(value) do
list_expr(query, value, bindings, false, acc, type)
else
raise "Unsupported expression in AshPostgres query: #{inspect(value)}"
raise "Unsupported expression in AshPostgres query: #{inspect(value, structs: false)}"
end
else
case maybe_sanitize_list(query, value, bindings, true, acc, type) do
Expand Down
32 changes: 8 additions & 24 deletions lib/join.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,31 +198,15 @@ defmodule AshPostgres.Join do
end)
end

# defp expand_join_paths(joins) do
# Enum.flat_map(joins, fn {type, path} ->
# path
# |> sub_paths()
# |> Enum.map(&add_relationship_filter_paths/1)
# end)
# end

# defp add_relationship_filter_paths(path) do
# last = List.last(path)
# prefix = :lists.droplast(path)

# end

# defp sub_paths(path) do
# Enum.map(1..Enum.count(path), fn i ->
# Enum.take(path, i)
# end)
# end

def relationship_path_to_relationships(resource, path, acc \\ [])
def relationship_path_to_relationships(_resource, [], acc), do: Enum.reverse(acc)

def relationship_path_to_relationships(resource, [relationship | rest], acc) do
relationship = Ash.Resource.Info.relationship(resource, relationship)
def relationship_path_to_relationships(resource, [name | rest], acc) do
relationship = Ash.Resource.Info.relationship(resource, name)

if !relationship do
raise "no such relationship #{inspect resource}.#{name}"
end

relationship_path_to_relationships(relationship.destination, rest, [relationship | acc])
end
Expand Down Expand Up @@ -320,10 +304,10 @@ defmodule AshPostgres.Join do
end
|> case do
{:ok, query, acc} ->
if Enum.empty?(query.joins) do
if Enum.empty?(query.joins) || is_subquery? do
{:ok, query, acc}
else
{:ok, subquery(query), acc}
{:ok, (from row in subquery(query), []) |> Map.put(:__ash_bindings__, query.__ash_bindings__), acc}
end

{:error, error} ->
Expand Down

0 comments on commit 445de28

Please sign in to comment.