diff --git a/src/riak_kv_query.erl b/src/riak_kv_query.erl index d84a30e60..92bb67a1f 100644 --- a/src/riak_kv_query.erl +++ b/src/riak_kv_query.erl @@ -40,7 +40,8 @@ add_accumulation_option/2, add_accumulation_term/2, add_result_provision/2, - add_queries/3 + add_queries/3, + is_query/1 ] ). @@ -103,7 +104,7 @@ evaluated_query()| {aggregation_function(), list({aggregation_tag(), evaluated_query()})}. -type validation_stage() :: - aggregation_function|accumulation_option|accumulation_term| + aggregation_expression|accumulation_option|accumulation_term| result_provision|query_evaluation. -type validation_error() :: {error, validation_stage(), binary()}. @@ -152,7 +153,8 @@ evaluated_query/0, validation_error/0, query_definition/0, - complex_query_definition/0 + complex_query_definition/0, + query_user_input/0 ] ). @@ -239,33 +241,38 @@ get_r(Query) -> Query#riak_kv_query.r. get_querytype(Query) -> Query#riak_kv_query.type. -spec add_aggregation_expression( - complex_query_definition(), string()) + complex_query_definition(), string()|undefined) -> {ok, complex_query_definition()}|validation_error(). add_aggregation_expression( #riak_kv_query{type = Type} = Query, AggregationString) - when Type == combo_query -> + when Type == combo_query, AggregationString =/= undefined -> case leveled_setop:generate_setop_function(AggregationString) of {error, ParseError} -> ?LOG_WARNING( "Invalid aggregation submitted ~s due to reason ~0p", [AggregationString, ParseError] ), - {error, aggregation_function, <<"Invalid function">>}; + {error, aggregation_expression, <<"Invalid function">>}; AppFunction -> { ok, Query#riak_kv_query{aggregation_expression = AppFunction} } end; +add_aggregation_expression( + #riak_kv_query{type = Type} = Query, undefined) + when Type == single_query -> + {ok, Query}; add_aggregation_expression(_Query, _AggregationString) -> { error, - aggregation_function, + aggregation_expression, <<"Attempt to aggregate single query">> }. 
--spec add_accumulation_option(complex_query_definition(), binary()) -> - {ok, complex_query_definition()}|validation_error(). +-spec add_accumulation_option( + complex_query_definition(), binary()|undefined) -> + {ok, complex_query_definition()}|validation_error(). add_accumulation_option( #riak_kv_query{type = Type} = Query, AccumulationOption) @@ -305,6 +312,8 @@ add_accumulation_option( <<"Unsupported option in combination query">> } end; +add_accumulation_option(Query, undefined) -> + {ok, Query}; add_accumulation_option(_Q, BadOption) when is_binary(BadOption) -> { error, @@ -315,7 +324,7 @@ add_accumulation_option(_Q, BadOption) when is_binary(BadOption) -> }. -spec add_accumulation_term( - complex_query_definition(), binary()) -> + complex_query_definition(), binary()|undefined) -> {ok, complex_query_definition()}|validation_error(). add_accumulation_term( #riak_kv_query{accumulation_option = AccOpt} = Query, @@ -332,6 +341,8 @@ add_accumulation_term( Query#riak_kv_query{ accumulation_term = AccumulationTerm} }; +add_accumulation_term(Query, undefined) -> + {ok, Query}; add_accumulation_term( #riak_kv_query{accumulation_option = AccOpt}, AccumulationTerm) -> { @@ -534,6 +545,9 @@ decode_option(<<"term_with_keycount">>) -> term_with_keycount. get_reqid() -> erlang:phash2({self(), os:timestamp(), crypto:strong_rand_bytes(2)}). +-spec is_query(complex_query_definition()) -> boolean(). +is_query(Query) -> is_record(Query, riak_kv_query). + %%%============================================================================ %%% Test %%%============================================================================ @@ -547,16 +561,16 @@ new(Bucket, Type) -> new(Bucket, Type, 60). 
bad_aggregation_expression_test() -> QS = new({<<"Type">>, <<"Bucket">>}, single_query), ?assertMatch( - {error, aggregation_function, <<"Attempt to aggregate single query">>}, + {error, aggregation_expression, <<"Attempt to aggregate single query">>}, add_aggregation_expression(QS, <<"$1 UNION $2">>) ), QC = new(<<"Bucket">>, combo_query), ?assertMatch( - {error, aggregation_function, <<"Invalid function">>}, + {error, aggregation_expression, <<"Invalid function">>}, add_aggregation_expression(QC, <<"$1 UNION S2">>) ), ?assertMatch( - {error, aggregation_function, <<"Invalid function">>}, + {error, aggregation_expression, <<"Invalid function">>}, add_aggregation_expression(QC, <<"$1 XOR $2">>) ), {ok, UpdQ} = add_aggregation_expression(QC, <<"$1 UNION $2">>), diff --git a/src/riak_kv_query_buffer.erl b/src/riak_kv_query_buffer.erl index 8ac105cc7..d81339585 100644 --- a/src/riak_kv_query_buffer.erl +++ b/src/riak_kv_query_buffer.erl @@ -247,8 +247,10 @@ do_flush(#buffer{type = T} = Buffer) -> -spec aggregate(reply_type(), reply_type()|none) -> reply_type(). aggregate(R, none) -> R; -aggregate({T, KL}, {T, AggKL}) when T == keys; T == term_with_keys -> +aggregate({T, KL}, {T, AggKL}) when T == keys -> {T, lists:merge(KL, AggKL)}; +aggregate({T, KL}, {T, AggKL}) when T == term_with_keys -> + {T, lists:umerge(KL, AggKL)}; aggregate({T, C}, {T, AggC}) when T == match_count; T == key_count -> {T, AggC + C}; aggregate({T, TM}, {T, AggTM}) @@ -262,6 +264,7 @@ reply(#buffer{reply_fun = R}, {T, KL}) reply(#buffer{reply_fun = R}, Result) -> R(Result). + %%%============================================================================ %%% Test %%%============================================================================ @@ -429,6 +432,9 @@ term_with_keys_test() -> ) }, ok = get_reply(), + {L1DupsSubset, _Rest} = lists:split(10, L1), + A ! {term_with_keys, L1DupsSubset}, + ok = get_reply(), A ! 
stop, {term_with_keys, L3} = get_reply(), diff --git a/src/riak_kv_query_server.erl b/src/riak_kv_query_server.erl index 9a4f6b69f..593cb8d7b 100644 --- a/src/riak_kv_query_server.erl +++ b/src/riak_kv_query_server.erl @@ -139,6 +139,8 @@ -type vnode_id() :: non_neg_integer(). -type vnode_monitor() :: #{vnode_id() => non_neg_integer()}|#{}. +-export_type([results/0]). + %%%============================================================================ %%% API @@ -258,7 +260,6 @@ handle_info( {{ReqID, Vnode}, {From, _B, {keys, Results}}}, #state{req_id = ReqID} = State) -> riak_kv_vnode:ack_keys(From), - ?LOG_INFO("Received result size ~w ack'd to ~0p", [length(Results), From]), {keys, UpdResults} = riak_kv_query_buffer:aggregate( {keys, Results}, @@ -387,7 +388,6 @@ handle_info( {RM, lists:sum(maps:values(RM))} end, {raw, ClientReqID, ClientPid} = State#state.from, - ?LOG_INFO("Returning response ~0p", [Results]), ClientPid ! {ClientReqID, Results}, log_timings( UpdTimings, @@ -396,10 +396,6 @@ handle_info( ), {stop, normal, State}; _ -> - ?LOG_INFO( - "Query done for ~w remainining ~w", - [Vnode, sets:size(UpdCoverageVnodes)] - ), { noreply, State#state{ diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 7fe2cbf51..dbc6e3683 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -3588,7 +3588,6 @@ result_fun_ack(Bucket, Sender) -> riak_core_vnode:reply(Sender, {{self(), Monitor}, Bucket, Items}), receive {Monitor, ok} -> - ?LOG_INFO("Received Ack at Sender ~0p", [Sender]), erlang:demonitor(Monitor, [flush]); {Monitor, stop_fold} -> erlang:demonitor(Monitor, [flush]), @@ -3615,14 +3614,12 @@ stop_fold({Pid, Ref}) -> %% @private finish_fun(BufferMod, Sender) -> fun(Buffer) -> - ?LOG_INFO("Finsh fold, send to ~0p", [Sender]), finish_fold(BufferMod, Buffer, Sender) end. 
%% @private finish_fold(BufferMod, Buffer, Sender) -> BufferMod:flush(Buffer), - ?LOG_INFO("Results flushed - reply to Sender ~0p with done", [Sender]), riak_core_vnode:reply(Sender, done). %% @private diff --git a/src/riak_kv_web.erl b/src/riak_kv_web.erl index 848386b2c..cc5dc80f8 100644 --- a/src/riak_kv_web.erl +++ b/src/riak_kv_web.erl @@ -120,6 +120,9 @@ raw_dispatch(Name) -> {Prefix ++ ["buckets", bucket, "index", field, '*'], riak_kv_wm_index, Props}, + {Prefix ++ ["buckets", bucket, "query"], + riak_kv_wm_query, Props}, + %% AAE fold URLs {["cachedtrees", "nvals", nval, "root"], riak_kv_wm_aaefold, Props}, diff --git a/src/riak_kv_wm_query.erl b/src/riak_kv_wm_query.erl new file mode 100644 index 000000000..e2af16de8 --- /dev/null +++ b/src/riak_kv_wm_query.erl @@ -0,0 +1,842 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2016 Basho Technologies, Inc. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Webmachine resource for running queries on secondary indexes. 
+%% +%% Available operations: +%% +%% ``` +%% POST types/BucketType/buckets/Bucket/query +%% ``` +%% +%% The query should be posted as the HTTP body, where there are the following +%% JSON keys at the root of the document +%% +%% - aggregation_expression (optional) +%% If multiple queries are to be run, the aggregation expression is used to +%% inform the database how those results should be combined, using $1, $2 etc +%% to refer to the numeric aggregation_tag for each query - with the key words +%% UNION, INTERSECT and SUBTRACT to show how the sets of results are to be +%% combined. Parentheses may be used for clarity. +%% e.g. ($1 INTERSECT $2) UNION ($3 SUBTRACT $1) +%% +%% - accumulation_option (optional - default = keys) +%% There are six options for accumulating the results from a single query: +%% keys (return a list of keys), term_with_keys (return a list of term/key +%% tuples), match_count (return a count of the matches made), key_count (return +%% a count of unique keys matched), term_with_matchcount/term_with_keycount +%% (return a map of term to either count of matches, or count of unique keys). +%% If an aggregation_expression is used, only keys, key_count and match_count +%% can be used. +%% +%% - accumulation_term (optional - default = $term) +%% When using an accumulation option of term_with_keys, term_with_matchcount or +%% term_with_keycount which term in the evaluated index term should be used. +%% The default is $term - the whole term. However a sub-term extracted in the +%% evaluation expression may be used instead. +%% +%% - result_provision (not yet implemented) +%% +%% - max_results (not yet implemented) +%% +%% - term_rate_kpersec (not yet implemented) +%% +%% - substitutions (optional) +%% An array of key/value pairs that map strings that are referred to in queries +%% to substitution values that should replace those keys in the query. +%% e.g. 
{"low_dob" : "19550301", "high_dob" : "19560630"} can be passed as +%% substitutions to populate an evaluation of +%% "$dob" BETWEEN ":low_dob" AND ":high_dob" +%% +%% - timeout (optional) +%% The timeout in seconds to wait for the query to complete +%% +%% - query_list +%% A list of queries (should be a list of just one query if an +%% aggregation_expression is not used). +%% Each query has the following parts: +%% - aggregation_tag (optional unless an aggregation_expression is used) +%% - index_name (should be a binary index) +%% - start_term +%% - end_term +%% - regular expression (optional alternative to using evaluation or filter +%% expressions) +%% - evaluation_expression (optional, an expression to extract projected +%% attributes from the term) +%% - filter_expression (optional, an expression to filter results based on +%% those projected attributes) + +-module(riak_kv_wm_query). + +-include_lib("webmachine/include/webmachine.hrl"). +-include("riak_kv_wm_raw.hrl"). + +%% webmachine resource exports +-export([ + init/1, + service_available/2, + is_authorized/2, + forbidden/2, + allowed_methods/2, + malformed_request/2, + resource_exists/2, + process_post/2 +]). + +-record(ctx, { + client, %% riak_client() - the store client + riak, %% local | {node(), atom()} - params for riak client + bucket_type, %% Bucket type (from uri) + query, %% The query.. + security, %% security context + accumulation_option + }). +-type context() :: #ctx{}. +-type request_data() :: #wm_reqdata{}. + +-define(AGGREGATION_EXPRESSION, <<"aggregation_expression">>). +-define(ACCUMULATION_OPTION, <<"accumulation_option">>). +-define(ACCUMULATION_TERM, <<"accumulation_term">>). +-define(SUBSTITUTIONS, <<"substitutions">>). +-define(TIMEOUT, <<"timeout">>). +-define(QUERY_LIST, <<"query_list">>). +-define(QL_AGGREGATION_TAG, <<"aggregation_tag">>). +-define(QL_INDEX_NAME, <<"index_name">>). +-define(QL_START_TERM, <<"start_term">>). +-define(QL_END_TERM, <<"end_term">>). 
+-define(QL_REGULAR_EXPRESSION, <<"regular_expression">>). +-define(QL_EVALUATION_EXPRESSION, <<"evaluation_expression">>). +-define(QL_FILTER_EXPRESSION, <<"filter_expression">>). + +-define(ACCKEY_KEYS, <<"keys">>). +-define(ACCKEY_TERMKEYS, <<"term_with_keys">>). +-define(ACCKEY_KEYCOUNT, <<"key_count">>). +-define(ACCKEY_MATCHCOUNT, <<"match_count">>). +-define(ACCKEY_TERMMATCHCOUNT, <<"term_with_matchcount">>). +-define(ACCKEY_TERMKEYCOUNT, <<"term_with_keycount">>). + +-define(REQUIRED_KEYS, [?QUERY_LIST]). +-define(POSSIBLE_KEYS, + [ + ?AGGREGATION_EXPRESSION, + ?ACCUMULATION_OPTION, + ?ACCUMULATION_TERM, + ?SUBSTITUTIONS, + ?TIMEOUT, + ?QUERY_LIST + ] +). +-define(REQUIRED_QL_KEYS, + [ + ?QL_INDEX_NAME, + ?QL_START_TERM, + ?QL_END_TERM + ] +). +-define(POSSIBLE_QL_KEYS, + [ + ?QL_AGGREGATION_TAG, + ?QL_INDEX_NAME, + ?QL_START_TERM, + ?QL_END_TERM, + ?QL_REGULAR_EXPRESSION, + ?QL_EVALUATION_EXPRESSION, + ?QL_FILTER_EXPRESSION + ] +). + +-define(REQUEST_CLASS, {riak_kv, secondary_index}). + +-define(QUERY_TIMEOUT, 60). + +-type query_map() :: + #{binary() => binary()|non_neg_integer()|list(map())}. + + +-spec init(proplists:proplist()) -> {ok, context()}. +%% @doc Initialize this resource. +init(Props) -> + {ok, #ctx{ + riak=proplists:get_value(riak, Props), + bucket_type=proplists:get_value(bucket_type, Props) + }}. + + +-spec service_available(request_data(), context()) -> + {boolean(), request_data(), context()}. +%% @doc Determine whether or not a connection to Riak +%% can be established. Also, extract query params. +service_available(RD, Ctx0=#ctx{riak=RiakProps}) -> + Ctx = riak_kv_wm_utils:ensure_bucket_type(RD, Ctx0, #ctx.bucket_type), + case riak_kv_wm_utils:get_riak_client(RiakProps, riak_kv_wm_utils:get_client_id(RD)) of + {ok, C} -> + {true, RD, Ctx#ctx { client=C }}; + Error -> + {false, + wrq:set_resp_body( + io_lib:format("Unable to connect to Riak: ~p~n", [Error]), + wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD)), + Ctx} + end. 
+ +resource_exists(RD, #ctx{bucket_type=BType}=Ctx) -> + {riak_kv_wm_utils:bucket_type_exists(BType), RD, Ctx}. + +-spec is_authorized(request_data(), context()) -> + {true | string() | {halt, 426}, request_data(), context()}. +is_authorized(ReqData, Ctx) -> + case riak_api_web_security:is_authorized(ReqData) of + false -> + {"Basic realm=\"Riak\"", ReqData, Ctx}; + {true, SecContext} -> + {true, ReqData, Ctx#ctx{security=SecContext}}; + insecure -> + %% XXX 301 may be more appropriate here, but since the http and + %% https port are different and configurable, it is hard to figure + %% out the redirect URL to serve. + { + {halt, 426}, + wrq:append_to_resp_body( + <<"Security is enabled and " + "Riak does not accept credentials over HTTP. Try HTTPS " + "instead.">>, + ReqData + ), + Ctx + } + end. + +-spec forbidden(request_data(), context()) + -> {boolean(), request_data(), context()}. +forbidden(ReqDataIn, #ctx{security = undefined} = Context) -> + riak_kv_wm_utils:is_forbidden(ReqDataIn, ?REQUEST_CLASS, Context); +forbidden(ReqDataIn, #ctx{bucket_type = BT, security = Sec} = Context) -> + {Answer, ReqData, _} = Result = + riak_kv_wm_utils:is_forbidden(ReqDataIn, ?REQUEST_CLASS, Context), + case Answer of + false -> + Bucket = erlang:list_to_binary( + riak_kv_wm_utils:maybe_decode_uri( + ReqData, wrq:path_info(bucket, ReqData))), + case riak_core_security:check_permission( + {"riak_kv.index", {BT, Bucket}}, Sec) of + {false, Error, _} -> + {true, + wrq:append_to_resp_body( + unicode:characters_to_binary(Error, utf8, utf8), + wrq:set_resp_header( + "Content-Type", "text/plain", ReqData)), + Context}; + {true, _} -> + {false, ReqData, Context} + end; + _ -> + Result + end. + +-spec allowed_methods( + request_data(), context()) -> {list(atom()), request_data(), context()}. +allowed_methods(RD, Ctx) -> + {['POST'], RD, Ctx}. + +-spec malformed_request( + request_data(), context()) -> + {boolean(), request_data(), context()}. 
+malformed_request(RD, Ctx) -> + Bucket = + list_to_binary( + riak_kv_wm_utils:maybe_decode_uri(RD, wrq:path_info(bucket, RD) + ) + ), + BT = riak_kv_wm_utils:maybe_bucket_type(Ctx#ctx.bucket_type, Bucket), + Body = riak_kv_wm_utils:accept_value("application/json", wrq:req_body(RD)), + case decode_json_body(Body) of + {ok, QueryMap} -> + case check_keys(maps:keys(QueryMap), request) of + ok -> + QueryList = maps:get(?QUERY_LIST, QueryMap), + case check_querylist(QueryList, false) of + ok -> + case make_query(BT, QueryMap) of + {ok, Query} -> + {false, RD, Ctx#ctx{query = Query}}; + {error, Stage, Reason} -> + { + true, + return_json_error( + expand_query_reason(Stage, Reason), + RD + ), + Ctx + } + end; + {error, Reason} -> + {true, return_json_error(Reason, RD), Ctx} + end; + {error, Reason} -> + {true, return_json_error(Reason, RD), Ctx} + end; + {error, Reason} -> + {true, return_json_error(Reason, RD), Ctx} + end. + +expand_query_reason(Stage, Reason) -> + lists:flatten( + io_lib:format( + << "Validation failure at stage ~0p due to ~s">>, + [Stage, Reason] + ) + ). + +-spec return_json_error(string(), request_data()) -> request_data(). +return_json_error(Reason, RD) -> + wrq:append_to_resp_body( + riak_kv_wm_json:encode(#{error => Reason}), + wrq:set_resp_header( + ?HEAD_CTYPE, "application/json", RD + ) + ). + +-spec decode_json_body(binary()) -> {ok, map()}| {error, term()}. +decode_json_body(JsonBody) -> + try + DecodedBody = riak_kv_wm_json:decode(JsonBody), + {ok, DecodedBody} + catch + error:Reason -> + ExpandedReason = + lists:flatten( + io_lib:format( + <<"Malformed json request - ~0p">>, + [Reason] + ) + ), + {error, ExpandedReason} + end. + +check_querylist([], true) -> + ok; +check_querylist([], false) -> + {error, <<"No valid query provided">>}; +check_querylist([HdQuery|Rest], _AtLeastOne) -> + case check_keys(maps:keys(HdQuery), query) of + ok -> + check_querylist(Rest, true); + Error -> + Error + end. 
+ +check_keys(Keys, request) -> + check_keys(Keys, ?REQUIRED_KEYS, ?POSSIBLE_KEYS); +check_keys(Keys, query) -> + check_keys(Keys, ?REQUIRED_QL_KEYS, ?POSSIBLE_QL_KEYS). + +-spec check_keys( + list(binary()), list(binary()), list(binary())) -> ok|{error, string()}. +check_keys(Keys, RequiredKeys, PossibleKeys) -> + RequiredKeyList = + lists:filter( + fun(K) -> lists:member(K, Keys) end, + RequiredKeys + ), + PossibleKeyList = + lists:filter( + fun(K) -> lists:member(K, PossibleKeys) end, + Keys + ), + case RequiredKeyList of + RequiredKeys -> + case PossibleKeyList of + Keys -> + ok; + NotAllKeys -> + ExtraKeys = lists:subtract(Keys, NotAllKeys), + { + error, + lists:flatten( + io_lib:format( + <<"Unexpected keys in request ~0p">>, + [ExtraKeys] + ) + ) + } + end; + NotAllRequiredKeys -> + MissingKeys = lists:subtract(RequiredKeys, NotAllRequiredKeys), + { + error, + lists:flatten( + io_lib:format( + <<"Missing required keys in request ~0p">>, + [MissingKeys] + ) + ) + } + end. + +-spec make_query( + riak_object:bucket(), query_map()) -> + {ok, riak_kv_query:complex_query_definition()}|riak_kv_query:validation_error(). 
+make_query(BucketType, QueryMap) -> + Timeout = + maps:get( + ?TIMEOUT, + QueryMap, + application:get_env(riak_kv, query_timeout_secs, ?QUERY_TIMEOUT) + ), + case Timeout of + T when is_integer(T), T > 0 -> + InitQuery = + case maps:get(?QUERY_LIST, QueryMap) of + QueryList when length(QueryList) == 1 -> + riak_kv_query:new(BucketType, single_query, Timeout); + QueryList when length(QueryList) > 1 -> + riak_kv_query:new(BucketType, combo_query, Timeout) + end, + case make_accumulation(QueryMap, InitQuery) of + {ok, Q1} -> + AggExpr = + maps:get( + ?AGGREGATION_EXPRESSION, QueryMap, undefined + ), + case riak_kv_query:add_aggregation_expression(Q1, AggExpr) of + {ok, Q2} -> + Subs = + maps:get(?SUBSTITUTIONS, QueryMap, maps:new()), + riak_kv_query:add_queries( + Q2, + lists:map(fun convert_query/1, QueryList), + Subs + ); + Error -> + Error + end; + Error -> + Error + end; + _ -> + {error, init, <<"Bad timeout">>} + end. + +-spec make_accumulation( + query_map(), riak_kv_query:complex_query_definition()) + -> + {ok, riak_kv_query:complex_query_definition()} | + riak_kv_query:validation_error(). +make_accumulation(QueryMap, InitQuery) -> + AccOpt = maps:get(?ACCUMULATION_OPTION, QueryMap, undefined), + AccTerm = maps:get(?ACCUMULATION_TERM, QueryMap, undefined), + case riak_kv_query:add_accumulation_option(InitQuery, AccOpt) of + {ok, UpdQuery} -> + riak_kv_query:add_accumulation_term(UpdQuery, AccTerm); + Error -> + Error + end. + +-spec convert_query(map()) -> riak_kv_query:query_user_input(). +convert_query(QM) -> + { + maps:get(<<"aggregation_tag">>, QM, undefined), + maps:get(<<"index_name">>, QM), + maps:get(<<"start_term">>, QM), + maps:get(<<"end_term">>, QM), + maps:get(<<"regular_expression">>, QM, undefined), + maps:get(<<"evaluation_expression">>, QM, undefined), + maps:get(<<"filter_expression">>, QM, undefined) + }. + +-spec process_post(request_data(), context()) -> + {boolean()|{halt, pos_integer()}, request_data(), context()}. 
+%% @doc Produce the JSON response to an index lookup. +process_post(RD, Ctx) -> + Query = Ctx#ctx.query, + Client = Ctx#ctx.client, + AccOpt = riak_kv_query:get_accumulator(Query), + case riak_client:query(Query, Client) of + {error, timeout} -> + {{halt, 503}, return_json_error("timeout", RD), Ctx}; + Results -> + JsonEncodedResults = encode_results(AccOpt, Results), + { + true, + wrq:append_to_resp_body( + JsonEncodedResults, + wrq:set_resp_header(?HEAD_CTYPE, "application/json", RD) + ), + Ctx + } + end. + +-spec encode_results( + riak_kv_query:accumulation_option(), + riak_kv_query_server:results() | {error, term()}) -> + binary(). +encode_results(AccOpt, {error, Reason}) -> + iolist_to_binary( + lists:flatten( + io_lib:format( + <<"Query with option ~w failed - ~0p">>, + [AccOpt, Reason] + ) + ) + ); +encode_results(keys, Results) -> + iolist_to_binary( + riak_kv_wm_json:encode( + #{?ACCKEY_KEYS => Results}, + fun riak_kv_wm_index:keys_encode/2 + ) + ); +encode_results(term_with_keys, Results) -> + iolist_to_binary( + riak_kv_wm_json:encode( + #{?ACCKEY_TERMKEYS => Results}, + fun riak_kv_wm_index:results_encode/2 + ) + ); +encode_results(match_count, Count) -> + iolist_to_binary( + riak_kv_wm_json:encode(#{?ACCKEY_MATCHCOUNT => Count}) + ); +encode_results(key_count, Count) -> + iolist_to_binary( + riak_kv_wm_json:encode(#{?ACCKEY_KEYCOUNT => Count}) + ); +encode_results(term_with_matchcount, CountMap) -> + iolist_to_binary( + riak_kv_wm_json:encode(#{?ACCKEY_TERMMATCHCOUNT => CountMap}) + ); +encode_results(term_with_keycount, CountMap) -> + iolist_to_binary( + riak_kv_wm_json:encode(#{?ACCKEY_TERMKEYCOUNT => CountMap}) + ). + +%% =================================================================== +%% EUnit tests +%% =================================================================== + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). 
+ +invalid_json_test() -> + InvalidJson = + <<" + { + \"accumulation_option\" : \"keys\", + \"timeout\" : 60, + \"query_list\" : + [ + { + \"index_name\" : \"example_bin\" + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + ] + } + ">>, % Missing comma after example_bin + R = decode_json_body(InvalidJson), + io:format("~p~n", [R]), + ?assertMatch( + {error, "Malformed json request - {invalid_byte,34}"}, + R + ). + +simple_query_test() -> + SimpleQueryJson = + <<" + { + \"timeout\" : 60, + \"query_list\" : + [ + { + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + ] + } + ">>, + {ok, M} = decode_json_body(SimpleQueryJson), + {ok, Q} = make_query({<<"BT">>, <<"B">>}, M), + ?assert(riak_kv_query:is_query(Q)). + +invalid_query_ae1_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"query_list\" : + [ + { + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + {error, S, _E} = make_query({<<"BT">>, <<"B">>}, M), + ?assertMatch(aggregation_expression, S). + +invalid_query_ae2_test() -> + IQJson = + <<" + { + \"timeout\" : 60, + \"query_list\" : + [ + { + \"aggregation_tag\" : 1, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + {error, S, _E} = make_query({<<"BT">>, <<"B">>}, M), + ?assertMatch(aggregation_expression, S). 
+ +invalid_query_ae3_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"query_list\" : + [ + { + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + {error, S, E} = make_query({<<"BT">>, <<"B">>}, M), + ?assertMatch(query_evaluation, S), + ?assertMatch(<<"Untagged query in combination request">>, E). + +valid_query_ae4_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"query_list\" : + [ + { + \"aggregation_tag\" : 1, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + {ok, Q} = make_query({<<"BT">>, <<"B">>}, M), + ?assert(riak_kv_query:is_query(Q)), + QueryList = maps:get(<<"query_list">>, M), + ?assertMatch(ok, check_querylist(QueryList, false)). + +invalid_query_to_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 0, + \"query_list\" : + [ + { + \"aggregation_tag\" : 1, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + {error, S, E} = make_query({<<"BT">>, <<"B">>}, M), + ?assertMatch(init, S), + ?assertMatch(<<"Bad timeout">>, E). 
+ +invalid_query_extratag_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"subs\" : {\"dob\" : \"19260812\"}, + \"query_list\" : + [ + { + \"aggregation_tag\" : 1, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\", + \"end_key\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + ?assertMatch( + {error, "Unexpected keys in request [<<\"subs\">>]"}, + check_keys(maps:keys(M), request) + ), + ?assertMatch( + {error, "Unexpected keys in request [<<\"end_key\">>]"}, + check_querylist(maps:get(<<"query_list">>, M), false) + ). + +invalid_query_missingtag1_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"subs\" : {\"dob\" : \"19260812\"} + } + ">>, + {ok, M} = decode_json_body(IQJson), + ?assertMatch( + {error, "Missing required keys in request [<<\"query_list\">>]"}, + check_keys(maps:keys(M), request) + ). + +invalid_query_missingtag2_test() -> + IQJson = + <<" + { + \"aggregation_expression\" : \"$1 INTERSECT $2\", + \"timeout\" : 60, + \"query_list\" : + [ + { + \"aggregation_tag\" : 1, + \"index_name\" : \"example_bin\", + \"start_term\" : \"A\", + \"end_term\" : \"B\" + }, + { + \"aggregation_tag\" : 2, + \"index_name\" : \"example_bin\", + \"end_term\" : \"B\" + } + + ] + } + ">>, + {ok, M} = decode_json_body(IQJson), + ?assertMatch( + {error, "Missing required keys in request [<<\"start_term\">>]"}, + check_querylist(maps:get(<<"query_list">>, M), false) + ). 
+ +encode_results_test() -> + BinMC = encode_results(match_count, 500), + ?assertMatch( + 500, + maps:get(?ACCKEY_MATCHCOUNT, riak_kv_wm_json:decode(BinMC)) + ), + BinKC = encode_results(key_count, 600), + ?assertMatch( + 600, + maps:get(?ACCKEY_KEYCOUNT, riak_kv_wm_json:decode(BinKC)) + ), + KeyList = [<<"K00001">>, <<"K00002">>, <<"K0003">>], + BinKL = encode_results(keys, KeyList), + ?assertMatch( + KeyList, + maps:get(?ACCKEY_KEYS, riak_kv_wm_json:decode(BinKL)) + ), + TermKeyList = [{<<"T0001">>, <<"K0002">>}, {<<"T0002">>, <<"K0001">>}], + BinTKL = encode_results(term_with_keys, TermKeyList), + ?assertMatch( + TermKeyList, + lists:sort( + lists:map( + fun(M) -> [{T, K}] = maps:to_list(M), {T, K} end, + maps:get(?ACCKEY_TERMKEYS, riak_kv_wm_json:decode(BinTKL)) + ) + ) + ), + TermCount = #{<<"T0001">> => 12, <<"T0002">> => 10}, + BinTKC = encode_results(term_with_keycount, TermCount), + ?assertMatch( + 10, + maps:get( + <<"T0002">>, + maps:get(?ACCKEY_TERMKEYCOUNT, riak_kv_wm_json:decode(BinTKC)) + ) + ), + BinTMC = encode_results(term_with_matchcount, TermCount), + ?assertMatch( + 12, + maps:get( + <<"T0001">>, + maps:get(?ACCKEY_TERMMATCHCOUNT, riak_kv_wm_json:decode(BinTMC)) + ) + ) + . + +-endif. \ No newline at end of file