decode #tscoverageresp properly
relies on the code moved from riak_kv_ts_svc over to riak_pb_ts_codec.
Andrei Zavada committed Apr 17, 2016
1 parent 3cd3730 commit dcad486
Showing 2 changed files with 22 additions and 139 deletions.
4 changes: 4 additions & 0 deletions src/riak_kv_pb_ts.erl
@@ -87,5 +87,9 @@ encode_response({reply, {tsgetresp, {CNames, CTypes, Rows}}, State}) ->
     R = riak_pb_ts_codec:encode_rows(CTypes, Rows),
     Encoded = #tsgetresp{columns = C, rows = R},
     {reply, Encoded, State};
+
+encode_response({reply, {tscoverageresp, Entries}, State}) ->
+    Encoded = #tscoverageresp{entries = riak_pb_ts_codec:encode_cover_list(Entries)},
+    {reply, Encoded, State};
 encode_response(Response) ->
     Response.
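The new encode_response clause above hands the coverage entries to riak_pb_ts_codec:encode_cover_list/1, which is not part of this diff. A minimal sketch of what that helper presumably does, assuming it mirrors the convert_cover_list/assemble_ts_range code removed from riak_kv_ts_svc.erl further down; the {Cover, Range, SQLtext} entry shape, the #tscoverageentry/#tsrange records and the riak_kv_pb_coverage calls are all taken from that removed code, so the actual riak_pb implementation may differ in detail.

%% Sketch only: each entry is assumed to arrive as {Cover, Range, SQLtext},
%% the shape produced by riak_kv_ts_util:sql_to_cover/4.
encode_cover_list(Entries) ->
    [encode_cover_entry(Cover, Range, SQLtext)
     || {Cover, Range, SQLtext} <- Entries].

encode_cover_entry(Cover, {FieldName, {{StartVal, StartIncl}, {EndVal, EndIncl}}} = Range, SQLtext) ->
    Node = proplists:get_value(node, Cover),
    {IP, Port} = riak_kv_pb_coverage:node_to_pb_details(Node),
    #tscoverageentry{
       %% opaque token the client hands back with its follow-up per-quantum queries
       cover_context = riak_kv_pb_coverage:term_to_checksum_binary({Cover, Range}),
       ip = IP, port = Port,
       range = #tsrange{field_name = FieldName,
                        lower_bound = StartVal,
                        lower_bound_inclusive = StartIncl,
                        upper_bound = EndVal,
                        upper_bound_inclusive = EndIncl,
                        %% human-readable "<TABLE> / time > X and time < Y" text
                        desc = SQLtext}}.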
157 changes: 18 additions & 139 deletions src/riak_kv_ts_svc.erl
@@ -355,148 +355,27 @@ sub_tslistkeysreq(Mod, DDL, #tslistkeysreq{table = Table,
 sub_tscoveragereq(Mod, _DDL, #tscoveragereq{table = Table,
                                             query = Q},
                   State) ->
-    case compile(Mod, catch decode_query(Q, undefined)) of
-        {error, #rpberrorresp{} = Error} ->
-            {reply, Error, State};
-        {error, _} ->
+    Client = {riak_client, [node(), undefined]},
+    %% all we need from decode_query is to compile the query,
+    %% but also to check permissions
+    case decode_query(Q, undefined) of
+        {_QryType, {ok, SQL}} ->
+            case riak_kv_ts_api:compile_to_per_quantum_queries(Mod, SQL) of
+                {ok, Compiled} ->
+                    Bucket = riak_kv_ts_util:table_to_bucket(Table),
+                    {reply,
+                     {tscoverageresp, riak_kv_ts_util:sql_to_cover(Client, Compiled, Bucket, [])},
+                     State};
+                {error, Reason} ->
+                    {reply, make_rpberrresp(
+                              ?E_BAD_QUERY, flat_format("Failed to compile query: ~p", [Reason])), State}
+            end;
+        {error, Reason} ->
             {reply, make_rpberrresp(
-                      ?E_BAD_QUERY, "Failed to compile query"),
-             State};
-        SQL ->
-            %% SQL is a list of queries (1 per quantum)
-            Bucket = riak_kv_ts_util:table_to_bucket(Table),
-            Client = {riak_client, [node(), undefined]},
-            convert_cover_list(sql_to_cover(Client, SQL, Bucket, []), State)
+                      ?E_BAD_QUERY, flat_format("Failed to parse query: ~p", [Reason])),
+             State}
     end.
 
-%% Copied and modified from riak_kv_pb_coverage:convert_list. Would
-%% be nice to collapse them back together, probably with a closure,
-%% but time and effort.
-convert_cover_list({error, Error}, State) ->
-    {error, Error, State};
-convert_cover_list(Results, State) ->
-    %% Pull hostnames & ports
-    %% Wrap each element of this list into a rpbcoverageentry
-    Resp = #tscoverageresp{
-             entries =
-                 [begin
-                      Node = proplists:get_value(node, Cover),
-                      {IP, Port} = riak_kv_pb_coverage:node_to_pb_details(Node),
-                      #tscoverageentry{
-                         cover_context = riak_kv_pb_coverage:term_to_checksum_binary(
-                                           {Cover, Range}),
-                         ip = IP, port = Port,
-                         range = assemble_ts_range(Range, SQLtext)
-                        }
-                  end || {Cover, Range, SQLtext} <- Results]
-            },
-    {reply, Resp, State}.
-
-assemble_ts_range({FieldName, {{StartVal, StartIncl}, {EndVal, EndIncl}}}, Text) ->
-    #tsrange{
-       field_name = FieldName,
-       lower_bound = StartVal,
-       lower_bound_inclusive = StartIncl,
-       upper_bound = EndVal,
-       upper_bound_inclusive = EndIncl,
-       desc = Text
-      }.
-
-
-%% Result from riak_client:get_cover is a nested list of coverage plan
-%% because KV coverage requests are designed that way, but in our case
-%% all we want is the singleton head
-
-%% If any of the results from get_cover are errors, we want that tuple
-%% to be the sole return value
-sql_to_cover(_Client, [], _Bucket, Accum) ->
-    lists:reverse(Accum);
-sql_to_cover(Client, [SQL|Tail], Bucket, Accum) ->
-    case Client:get_cover(riak_kv_qry_coverage_plan, Bucket, undefined,
-                          {SQL, Bucket}) of
-        {error, Error} ->
-            {error, Error};
-        [Cover] ->
-            {Description, RangeReplacement} = reverse_sql(SQL),
-            sql_to_cover(Client, Tail, Bucket, [{Cover, RangeReplacement,
-                                                 Description}|Accum])
-    end.
-
-%% Generate a human-readable description of the target
-%% <<"<TABLE> / time > X and time < Y">>
-%% Generate a start/end timestamp for future replacement in a query
-reverse_sql(?SQL_SELECT{'FROM' = Table,
-                        'WHERE' = KeyProplist,
-                        partition_key = PartitionKey}) ->
-    QuantumField = identify_quantum_field(PartitionKey),
-    RangeTuple = extract_time_boundaries(QuantumField, KeyProplist),
-    Desc = derive_description(Table, QuantumField, RangeTuple),
-    ReplacementValues = {QuantumField, RangeTuple},
-    {Desc, ReplacementValues}.
-
-
-derive_description(Table, Field, {{Start, StartInclusive}, {End, EndInclusive}}) ->
-    StartOp = pick_operator(">", StartInclusive),
-    EndOp = pick_operator("<", EndInclusive),
-    unicode:characters_to_binary(
-      flat_format("~ts / ~ts ~s ~B and ~ts ~s ~B",
-                  [Table, Field, StartOp, Start,
-                   Field, EndOp, End]), utf8).
-
-pick_operator(LGT, true) ->
-    LGT ++ "=";
-pick_operator(LGT, false) ->
-    LGT.
-
-extract_time_boundaries(FieldName, WhereList) ->
-    {FieldName, timestamp, Start} =
-        lists:keyfind(FieldName, 1, proplists:get_value(startkey, WhereList, [])),
-    {FieldName, timestamp, End} =
-        lists:keyfind(FieldName, 1, proplists:get_value(endkey, WhereList, [])),
-    StartInclusive = proplists:get_value(start_inclusive, WhereList, true),
-    EndInclusive = proplists:get_value(end_inclusive, WhereList, false),
-    {{Start, StartInclusive}, {End, EndInclusive}}.
-
-
-%%%%%%%%%%%%
-%% FRAGILE HORRIBLE BAD BAD BAD AST MANGLING
-identify_quantum_field(#key_v1{ast = KeyList}) ->
-    HashFn = find_hash_fn(KeyList),
-    P_V1 = hd(HashFn#hash_fn_v1.args),
-    hd(P_V1#param_v1.name).
-
-find_hash_fn([]) ->
-    throw(wtf);
-find_hash_fn([#hash_fn_v1{}=Hash|_T]) ->
-    Hash;
-find_hash_fn([_H|T]) ->
-    find_hash_fn(T).
-
-%%%%%%%%%%%%
-
-
-compile(_Mod, {error, Err}) ->
-    {error, make_decoder_error_response(Err)};
-compile(_Mod, {'EXIT', {Err, _}}) ->
-    {error, make_decoder_error_response(Err)};
-compile(Mod, {ok, ?SQL_SELECT{}=SQL}) ->
-    case (catch Mod:get_ddl()) of
-        {_, {undef, _}} ->
-            {error, no_helper_module};
-        DDL ->
-            case riak_ql_ddl:is_query_valid(Mod, DDL,
-                                            riak_kv_ts_util:sql_record_to_tuple(SQL)) of
-                true ->
-                    case riak_kv_qry_compiler:compile(DDL, SQL, undefined) of
-                        {error,_} = Error ->
-                            Error;
-                        {ok, Queries} ->
-                            Queries
-                    end;
-                {false, _Errors} ->
-                    {error, invalid_query}
-            end
-    end.
 
 %% ----------
 %% query
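For reference, the human-readable range description built by the removed reverse_sql/derive_description helpers above follows the <<"<TABLE> / time > X and time < Y">> format noted in their comments; a quick worked example with hypothetical values:

%% Hypothetical table, field and bounds, for illustration only.
derive_description(<<"GeoCheckin">>, <<"time">>, {{5000, true}, {10000, false}}).
%% => <<"GeoCheckin / time >= 5000 and time < 10000">>
%% pick_operator(">", true) yields ">=", pick_operator("<", false) yields "<".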