Skip to content

Commit

Permalink
Add WebMachine API for Query
Browse files Browse the repository at this point in the history
Add HTTP API for Query - allowing for PSOT of JSON with JSON response.
  • Loading branch information
martinsumner committed Dec 17, 2024
1 parent 52fd5a0 commit d86226e
Show file tree
Hide file tree
Showing 6 changed files with 881 additions and 23 deletions.
40 changes: 27 additions & 13 deletions src/riak_kv_query.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
add_accumulation_option/2,
add_accumulation_term/2,
add_result_provision/2,
add_queries/3
add_queries/3,
is_query/1
]
).

Expand Down Expand Up @@ -103,7 +104,7 @@
evaluated_query()|
{aggregation_function(), list({aggregation_tag(), evaluated_query()})}.
-type validation_stage() ::
aggregation_function|accumulation_option|accumulation_term|
aggregation_expression|accumulation_option|accumulation_term|
result_provision|query_evaluation.
-type validation_error() ::
{error, validation_stage(), binary()}.
Expand Down Expand Up @@ -152,7 +153,8 @@
evaluated_query/0,
validation_error/0,
query_definition/0,
complex_query_definition/0
complex_query_definition/0,
query_user_input/0
]
).

Expand Down Expand Up @@ -239,33 +241,38 @@ get_r(Query) -> Query#riak_kv_query.r.
get_querytype(Query) -> Query#riak_kv_query.type.

