diff --git a/src/riak_kv_qry.erl b/src/riak_kv_qry.erl index 9cafc8e741..52bc67823f 100644 --- a/src/riak_kv_qry.erl +++ b/src/riak_kv_qry.erl @@ -197,9 +197,9 @@ do_describe(?DDL{fields = FieldSpecs, ColumnNames = [<<"Column">>, <<"Type">>, <<"Is Null">>, <<"Primary Key">>, <<"Local Key">>], ColumnTypes = [ varchar, varchar, boolean, sint64, sint64 ], Rows = - [{Name, list_to_binary(atom_to_list(Type)), Nullable, + [[Name, list_to_binary(atom_to_list(Type)), Nullable, column_pk_position_or_blank(Name, PKSpec), - column_lk_position_or_blank(Name, LKSpec)} + column_lk_position_or_blank(Name, LKSpec)] || #riak_field_v1{name = Name, type = Type, optional = Nullable} <- FieldSpecs], @@ -299,11 +299,11 @@ describe_table_columns_test() -> Res = do_describe(DDL), ?assertMatch( {ok, {_, _, - [{<<"f">>, <<"varchar">>, false, 1, 1}, - {<<"s">>, <<"varchar">>, false, 2, 2}, - {<<"t">>, <<"timestamp">>, false, 3, 3}, - {<<"w">>, <<"sint64">>, false, [], []}, - {<<"p">>, <<"double">>, true, [], []}]}}, + [[<<"f">>, <<"varchar">>, false, 1, 1], + [<<"s">>, <<"varchar">>, false, 2, 2], + [<<"t">>, <<"timestamp">>, false, 3, 3], + [<<"w">>, <<"sint64">>, false, [], []], + [<<"p">>, <<"double">>, true, [], []]]}}, Res). validate_make_insert_row_basic_test() -> diff --git a/src/riak_kv_qry_worker.erl b/src/riak_kv_qry_worker.erl index 40f066d46c..d2c68ae319 100644 --- a/src/riak_kv_qry_worker.erl +++ b/src/riak_kv_qry_worker.erl @@ -279,7 +279,7 @@ prepare_final_results(#state{ prepare_final_results2(#riak_sel_clause_v1{col_return_types = ColTypes, col_names = ColNames}, Rows) -> %% filter out empty records - FinalRows = [list_to_tuple(R) || R <- Rows, R /= [[]]], + FinalRows = [R || R <- Rows, R /= [[]]], {ColNames, ColTypes, FinalRows}. 
%%%=================================================================== @@ -292,10 +292,8 @@ prepare_final_results2(#riak_sel_clause_v1{col_return_types = ColTypes, prepare_final_results_test() -> Rows = [[12, <<"windy">>], [13, <<"windy">>]], - RowsAsTuples = [{12, <<"windy">>}, {13, <<"windy">>}], - % IndexedChunks = [{1, Rows}], ?assertEqual( - {[<<"a">>, <<"b">>], [sint64, varchar], RowsAsTuples}, + {[<<"a">>, <<"b">>], [sint64, varchar], Rows}, prepare_final_results( #state{ qry = diff --git a/src/riak_kv_ts_svc.erl b/src/riak_kv_ts_svc.erl index 7a19741ed8..ddf0890bdf 100644 --- a/src/riak_kv_ts_svc.erl +++ b/src/riak_kv_ts_svc.erl @@ -385,8 +385,9 @@ sub_tscoveragereq(Mod, _DDL, #tscoveragereq{table = Table, {reply, ts_query_responses() | #rpberrorresp{}, #state{}}. sub_tsqueryreq(_Mod, DDL = ?DDL{table = Table}, SQL, State) -> case riak_kv_ts_api:query(SQL, DDL) of - {ok, Data} -> - {reply, make_tsqueryresp(Data), State}; + {ok, {ColNames, ColTypes, LdbNativeRows}} -> + Rows = [list_to_tuple(R) || R <- LdbNativeRows], + {reply, make_tsqueryresp({ColNames, ColTypes, Rows}), State}; %% the following timeouts are known and distinguished: {error, no_type} -> diff --git a/src/riak_kv_web.erl b/src/riak_kv_web.erl index f556918f39..fc794f567e 100644 --- a/src/riak_kv_web.erl +++ b/src/riak_kv_web.erl @@ -2,7 +2,7 @@ %% %% riak_kv_web: setup Riak's KV HTTP interface %% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -121,7 +121,17 @@ raw_dispatch(Name) -> {Prefix ++ ["buckets", bucket, "index", field, '*'], riak_kv_wm_index, Props} - ] || {Prefix, Props} <- Props2 ]). + ] || {Prefix, Props} <- Props2 ]) ++ + + lists:flatten( + [ + %% Right now we only have version 1. 
When we get version 2 we have to + %% decide if we want to dispatch to separate resource modules or handle + %% the different versions inside the same resource handler module. + [{["ts", api_version, "tables", table, "list_keys"], riak_kv_wm_timeseries_listkeys, Props}, + {["ts", api_version, "tables", table, "keys", '*'], riak_kv_wm_timeseries, Props}, + {["ts", api_version, "query"], riak_kv_wm_timeseries_query, Props} + ] || {_Prefix, Props} <- Props2]). is_post(Req) -> wrq:method(Req) == 'POST'. diff --git a/src/riak_kv_wm_timeseries.erl b/src/riak_kv_wm_timeseries.erl new file mode 100644 index 0000000000..f1bd0c3ace --- /dev/null +++ b/src/riak_kv_wm_timeseries.erl @@ -0,0 +1,429 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_wm_timeseries: Webmachine resource for riak TS operations. +%% +%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Resource for Riak TS operations over HTTP. +%% +%% This resource is responsible for everything under +%% ``` +%% ts/v1/tables/Table/keys +%% ``` +%% Specific operations supported: +%% ``` +%% GET /ts/v1/tables/Table/keys/K1/V1/... single-key get +%% DELETE /ts/v1/tables/Table/keys/K1/V1/... 
single-key delete +%% POST /ts/v1/tables/Table/keys singe-key or batch put depending +%% on the body +%% ''' +%% +%% Request body is expected to be a JSON containing a struct or structs for the +%% POST. GET and DELETE have no body. +%% +%% Response is a JSON containing full records or {"success": true} for POST and +%% DELETE. +%% + +-module(riak_kv_wm_timeseries). + +%% webmachine resource exports +-export([ + init/1, + service_available/2, + allowed_methods/2, + malformed_request/2, + is_authorized/2, + forbidden/2, + content_types_provided/2, + content_types_accepted/2, + encodings_provided/2, + post_is_create/2, + process_post/2, + delete_resource/2, + resource_exists/2 + ]). + +%% webmachine body-producing functions +-export([to_json/2]). + +-include_lib("webmachine/include/webmachine.hrl"). +-include_lib("riak_ql/include/riak_ql_ddl.hrl"). +-include("riak_kv_wm_raw.hrl"). + +-record(ctx, + { + api_version :: undefined | integer(), + api_call :: undefined | get | put | delete, + table :: undefined | binary(), + mod :: undefined | module(), + key :: undefined | ts_rec(), + object, + timeout :: undefined | integer(), + options = [], %% for the call towards riak. + prefix, + riak + }). + +-define(DEFAULT_TIMEOUT, 60000). + +-type cb_rv_spec(T) :: {T, #wm_reqdata{}, #ctx{}}. +-type halt() :: {'halt', 200..599} | {'error' , term()}. +-type ts_rec() :: [riak_pb_ts_codec:ldbvalue()]. + +-spec init(proplists:proplist()) -> {ok, #ctx{}}. +%% @doc Initialize this resource. This function extracts the +%% 'prefix' and 'riak' properties from the dispatch args. +%% (But how exactly are those properties used?) +init(Props) -> + {ok, #ctx{prefix = proplists:get_value(prefix, Props), + riak = proplists:get_value(riak, Props)}}. + +-spec service_available(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean() | halt()). +%% @doc Determine whether or not a connection to Riak +%% can be established. +%% Convert the table name from the part of the URL. 
+service_available(RD, Ctx) -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> + Table = riak_kv_wm_ts_util:table_from_request(RD), + Mod = riak_ql_ddl:make_module_name(Table), + {true, RD, Ctx#ctx{api_version = ApiVersion, + table = Table, mod = Mod}}; + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) + end. + +is_authorized(RD, #ctx{table = Table} = Ctx) -> + Call = api_call(wrq:path_tokens(RD), wrq:method(RD)), + case riak_kv_wm_ts_util:authorize(Call, Table, RD) of + ok -> + {true, RD, Ctx#ctx{api_call = Call}}; + {error, ErrorMsg} -> + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) + end. + +-spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +forbidden(RD, Ctx) -> + Result = riak_kv_wm_utils:is_forbidden(RD), + {Result, RD, Ctx}. + +-spec allowed_methods(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([atom()]). +allowed_methods(RD, Ctx) -> + allowed_methods(wrq:path_tokens(RD), RD, Ctx). + +allowed_methods([], RD, Ctx) -> + {['POST'], RD, Ctx}; +allowed_methods(_KeyInURL, RD, Ctx) -> + {['GET', 'DELETE'], RD, Ctx}. + +-spec malformed_request(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +malformed_request(RD, Ctx) -> + try + Ctx2 = extract_params(wrq:req_qs(RD), Ctx), + %% NOTE: if the supplied JSON body is wrong a malformed requset + %% may be issued later. It will indeed, only it will be generated + %% manually, via handle_error, and not detected by webmachine from + %% malformed_request reporting it directly. + {false, RD, Ctx2} + catch + throw:ParamError -> + riak_kv_wm_ts_util:handle_error(ParamError, RD, Ctx) + end. 
+ +-spec content_types_provided(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([{string(), atom()}]). +content_types_provided(RD, Ctx) -> + {[{"application/json", to_json}], + RD, Ctx}. + +-spec content_types_accepted(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([{string(), atom()}]). +content_types_accepted(RD, Ctx) -> + content_types_accepted(wrq:path_tokens(RD), RD, Ctx). + +content_types_accepted([], RD, Ctx) -> + %% the JSON in the POST will be handled by process_post, + %% so this handler will never be called. + {[{"application/json", undefined}], RD, Ctx}; +content_types_accepted(_, RD, Ctx) -> + {[], RD, Ctx}. + +-spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean() | halt()). +resource_exists(RD, #ctx{table = Table, mod = Mod} = Ctx) -> + case riak_kv_wm_ts_util:table_module_exists(Mod) of + true -> + resource_exists(wrq:path_tokens(RD), wrq:method(RD), RD, Ctx); + false -> + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) + end. + +resource_exists([], 'POST', RD, Ctx) -> + {true, RD, Ctx}; +resource_exists(Path, 'GET', RD, + #ctx{table = Table, + mod = Mod, + options = Options} = Ctx) -> + %% Would be nice if something cheaper than using get_data existed to check + %% if a key is present. + case validate_key(Path, Ctx) of + {ok, Key} -> + case riak_kv_ts_api:get_data(Key, Table, Mod, Options) of + {ok, Record} -> + {true, RD, Ctx#ctx{object = Record, + key = Key}}; + {error, notfound} -> + riak_kv_wm_ts_util:handle_error(notfound, RD, Ctx); + {error, InternalReason} -> + riak_kv_wm_ts_util:handle_error({riak_error, InternalReason}, RD, Ctx) + end; + {error, Reason} -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) + end; +resource_exists(Path, 'DELETE', RD, Ctx) -> + %% Since reading the object is expensive we will assume for now that the + %% object exists for a delete, but if it turns out that it does not then the + %% processing of the delete will return 404 at that point. 
+ case validate_key(Path, Ctx) of + {ok, Key} -> + {true, RD, Ctx#ctx{key = Key}}; + {error, Reason} -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) + end. + +-spec encodings_provided(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([{Encoding::string(), Producer::function()}]). +encodings_provided(RD, Ctx) -> + {riak_kv_wm_utils:default_encodings(), RD, Ctx}. + +-spec post_is_create(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +post_is_create(RD, Ctx) -> + {false, RD, Ctx}. + +-spec process_post(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +process_post(RD, #ctx{mod = Mod, + table = Table} = Ctx) -> + try extract_data(RD, Mod, Table) of + Records -> + case riak_kv_ts_util:validate_rows(Mod, Records) of + [] -> + case riak_kv_ts_api:put_data(Records, Table, Mod) of + ok -> + Json = result_to_json(ok), + Resp = riak_kv_wm_ts_util:set_json_response(Json, RD), + {true, Resp, Ctx}; + {error, {some_failed, ErrorCount}} -> + riak_kv_wm_ts_util:handle_error({failed_some_puts, ErrorCount, Table}, RD, Ctx) + end; + BadRowIdxs when is_list(BadRowIdxs) -> + riak_kv_wm_ts_util:handle_error( + {invalid_data, string:join([integer_to_list(I) || I <- BadRowIdxs],", ")}, + RD, Ctx) + end + catch + throw:Reason -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) + end. + +-spec delete_resource(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|halt()). +delete_resource(RD, #ctx{table = Table, + mod = Mod, + key = Key, + options = Options} = Ctx) -> + case riak_kv_ts_api:delete_data(Key, Table, Mod, Options) of + ok -> + Json = result_to_json(ok), + Resp = riak_kv_wm_ts_util:set_json_response(Json, RD), + {true, Resp, Ctx}; + {error, notfound} -> + riak_kv_wm_ts_util:handle_error(notfound, RD, Ctx); + {error, Reason} -> + riak_kv_wm_ts_util:handle_error({riak_error, Reason}, RD, Ctx) + end. + +-spec to_json(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(iolist()|halt()). 
+to_json(RD, #ctx{api_call = get, object = Object} = Ctx) -> + try + Json = mochijson2:encode(Object), + {Json, RD, Ctx} + catch + _:Reason -> + riak_kv_wm_ts_util:handle_error({riak_error, Reason}, RD, Ctx) + end. + +-spec extract_params([{string(), string()}], #ctx{}) -> #ctx{} . +%% @doc right now we only allow a timeout parameter or nothing. +extract_params([], Ctx) -> + Ctx; +extract_params([{"timeout", TimeoutStr} | Rest], + Ctx0 = #ctx{options = Options0}) -> + case catch list_to_integer(TimeoutStr) of + Timeout when is_integer(Timeout), Timeout >= 0 -> + Options = lists:keystore(timeout, 1, Options0, {timeout, Timeout}), + extract_params(Rest, + Ctx0#ctx{timeout = Timeout, + options = Options}); + _ -> + Reason = io_lib:format("Bad timeout value: ~s", [TimeoutStr]), + throw({parameter_error, Reason}) + end; +extract_params([{UnknownParam, _}|_], _Ctx) -> + throw({parameter_error, io_lib:format("Unknown parameter: ~s", [UnknownParam])}). + +validate_key(Path, #ctx{table = Table, mod = Mod}) -> + UnquotedPath = lists:map(fun mochiweb_util:unquote/1, Path), + path_elements_to_key(UnquotedPath, Table, Mod, Mod:get_ddl()). + +path_elements_to_key(PEList, Table, Mod, + ?DDL{local_key = #key_v1{ast = LK}}) -> + try + TableKeyLength = length(LK), + if TableKeyLength * 2 == length(PEList) -> + %% values with field names: "f1/v1/f2/v2/f3/v3" + %% 1. check that supplied key fields exist and values + %% supplied are convertible to their types + FVList = + [convert_fv(Table, Mod, K, V) + || {K, V} <- empair(PEList, [])], + %% 2. 
possibly reorder field-value pairs to match the LK order + case ensure_lk_order_and_strip(LK, FVList) of + %% this will extract only LK-constituent fields, and catch + %% the case of the right number of fields, of which some + %% are not in LK + OrderedKeyValues when length(OrderedKeyValues) == TableKeyLength -> + {ok, OrderedKeyValues}; + _WrongNumberOfLKFields -> + {error, url_has_not_all_lk_fields} + end; + el/=se -> + {error, {url_malformed_key_path, length(PEList), TableKeyLength, Table}} + end + catch + throw:ConvertFailed -> + {error, ConvertFailed} + end. + +empair([], Q) -> lists:reverse(Q); +empair([K, V | T], Q) -> empair(T, [{K, V}|Q]). + +convert_fv(Table, Mod, FieldRaw, V) -> + Field = [list_to_binary(X) || X <- string:tokens(FieldRaw, ".")], + case Mod:is_field_valid(Field) of + true -> + try + convert_field(Table, Field, Mod:get_field_type(Field), V) + catch + error:badarg -> + %% rethrow with key, for more informative reporting + throw({bad_value, Table, Field}) + end; + false -> + throw({bad_field, Table, Field}) + end. + +convert_field(_T, F, varchar, V) -> + {F, list_to_binary(V)}; +convert_field(_T, F, sint64, V) -> + {F, list_to_integer(V)}; +convert_field(_T, F, double, V) -> + %% list_to_float("42") will fail, so + try + {F, list_to_float(V)} + catch + error:badarg -> + {F, float(list_to_integer(V))} + end; +convert_field(T, F, timestamp, V) -> + case list_to_integer(V) of + BadValue when BadValue < 1 -> + throw({url_key_bad_value, T, F}); + GoodValue -> + {F, GoodValue} + end. + +ensure_lk_order_and_strip(LK, FVList) -> + %% exclude fields not in LK + [V || V <- [proplists:get_value(F, FVList) || #param_v1{name = F} <- LK], + V /= undefined]. 
+ + +extract_data(RD, Mod, Table) -> + try + Json = binary_to_list(wrq:req_body(RD)), + Batch = ensure_batch( + mochijson2:decode(Json)), + FieldTypes = ddl_fields_and_types(Mod), + [json_struct_to_obj(Rec, FieldTypes) || {struct, Rec} <- Batch] + catch + error:_JsonParserError -> + throw(invalid_json); + throw:{Kind, WhichFieldOrType} -> + %% our own custom errors caught in process_post: + %% inject Table and rethrow + throw({Kind, Table, WhichFieldOrType}) + end. + +ensure_batch({struct, SingleRecord}) -> + [{struct, SingleRecord}]; +ensure_batch(Batch) when is_list(Batch) -> + Batch. + +json_struct_to_obj(FieldValueList, Fields) -> + List = [extract_field_value(Field, FieldValueList) + || Field <- Fields], + list_to_tuple(List). + +extract_field_value({Name, Type}, FVList) -> + case proplists:get_value(Name, FVList) of + undefined -> + throw({missing_field, Name}); + Value -> + check_field_value(Name, Type, Value) + end. + +%% @todo: might be better if the DDL helper module had a +%% valid_field_value(Field, Value) -> boolean() function. +check_field_value(_Name, varchar, V) when is_binary(V) -> V; +check_field_value(_Name, sint64, V) when is_integer(V) -> V; +check_field_value(_Name, double, V) when is_number(V) -> V; +check_field_value(_Name, timestamp, V) when is_integer(V), V>0 -> V; +check_field_value(_Name, boolean, V) when is_boolean(V) -> V; +check_field_value(Name, Type, _V) -> + throw({bad_value, {Name, Type}}). + + + +%% @todo: this should be in the DDL helper module, so that the records don't +%% leak out of riak_ql. +ddl_fields_and_types(Mod) -> + ?DDL{fields = Fields} = Mod:get_ddl(), + [ {Name, Type} || #riak_field_v1{name=Name, type=Type} <- Fields ]. + +%% @private +api_call([] , 'POST') -> put; +api_call(_KeyInURL, 'GET') -> get; +api_call(_KeyInURL, 'DELETE') -> delete. + +%% @private +result_to_json(ok) -> + mochijson2:encode([{success, true}]). 
diff --git a/src/riak_kv_wm_timeseries_listkeys.erl b/src/riak_kv_wm_timeseries_listkeys.erl new file mode 100644 index 0000000000..c6ae9c8e14 --- /dev/null +++ b/src/riak_kv_wm_timeseries_listkeys.erl @@ -0,0 +1,202 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_wm_timeseries_listkeys: Webmachine resource for riak TS +%% streaming operations. +%% +%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Resource for Riak TS operations over HTTP. +%% +%% ``` +%% GET /ts/v1/table/Table/list_keys +%% ''' +%% +%% Response is HTML URLs for the entries in the table. +%% + +-module(riak_kv_wm_timeseries_listkeys). + +%% webmachine resource exports +-export([ + init/1, + service_available/2, + allowed_methods/2, + is_authorized/2, + forbidden/2, + resource_exists/2, + content_types_provided/2, + encodings_provided/2 + ]). + +%% webmachine body-producing functions +-export([produce_doc_body/2]). + +-include("riak_kv_wm_raw.hrl"). +-include_lib("webmachine/include/webmachine.hrl"). + +-record(ctx, + { + api_version :: undefined | integer(), + riak, + security, + table :: undefined | binary(), + mod :: module() + }). + +-type cb_rv_spec(T) :: {T, #wm_reqdata{}, #ctx{}}. + +-spec init(proplists:proplist()) -> {ok, #ctx{}}. 
+%% @doc Initialize this resource. This function extracts the +%% 'prefix' and 'riak' properties from the dispatch args. +init(Props) -> + {ok, #ctx{riak = proplists:get_value(riak, Props)}}. + +-spec service_available(#wm_reqdata{}, #ctx{}) -> + {boolean(), #wm_reqdata{}, #ctx{}}. +%% @doc Determine whether or not a connection to Riak +%% can be established. +service_available(RD, Ctx) -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> + Table = riak_kv_wm_ts_util:table_from_request(RD), + Mod = riak_ql_ddl:make_module_name(Table), + {true, RD, + Ctx#ctx{table = Table, + mod = Mod}}; + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) + end. + +is_authorized(RD, #ctx{table = Table} = Ctx) -> + case riak_kv_wm_ts_util:authorize(listkeys, Table, RD) of + ok -> + {true, RD, Ctx}; + {error, ErrorMsg} -> + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) + end. + +-spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +forbidden(RD, Ctx) -> + Result = riak_kv_wm_utils:is_forbidden(RD), + {Result, RD, Ctx}. + +-spec allowed_methods(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([atom()]). +%% @doc Get the list of methods this resource supports. +allowed_methods(RD, Ctx) -> + {['GET'], RD, Ctx}. + +-spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +resource_exists(RD, #ctx{mod = Mod, table = Table} = Ctx) -> + case riak_kv_wm_ts_util:table_module_exists(Mod) of + true -> + {true, RD, Ctx}; + false -> + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) + end. 
+ +-spec encodings_provided(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([{Encoding::string(), Producer::function()}]). +%% @doc List the encodings available for representing this resource. +%% "identity" and "gzip" are available. +encodings_provided(RD, Ctx) -> + {riak_kv_wm_utils:default_encodings(), RD, Ctx}. + +-spec content_types_provided(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([{ContentType::string(), Producer::atom()}]). +%% @doc List the content types available for representing this resource. +content_types_provided(RD, Ctx) -> + {[{"text/plain", produce_doc_body}], RD, Ctx}. + +produce_doc_body(RD, Ctx = #ctx{table = Table, mod = Mod}) -> + {ok, ReqId} = riak_client:stream_list_keys( + {Table, Table}, undefined, {riak_client, [node(), undefined]}), + {{stream, {[], fun() -> stream_keys(ReqId, Table, Mod) end}}, RD, Ctx}. + +stream_keys(ReqId, Table, Mod) -> + receive + %% skip empty shipments + {ReqId, {keys, []}} -> + stream_keys(ReqId, Table, Mod); + {ReqId, From, {keys, []}} -> + _ = riak_kv_keys_fsm:ack_keys(From), + stream_keys(ReqId, Table, Mod); + {ReqId, From, {keys, Keys}} -> + _ = riak_kv_keys_fsm:ack_keys(From), + {ts_keys_to_body(Keys, Table, Mod), fun() -> stream_keys(ReqId, Table, Mod) end}; + {ReqId, {keys, Keys}} -> + {ts_keys_to_body(Keys, Table, Mod), fun() -> stream_keys(ReqId, Table, Mod) end}; + {ReqId, done} -> + {<<>>, done}; + {ReqId, {error, timeout}} -> + {mochijson2:encode({struct, [{error, timeout}]}), done}; + Weird -> + lager:warning("Unexpected message while waiting for list_keys batch with ReqId ~p, Table ~s: ~p", [ReqId, Table, Weird]), + stream_keys(ReqId, Table, Mod) + end. 
+ +ts_keys_to_body(Keys, Table, Mod) -> + BaseUrl = base_url(Table), + KeyTypes = riak_kv_wm_ts_util:local_key_fields_and_types(Mod), + %% Dialyzer issues this warning if the lists:map is replaced with + %% the list comprehension (below): + %% riak_kv_wm_timeseries_listkeys.erl:168: The pattern [Key | _] can never match the type [] + %% for which no clear workaround could be found. + %% + %% URLs = [format_url(BaseUrl, KeyTypes, Key) + %% || Key <- Keys], + + URLs = + lists:map( + fun(Key) when is_tuple(Key) -> %% simple single-field keys like {1}, seen in the wild + format_url(BaseUrl, KeyTypes, tuple_to_list(Key)); + (Key) when is_binary(Key) -> %% sext-encoded ones + format_url(BaseUrl, KeyTypes, tuple_to_list(sext:decode(Key))) + end, + Keys), + iolist_to_binary(URLs). + + +format_url(BaseUrl, KeyTypes, Key) -> + iolist_to_binary([BaseUrl, key_to_string(lists:zip(Key, KeyTypes)), $\n]). + +key_to_string(KFTypes) -> + string:join( + [[Field, $/, mochiweb_util:quote_plus(value_to_url_string(Key, Type))] + || {Key, {Field, Type}} <- KFTypes], + "/"). + +value_to_url_string(V, varchar) -> + binary_to_list(V); +value_to_url_string(V, sint64) -> + integer_to_list(V); +value_to_url_string(V, timestamp) -> + integer_to_list(V). + +base_url(Table) -> + {ok, [{Server, Port}]} = application:get_env(riak_api, http), + lists:flatten( + io_lib:format( + "http://~s:~B/ts/~s/tables/~s/keys/", + [Server, Port, riak_kv_wm_ts_util:current_api_version_string(), Table])). diff --git a/src/riak_kv_wm_timeseries_query.erl b/src/riak_kv_wm_timeseries_query.erl new file mode 100644 index 0000000000..d182917863 --- /dev/null +++ b/src/riak_kv_wm_timeseries_query.erl @@ -0,0 +1,298 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_wm_timeseries_query: Webmachine resource for riak TS query call. +%% +%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Resource for Riak TS operations over HTTP. +%% +%% ``` +%% POST /ts/v1/query?query="query string" execute SQL query +%% ''' +%% +%% Response is a JSON containing data rows with column headers. +%% + +-module(riak_kv_wm_timeseries_query). + +%% webmachine resource exports +-export([ + init/1, + service_available/2, + is_authorized/2, + malformed_request/2, + forbidden/2, + allowed_methods/2, + resource_exists/2, + post_is_create/2, + process_post/2, + content_types_accepted/2, + content_types_provided/2, + encodings_provided/2, + produce_doc_body/2 + ]). + +-include_lib("webmachine/include/webmachine.hrl"). +-include_lib("riak_ql/include/riak_ql_ddl.hrl"). +-include("riak_kv_wm_raw.hrl"). +-include("riak_kv_ts.hrl"). + +-record(ctx, + { + api_version :: undefined | integer(), + table :: undefined | binary(), + mod :: undefined | module(), + method :: atom(), + timeout :: undefined | integer(), %% passed-in timeout value in ms + security, %% security context + sql_type :: undefined | riak_kv_qry:query_type(), + compiled_query :: undefined | ?DDL{} | riak_kv_qry:sql_query_type_record(), + with_props :: undefined | proplists:proplist(), + result :: undefined | ok | {Headers::[binary()], Rows::[ts_rec()]} + }). + +-define(DEFAULT_TIMEOUT, 60000). +-define(TABLE_ACTIVATE_WAIT, 30). 
%% wait until table's bucket type is activated + +-type cb_rv_spec(T) :: {T, #wm_reqdata{}, #ctx{}}. +-type halt() :: {'halt', 200..599} | {'error' , term()}. +-type ts_rec() :: [riak_pb_ts_codec:ldbvalue()]. + + +-spec init(proplists:proplist()) -> {ok, #ctx{}}. +init(_Props) -> + {ok, #ctx{}}. + +-spec service_available(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +%% @doc Determine whether or not a connection to Riak +%% can be established. +service_available(RD, Ctx) -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> + %% always available because no client connection is required + {true, RD, Ctx}; + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) + end. + + +-spec allowed_methods(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([atom()]). +allowed_methods(RD, Ctx) -> + {['POST'], RD, Ctx}. + +-spec malformed_request(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +malformed_request(RD, Ctx) -> + try + {SqlType, SQL, WithProps} = query_from_request(RD), + Table = riak_kv_ts_util:queried_table(SQL), + Mod = riak_ql_ddl:make_module_name(Table), + {false, RD, Ctx#ctx{sql_type = SqlType, + compiled_query = SQL, + with_props = WithProps, + table = Table, + mod = Mod}} + catch + throw:Condition -> + riak_kv_wm_ts_util:handle_error(Condition, RD, Ctx) + end. + +-spec is_authorized(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|string()|halt()). 
+is_authorized(RD, #ctx{sql_type = SqlType, table = Table} = Ctx) -> + Call = riak_kv_ts_api:api_call_from_sql_type(SqlType), + case riak_kv_wm_ts_util:authorize(Call, Table, RD) of + ok -> + {true, RD, Ctx}; + {error, ErrorMsg} -> + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) + end. + +-spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +forbidden(RD, Ctx) -> + case riak_kv_wm_utils:is_forbidden(RD) of + true -> + {true, RD, Ctx}; + false -> + %% depends on query type, we will check this later; pass + %% for now + {false, RD, Ctx} + end. + +-spec content_types_provided(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([{ContentType::string(), Producer::atom()}]). +content_types_provided(RD, Ctx) -> + {[{"application/json", produce_doc_body}], RD, Ctx}. + + +-spec encodings_provided(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([{Encoding::string(), Producer::function()}]). +encodings_provided(RD, Ctx) -> + {riak_kv_wm_utils:default_encodings(), RD, Ctx}. + + +-spec content_types_accepted(#wm_reqdata{}, #ctx{}) -> + cb_rv_spec([ContentType::string()]). +content_types_accepted(RD, Ctx) -> + {["text/plain"], RD, Ctx}. + + +-spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|halt()). +resource_exists(RD, #ctx{sql_type = ddl, + mod = Mod, + table = Table} = Ctx) -> + case riak_kv_wm_ts_util:table_module_exists(Mod) of + false -> + {true, RD, Ctx}; + true -> + riak_kv_wm_ts_util:handle_error({table_exists, Table}, RD, Ctx) + end; +resource_exists(RD, #ctx{sql_type = Type, + mod = Mod, + table = Table} = Ctx) when Type /= ddl -> + case riak_kv_wm_ts_util:table_module_exists(Mod) of + true -> + {true, RD, Ctx}; + false -> + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) + end. + +-spec post_is_create(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +post_is_create(RD, Ctx) -> + {false, RD, Ctx}. 
+ +-spec process_post(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). +process_post(RD, #ctx{sql_type = ddl, compiled_query = SQL, with_props = WithProps} = Ctx) -> + case create_table(SQL, WithProps) of + ok -> + Result = [{success, true}], %% represents ok + Json = to_json(Result), + {true, wrq:append_to_response_body(Json, RD), Ctx}; + {error, Reason} -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) + end; +process_post(RD, #ctx{sql_type = QueryType, + compiled_query = SQL, + table = Table, + mod = Mod} = Ctx) -> + DDL = Mod:get_ddl(), %% might be faster to store this earlier on + case riak_kv_ts_api:query(SQL, DDL) of + {ok, Data} -> + {ColumnNames, _ColumnTypes, Rows} = Data, + Json = to_json({ColumnNames, Rows}), + {true, wrq:append_to_response_body(Json, RD), Ctx}; + %% the following timeouts are known and distinguished: + {error, qry_worker_timeout} -> + %% the eleveldb process didn't send us any response after + %% 10 sec (hardcoded in riak_kv_qry), and probably died + riak_kv_wm_ts_util:handle_error(query_worker_timeout, RD, Ctx); + {error, backend_timeout} -> + %% the eleveldb process did manage to send us a timeout + %% response + riak_kv_wm_ts_util:handle_error(backend_timeout, RD, Ctx); + {error, Reason} -> + riak_kv_wm_ts_util:handle_error({query_exec_error, QueryType, Table, Reason}, RD, Ctx) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +query_from_request(RD) -> + QueryStr = query_string_from_request(RD), + compile_query(QueryStr). + +query_string_from_request(RD) -> + case wrq:req_body(RD) of + undefined -> + throw(no_query_in_body); + Str -> + binary_to_list(Str) + end. 
+
+%% Lex and parse the SQL text, normalising every failure mode of the
+%% lexer/parser into a thrown {query_parse_error, _} or
+%% {query_compile_error, _} tuple for the caller to turn into a 400.
+%% NOTE(review): old-style `case catch ...` is used deliberately here so
+%% that lexer crashes surface as a matchable {'EXIT', _} tuple; a `try`
+%% rewrite would change which clauses fire, so it is left as-is.
+compile_query(QueryStr) ->
+    case catch riak_ql_parser:ql_parse(
+                 riak_ql_lexer:get_tokens(QueryStr)) of
+        %% parser messages have a tuple for Reason:
+        %% NOTE(review): `_LineNo` is read in the guard despite its
+        %% underscore prefix -- legal, but elvis would flag the name.
+        {error, {_LineNo, riak_ql_parser, Msg}} when is_integer(_LineNo) ->
+            throw({query_parse_error, Msg});
+        {error, {Token, riak_ql_parser, _}} ->
+            throw({query_parse_error, io_lib:format("Unexpected token: '~s'", [Token])});
+        {'EXIT', {Reason, _StackTrace}} -> %% these come from deep in the lexer
+            throw({query_parse_error, Reason});
+        {error, Reason} ->
+            throw({query_compile_error, Reason});
+        {ddl, _DDL, _Props} = Res ->
+            Res;
+        {Type, Compiled} ->
+            {ok, SQL} = riak_kv_ts_util:build_sql_record(
+                          Type, Compiled, undefined),
+            {Type, SQL, undefined}
+    end.
+
+
+%% Create and activate the bucket type backing a TS table.  Returns ok
+%% or {error, Reason} suitable for riak_kv_wm_ts_util:handle_error/3.
+create_table(DDL = ?DDL{table = Table}, Props) ->
+    %% would be better to use a function to get the table out.
+    {ok, Props1} = riak_kv_ts_util:apply_timeseries_bucket_props(
+                     DDL, riak_ql_ddl_compiler:get_compiler_version(), Props),
+    Props2 = [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props1],
+    case riak_core_bucket_type:create(Table, Props2) of
+        ok ->
+            wait_until_active(Table, ?TABLE_ACTIVATE_WAIT);
+        {error, Reason} ->
+            {error, {table_create_fail, Table, Reason}}
+    end.
+
+%% Poll bucket-type activation once a second for up to the given number
+%% of seconds (?TABLE_ACTIVATE_WAIT at the top-level call).
+wait_until_active(Table, 0) ->
+    {error, {table_activate_fail, Table}};
+wait_until_active(Table, Seconds) ->
+    case riak_core_bucket_type:activate(Table) of
+        ok ->
+            ok;
+        {error, not_ready} ->
+            timer:sleep(1000),
+            wait_until_active(Table, Seconds - 1);
+        {error, undefined} ->
+            %% this is inconceivable because create(Table) has
+            %% just succeeded, so it's here mostly to pacify
+            %% the dialyzer (and of course, for the odd chance
+            %% of Erlang imps crashing nodes between create
+            %% and activate calls)
+            {error, {table_created_missing, Table}}
+    end.
+
+-spec produce_doc_body(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(iolist()).
+%% GET body producer: render the {Columns, Rows} result stored in the
+%% ctx as a JSON object.
+%% NOTE(review): this duplicates the first clause of to_json/1 below --
+%% could simply call to_json(Result).
+produce_doc_body(RD, Ctx = #ctx{result = {Columns, Rows}}) ->
+    {mochijson2:encode(
+       {struct, [{<<"columns">>, Columns},
+                 {<<"rows">>, Rows}]}),
+     RD, Ctx}.
+
+%% Encode a {Columns, Rows} pair as a {"columns":…, "rows":…} JSON
+%% object; any other mochijson2-encodable term is encoded as-is.
+to_json({Columns, Rows}) when is_list(Columns), is_list(Rows) ->
+    mochijson2:encode(
+      {struct, [{<<"columns">>, Columns},
+                {<<"rows">>, Rows}]});
+to_json(Other) ->
+    mochijson2:encode(Other).
+
+%% log(Format, Args) ->
+%%     lager:log(info, self(), Format, Args).
diff --git a/src/riak_kv_wm_ts_util.erl b/src/riak_kv_wm_ts_util.erl
new file mode 100644
index 0000000000..a7e1866185
--- /dev/null
+++ b/src/riak_kv_wm_ts_util.erl
@@ -0,0 +1,245 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_wm_ts_util: utility functions for riak_kv_wm_timeseries* resources.
+%%
+%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_kv_wm_ts_util).
+
+
+-export([authorize/3,
+         current_api_version/0,
+         current_api_version_string/0,
+         extract_api_version/1,
+         handle_error/3,
+         is_supported_api_version/1,
+         local_key/1,
+         local_key_fields_and_types/1,
+         set_json_response/2,
+         set_text_resp_header/2,
+         table_from_request/1,
+         table_module_exists/1,
+         utf8_to_binary/1]).
+
+
+-include_lib("webmachine/include/webmachine.hrl").
+-include_lib("riak_ql/include/riak_ql_ddl.hrl").
+
+%% The single source of truth for the supported HTTP API version(s).
+-spec current_api_version() -> non_neg_integer().
+current_api_version() ->
+    1.
+-spec current_api_version_string() -> string().
+current_api_version_string() ->
+    "v1".
+
+%% Pull the "v1"-style version token out of the dispatch path (or take
+%% it as a string directly) and map it to an integer; anything
+%% unrecognised becomes 'undefined'.
+-spec extract_api_version(#wm_reqdata{} | string()) -> integer() | undefined.
+extract_api_version(RD = #wm_reqdata{}) ->
+    extract_api_version(
+      wrq:path_info(api_version, RD));
+extract_api_version("v1") ->
+    1;
+extract_api_version(_) ->
+    undefined.
+
+is_supported_api_version(Ver) when is_integer(Ver) ->
+    Ver =< current_api_version();
+is_supported_api_version(_) ->
+    false.
+
+
+
+%% @private
+%% Extract the (URL-encoded, possibly unicode) table name from the
+%% request path as a UTF-8 binary.
+table_from_request(RD) ->
+    utf8_to_binary(
+      mochiweb_util:unquote(
+        wrq:path_info(table, RD))).
+
+%% Convert a character list/chardata to a UTF-8 encoded binary.
+utf8_to_binary(S) ->
+    unicode:characters_to_binary(S, utf8, utf8).
+
+flat_format(Format, Args) ->
+    lists:flatten(io_lib:format(Format, Args)).
+
+set_text_resp_header(IoList, RD) ->
+    wrq:set_resp_header(
+      "Content-Type", "text/plain", wrq:append_to_response_body(IoList, RD)).
+
+set_json_response(Json, RD) ->
+    wrq:set_resp_header("Content-Type", "application/json",
+                        wrq:append_to_response_body(Json, RD)).
+
+
+%% Check that the request is authenticated and that the authenticated
+%% user holds the permission corresponding to Call on Table.  Returns
+%% ok | {error, Msg} | insecure (security on, connection is plain HTTP).
+authorize(Call, Table, RD) ->
+    case riak_api_web_security:is_authorized(RD) of
+        false ->
+            {error, "Basic realm=\"Riak\""};
+        {true, undefined} -> %% @todo: why is this returned during testing?
+            ok;
+        {true, SecContext} ->
+            case riak_core_security:check_permission(
+                   {riak_kv_ts_api:api_call_to_perm(Call), Table}, SecContext) of
+                {false, Error, _} ->
+                    {error, utf8_to_binary(Error)};
+                _ ->
+                    ok
+            end;
+        insecure ->
+            insecure
+    end.
+
+%% @todo: this should be in riak_ql_ddl and should probably check deeper.
+-spec table_module_exists(module()) -> boolean().
+table_module_exists(Mod) ->
+    try Mod:get_ddl() of
+        _DDL -> % any DDL record version counts as "exists"
+            true
+    catch
+        _:_ ->
+            false
+    end.
+
+
+local_key(Mod) ->
+    ddl_local_key(Mod:get_ddl()).
+
+%% this should be in the DDL helper module.
+-spec ddl_local_key(?DDL{}) -> [binary()].
+%% Extract the local-key field names (binaries) from a DDL record.
+ddl_local_key(?DDL{local_key = #key_v1{ast = Ast}}) ->
+    [ param_name(P) || P <- Ast].
+
+param_name(#param_v1{name=[Name]}) ->
+    Name.
+
+%% Pair each local-key field name (as a string) with its declared type.
+local_key_fields_and_types(Mod) ->
+    LK = local_key(Mod),
+    Types = [Mod:get_field_type([F]) || F <- LK ],
+    LKStr = [ binary_to_list(F) || F <- LK ],
+    lists:zip(LKStr, Types).
+
+
+%% Build a webmachine {Result, RD, Ctx} return carrying a formatted
+%% plain-text error body.  Type is false or {halt, Code}.
+error_out(Type, Fmt, Args, RD, Ctx) ->
+    {Type,
+     wrq:set_resp_header(
+       "Content-Type", "text/plain", wrq:append_to_response_body(
+                                       flat_format(Fmt, Args), RD)),
+     Ctx}.
+
+%% Map every internal error term the riak_kv_wm_timeseries* resources
+%% can produce to an HTTP status code and a human-readable message.
+-spec handle_error(atom()|tuple(), #wm_reqdata{}, Ctx::tuple()) ->
+                          {tuple(), #wm_reqdata{}, Ctx::tuple()}.
+handle_error(Error, RD, Ctx) ->
+    case Error of
+        {not_ready, State} ->
+            error_out(false,
+                      "Not ready (~s)", [State], RD, Ctx);
+        insecure_connection ->
+            error_out({halt, 426},
+                      "Security is enabled and Riak does not"
+                      " accept credentials over HTTP. Try HTTPS instead.", [], RD, Ctx);
+        {unsupported_version, BadVersion} ->
+            error_out({halt, 412},
+                      "Unsupported API version ~s", [BadVersion], RD, Ctx);
+        {not_permitted, Table, ErrMsg} ->
+            error_out({halt, 401},
+                      "Access to table \"~ts\" not allowed (~s)", [Table, ErrMsg], RD, Ctx);
+        {malformed_request, Method} ->
+            error_out({halt, 400},
+                      "Malformed ~s request", [Method], RD, Ctx);
+        {url_key_bad_method, Method} ->
+            error_out({halt, 400},
+                      "Inappropriate ~s request", [Method], RD, Ctx);
+        {bad_parameter, Param} ->
+            error_out({halt, 400},
+                      "Bad value for parameter \"~s\"", [Param], RD, Ctx);
+        {no_such_table, Table} ->
+            error_out({halt, 404},
+                      "Table \"~ts\" does not exist", [Table], RD, Ctx);
+        {table_exists, Table} ->
+            error_out({halt, 409},
+                      "Table \"~ts\" already exists", [Table], RD, Ctx);
+        {failed_some_puts, NoOfFailures, Table} ->
+            error_out({halt, 400},
+                      "Failed to put ~b records to table \"~ts\"", [NoOfFailures, Table], RD, Ctx);
+        {invalid_data, BadRowIdxs} ->
+            %% NOTE(review): ~s crashes on integer indices -- confirm the
+            %% element type of BadRowIdxs (strings vs integers) at callers.
+            error_out({halt, 400},
+                      "Invalid record #~s", [hd(BadRowIdxs)], RD, Ctx);
+        invalid_json ->
+            error_out({halt, 400},
+                      "Invalid json in body", [], RD, Ctx);
+        {key_element_count_mismatch, Got, Need} ->
+            %% BUGFIX: args were passed as [Need, Got], printing the two
+            %% counts swapped; first ~b is the element count received,
+            %% second ~b is the required key length.
+            error_out({halt, 400},
+                      "Incorrect number of elements (~b) for key of length ~b", [Got, Need], RD, Ctx);
+        {bad_field, Table, Field} ->
+            error_out({halt, 400},
+                      "Table \"~ts\" has no field named \"~s\"", [Table, Field], RD, Ctx);
+        {missing_field, Table, Field} ->
+            error_out({halt, 400},
+                      "Missing field \"~s\" for key in table \"~ts\"", [Field, Table], RD, Ctx);
+        {bad_value, Table, {Field, Type}} ->
+            error_out({halt, 400},
+                      "Bad value for field \"~s\" of type ~s in table \"~ts\"",
+                      [Field, Type, Table], RD, Ctx);
+        {url_malformed_key_path, NPathElements, KeyLength, Table} ->
+            error_out({halt, 400},
+                      "Need ~b field/value pairs for key in table \"~ts\", got ~b path elements",
+                      [KeyLength, Table, NPathElements], RD, Ctx);
+        url_has_not_all_lk_fields ->
+            error_out({halt, 400},
+                      "Not all key-constituent fields given on URL", [], RD, Ctx);
+        notfound ->
+            error_out({halt, 404},
+                      "Key not found", [], RD, Ctx);
+        no_query_in_body ->
+            error_out({halt, 400},
+                      "No query in request body", [], RD, Ctx);
+        {query_parse_error, Details} ->
+            error_out({halt, 400},
+                      "Query error: ~s", [Details], RD, Ctx);
+        {query_compile_error, Details} ->
+            error_out({halt, 400},
+                      "~s", [Details], RD, Ctx);
+        {table_create_fail, Table, Reason} ->
+            error_out({halt, 500},
+                      "Failed to create table \"~ts\": ~p", [Table, Reason], RD, Ctx);
+        {table_activate_fail, Table} ->
+            error_out({halt, 500},
+                      "Failed to activate bucket type of table \"~ts\"", [Table], RD, Ctx);
+        {table_created_missing, Table} ->
+            error_out({halt, 500},
+                      "Table \"~ts\" has been created but disappeared", [Table], RD, Ctx);
+        {query_exec_error, Type, Table, {_Kind, Explained}} ->
+            error_out({halt, 500},
+                      "Execution of ~s query failed on table \"~ts\" (~s)",
+                      [Type, Table, Explained], RD, Ctx);
+        {query_exec_error, Type, Table, ErrMsg} ->
+            error_out({halt, 500},
+                      "Execution of ~s query failed on table \"~ts\" (~p)",
+                      [Type, Table, ErrMsg], RD, Ctx);
+        query_worker_timeout ->
+            error_out({halt, 503},
+                      "Query worker timeout", [], RD, Ctx);
+        backend_timeout ->
+            error_out({halt, 503},
+                      "Storage backend timeout", [], RD, Ctx);
+        {parameter_error, Message} ->
+            error_out({halt, 400},
+                      "~s", [Message], RD, Ctx);
+        {riak_error, Detailed} ->
+            error_out({halt, 500},
+                      "Internal riak error: ~p", [Detailed], RD, Ctx)
+    end.
diff --git a/src/riak_kv_wm_utils.erl b/src/riak_kv_wm_utils.erl
index f5d7dd9493..f65f88ffd9 100644
--- a/src/riak_kv_wm_utils.erl
+++ b/src/riak_kv_wm_utils.erl
@@ -355,6 +355,8 @@ jsonify_bucket_prop({search_extractor, {M, F, Arg}}) ->
                   {?JSON_ARG, Arg}]}};
 jsonify_bucket_prop({name, {_T, B}}) ->
     {<<"name">>, B};
+jsonify_bucket_prop({ddl, _DDL}) ->
+    %% TODO(review): placeholder text is returned verbatim to API
+    %% clients; render the DDL (riak_ql_to_string:sql_to_string) instead.
+    {<<"ddl">>, <<"riak_ql_to_string:sql_to_string might be useful here.">>};
 jsonify_bucket_prop({Prop, Value}) ->
     {atom_to_binary(Prop, utf8), Value}.