-spec add_aggregation_expression(
complex_query_definition(), string())
complex_query_definition(), string()|undefined)
-> {ok, complex_query_definition()}|validation_error().
add_aggregation_expression(
#riak_kv_query{type = Type} = Query, AggregationString)
when Type == combo_query ->
when Type == combo_query, AggregationString =/= undefined ->
case leveled_setop:generate_setop_function(AggregationString) of
{error, ParseError} ->
?LOG_WARNING(
"Invalid aggregation submitted ~s due to reason ~0p",
[AggregationString, ParseError]
),
{error, aggregation_function, <<"Invalid function">>};
{error, aggregation_expression, <<"Invalid function">>};
AppFunction ->
{
ok,
Query#riak_kv_query{aggregation_expression = AppFunction}
}
end;
add_aggregation_expression(
#riak_kv_query{type = Type} = Query, undefined)
when Type == single_query ->
{ok, Query};
add_aggregation_expression(_Query, _AggregationString) ->
{
error,
aggregation_function,
aggregation_expression,
<<"Attempt to aggregate single query">>
}.

-spec add_accumulation_option(complex_query_definition(), binary()) ->
{ok, complex_query_definition()}|validation_error().
-spec add_accumulation_option(
complex_query_definition(), binary()|undefined) ->
{ok, complex_query_definition()}|validation_error().
add_accumulation_option(
#riak_kv_query{type = Type} = Query,
AccumulationOption)
Expand Down Expand Up @@ -305,6 +312,8 @@ add_accumulation_option(
<<"Unsupported option in combination query">>
}
end;
add_accumulation_option(Query, undefined) ->
{ok, Query};
add_accumulation_option(_Q, BadOption) when is_binary(BadOption) ->
{
error,
Expand All @@ -315,7 +324,7 @@ add_accumulation_option(_Q, BadOption) when is_binary(BadOption) ->
}.

-spec add_accumulation_term(
complex_query_definition(), binary()) ->
complex_query_definition(), binary()|undefined) ->
{ok, complex_query_definition()}|validation_error().
add_accumulation_term(
#riak_kv_query{accumulation_option = AccOpt} = Query,
Expand All @@ -332,6 +341,8 @@ add_accumulation_term(
Query#riak_kv_query{
accumulation_term = AccumulationTerm}
};
add_accumulation_term(Query, undefined) ->
{ok, Query};
add_accumulation_term(
#riak_kv_query{accumulation_option = AccOpt}, AccumulationTerm) ->
{
Expand Down Expand Up @@ -534,6 +545,9 @@ decode_option(<<"term_with_keycount">>) -> term_with_keycount.
get_reqid() ->
erlang:phash2({self(), os:timestamp(), crypto:strong_rand_bytes(2)}).

-spec is_query(complex_query_definition()) -> boolean().
is_query(Query) -> is_record(Query, riak_kv_query).

%%%============================================================================
%%% Test
%%%============================================================================
Expand All @@ -547,16 +561,16 @@ new(Bucket, Type) -> new(Bucket, Type, 60).
bad_aggregation_expression_test() ->
QS = new({<<"Type">>, <<"Bucket">>}, single_query),
?assertMatch(
{error, aggregation_function, <<"Attempt to aggregate single query">>},
{error, aggregation_expression, <<"Attempt to aggregate single query">>},
add_aggregation_expression(QS, <<"$1 UNION $2">>)
),
QC = new(<<"Bucket">>, combo_query),
?assertMatch(
{error, aggregation_function, <<"Invalid function">>},
{error, aggregation_expression, <<"Invalid function">>},
add_aggregation_expression(QC, <<"$1 UNION S2">>)
),
?assertMatch(
{error, aggregation_function, <<"Invalid function">>},
{error, aggregation_expression, <<"Invalid function">>},
add_aggregation_expression(QC, <<"$1 XOR $2">>)
),
{ok, UpdQ} = add_aggregation_expression(QC, <<"$1 UNION $2">>),
Expand Down
8 changes: 7 additions & 1 deletion src/riak_kv_query_buffer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,10 @@ do_flush(#buffer{type = T} = Buffer) ->
-spec aggregate(reply_type(), reply_type()|none) -> reply_type().
aggregate(R, none) ->
R;
aggregate({T, KL}, {T, AggKL}) when T == keys; T == term_with_keys ->
aggregate({T, KL}, {T, AggKL}) when T == keys ->
{T, lists:merge(KL, AggKL)};
aggregate({T, KL}, {T, AggKL}) when T == term_with_keys ->
{T, lists:umerge(KL, AggKL)};
aggregate({T, C}, {T, AggC}) when T == match_count; T == key_count ->
{T, AggC + C};
aggregate({T, TM}, {T, AggTM})
Expand All @@ -262,6 +264,7 @@ reply(#buffer{reply_fun = R}, {T, KL})
reply(#buffer{reply_fun = R}, Result) ->
R(Result).


%%%============================================================================
%%% Test
%%%============================================================================
Expand Down Expand Up @@ -429,6 +432,9 @@ term_with_keys_test() ->
)
},
ok = get_reply(),
{L1DupsSubset, _Rest} = lists:split(10, L1),
A ! {term_with_keys, L1DupsSubset},
ok = get_reply(),

A ! stop,
{term_with_keys, L3} = get_reply(),
Expand Down
8 changes: 2 additions & 6 deletions src/riak_kv_query_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@
-type vnode_id() :: non_neg_integer().
-type vnode_monitor() :: #{vnode_id() => non_neg_integer()}|#{}.

-export_type([results/0]).


%%%============================================================================
%%% API
Expand Down Expand Up @@ -258,7 +260,6 @@ handle_info(
{{ReqID, Vnode}, {From, _B, {keys, Results}}},
#state{req_id = ReqID} = State) ->
riak_kv_vnode:ack_keys(From),
?LOG_INFO("Received result size ~w ack'd to ~0p", [length(Results), From]),
{keys, UpdResults} =
riak_kv_query_buffer:aggregate(
{keys, Results},
Expand Down Expand Up @@ -387,7 +388,6 @@ handle_info(
{RM, lists:sum(maps:values(RM))}
end,
{raw, ClientReqID, ClientPid} = State#state.from,
?LOG_INFO("Returning response ~0p", [Results]),
ClientPid ! {ClientReqID, Results},
log_timings(
UpdTimings,
Expand All @@ -396,10 +396,6 @@ handle_info(
),
{stop, normal, State};
_ ->
?LOG_INFO(
"Query done for ~w remainining ~w",
[Vnode, sets:size(UpdCoverageVnodes)]
),
{
noreply,
State#state{
Expand Down
3 changes: 0 additions & 3 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3588,7 +3588,6 @@ result_fun_ack(Bucket, Sender) ->
riak_core_vnode:reply(Sender, {{self(), Monitor}, Bucket, Items}),
receive
{Monitor, ok} ->
?LOG_INFO("Received Ack at Sender ~0p", [Sender]),
erlang:demonitor(Monitor, [flush]);
{Monitor, stop_fold} ->
erlang:demonitor(Monitor, [flush]),
Expand All @@ -3615,14 +3614,12 @@ stop_fold({Pid, Ref}) ->
%% @private
finish_fun(BufferMod, Sender) ->
fun(Buffer) ->
?LOG_INFO("Finsh fold, send to ~0p", [Sender]),
finish_fold(BufferMod, Buffer, Sender)
end.

%% @private
finish_fold(BufferMod, Buffer, Sender) ->
BufferMod:flush(Buffer),
?LOG_INFO("Results flushed - reply to Sender ~0p with done", [Sender]),
riak_core_vnode:reply(Sender, done).

%% @private
Expand Down
3 changes: 3 additions & 0 deletions src/riak_kv_web.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ raw_dispatch(Name) ->
{Prefix ++ ["buckets", bucket, "index", field, '*'],
riak_kv_wm_index, Props},

{Prefix ++ ["buckets", bucket, "query"],
riak_kv_wm_query, Props},

%% AAE fold URLs
{["cachedtrees", "nvals", nval, "root"],
riak_kv_wm_aaefold, Props},
Expand Down
Loading

0 comments on commit d86226e

Please sign in to comment.