diff --git a/src/riak_kv_wm_timeseries.erl b/src/riak_kv_wm_timeseries.erl index 4c9c2f502c..ad01249172 100644 --- a/src/riak_kv_wm_timeseries.erl +++ b/src/riak_kv_wm_timeseries.erl @@ -31,7 +31,7 @@ %% GET /ts/v1/tables/Table/keys/K1/V1/... single-key get %% DELETE /ts/v1/tables/Table/keys/K1/V1/... single-key delete %% POST /ts/v1/tables/Table/keys singe-key or batch put depending -%% on the body +%% on the body %% ''' %% %% Request body is expected to be a JSON containing a struct or structs for the @@ -44,7 +44,8 @@ -module(riak_kv_wm_timeseries). %% webmachine resource exports --export([init/1, +-export([ + init/1, service_available/2, allowed_methods/2, malformed_request/2, @@ -56,7 +57,8 @@ post_is_create/2, process_post/2, delete_resource/2, - resource_exists/2]). + resource_exists/2 + ]). %% webmachine body-producing functions -export([to_json/2]). @@ -64,18 +66,20 @@ -include_lib("webmachine/include/webmachine.hrl"). -include_lib("riak_ql/include/riak_ql_ddl.hrl"). -include("riak_kv_wm_raw.hrl"). --include("riak_kv_ts.hrl"). -record(ctx, - {api_call :: 'undefined' | 'get' | 'put' | 'delete', - table :: 'undefined' | binary(), - mod :: 'undefined' | module(), - key :: 'undefined' | ts_rec(), - object, - timeout :: 'undefined' | integer(), - options, %% for the call towards riak. - prefix, - riak}). + { + api_version :: undefined | integer(), + api_call :: undefined | get | put | delete, + table :: undefined | binary(), + mod :: undefined | module(), + key :: undefined | ts_rec(), + object, + timeout :: undefined | integer(), + options = [], %% for the call towards riak. + prefix, + riak + }). -define(DEFAULT_TIMEOUT, 60000). @@ -86,38 +90,39 @@ -spec init(proplists:proplist()) -> {ok, #ctx{}}. %% @doc Initialize this resource. This function extracts the %% 'prefix' and 'riak' properties from the dispatch args. +%% (But how exactly are those properties used?) init(Props) -> {ok, #ctx{prefix = proplists:get_value(prefix, Props), riak = proplists:get_value(riak, Props)}}. - %% {{trace, "/tmp"}, #ctx{prefix = proplists:get_value(prefix, Props), - %% riak = proplists:get_value(riak, Props)}}. -%% wmtrace_resource:add_dispatch_rule("wmtrace", "/tmp"). -spec service_available(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean() | halt()). %% @doc Determine whether or not a connection to Riak %% can be established. %% Convert the table name from the part of the URL. -service_available(RD, #ctx{riak = RiakProps}=Ctx) -> - case riak_kv_wm_utils:get_riak_client( - RiakProps, riak_kv_wm_utils:get_client_id(RD)) of - {ok, _C} -> +service_available(RD, Ctx) -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> Table = riak_kv_wm_ts_util:table_from_request(RD), Mod = riak_ql_ddl:make_module_name(Table), - {true, RD, Ctx#ctx{table=Table, mod=Mod}}; - {error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message("Node not ready: ~p", [Reason], RD), - {false, Resp, Ctx} + {true, RD, Ctx#ctx{api_version = ApiVersion, + table = Table, mod = Mod}}; + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) end. -is_authorized(RD, #ctx{table=Table}=Ctx) -> +is_authorized(RD, #ctx{table = Table} = Ctx) -> Call = api_call(wrq:path_tokens(RD), wrq:method(RD)), case riak_kv_wm_ts_util:authorize(Call, Table, RD) of ok -> - {true, RD, Ctx#ctx{api_call=Call}}; + {true, RD, Ctx#ctx{api_call = Call}}; {error, ErrorMsg} -> - {ErrorMsg, RD, Ctx}; - {insecure, Halt, Resp} -> - {Halt, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) end. -spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). @@ -138,23 +143,16 @@ allowed_methods(_KeyInURL, RD, Ctx) -> malformed_request(RD, Ctx) -> try Ctx2 = extract_params(wrq:req_qs(RD), Ctx), - malformed_request(wrq:path_tokens(RD), RD, Ctx2) + %% NOTE: if the supplied JSON body is wrong a malformed requset + %% may be issued later. It will indeed, only it will be generated + %% manually, via handle_error, and not detected by webmachine from + %% malformed_request reporting it directly. + {false, RD, Ctx2} catch - throw:{parameter_error, Error} -> - Resp = riak_kv_wm_ts_util:set_error_message("parameter error: ~p", [Error], RD), - {true, Resp, Ctx} + throw:ParamError -> + riak_kv_wm_ts_util:handle_error(ParamError, RD, Ctx) end. -malformed_request([], RD, Ctx) -> - %% NOTE: if the supplied JSON body is wrong a malformed requset may be - %% issued later. - %% @todo: should the validation of the JSON happen here??? - {false, RD, Ctx}; -malformed_request(KeyInUrl, RD, Ctx) when length(KeyInUrl) rem 2 == 0 -> - {false, RD, Ctx}; -malformed_request(_, RD, Ctx) -> - {true, RD, Ctx}. - -spec content_types_provided(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([{string(), atom()}]). content_types_provided(RD, Ctx) -> {[{"application/json", to_json}], @@ -172,56 +170,45 @@ content_types_accepted(_, RD, Ctx) -> {[], RD, Ctx}. -spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean() | halt()). -resource_exists(RD, #ctx{mod=Mod} = Ctx) -> +resource_exists(RD, #ctx{table = Table, mod = Mod} = Ctx) -> case riak_kv_wm_ts_util:table_module_exists(Mod) of true -> resource_exists(wrq:path_tokens(RD), wrq:method(RD), RD, Ctx); false -> - Resp = riak_kv_wm_ts_util:set_error_message("table ~p not created", [Mod], RD), - {false, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) end. resource_exists([], 'POST', RD, Ctx) -> {true, RD, Ctx}; resource_exists(Path, 'GET', RD, - #ctx{table=Table, - mod=Mod, - options=Options}=Ctx) -> + #ctx{table = Table, + mod = Mod, + options = Options} = Ctx) -> %% Would be nice if something cheaper than using get_data existed to check %% if a key is present. - try - Key = validate_key(Path, Mod), - case riak_kv_ts_api:get_data(Key, Table, Mod, Options) of - {ok, Record} -> - {true, RD, Ctx#ctx{object=Record, - key=Key}}; - {error, notfound} -> - {{halt, 404}, RD, Ctx}; - {error, InternalReason} -> - InternalResp = riak_kv_wm_ts_util:set_error_message("Internal error: ~p", [InternalReason], RD), - {{halt, 500}, InternalResp, Ctx} - end - catch - throw:{path_error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message( - "lookup on ~p failed due to ~p", - [Path, Reason], - RD), - {false, Resp, Ctx} + case validate_key(Path, Ctx) of + {ok, Key} -> + case riak_kv_ts_api:get_data(Key, Table, Mod, Options) of + {ok, Record} -> + {true, RD, Ctx#ctx{object = Record, + key = Key}}; + {error, notfound} -> + riak_kv_wm_ts_util:handle_error(notfound, RD, Ctx); + {error, InternalReason} -> + riak_kv_wm_ts_util:handle_error({riak_error, InternalReason}, RD, Ctx) + end; + {error, Reason} -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) end; -resource_exists(Path, 'DELETE', RD, #ctx{mod=Mod}=Ctx) -> +resource_exists(Path, 'DELETE', RD, Ctx) -> %% Since reading the object is expensive we will assume for now that the %% object exists for a delete, but if it turns out that it does not then the %% processing of the delete will return 404 at that point. - try - Key = validate_key(Path, Mod), - {true, RD, Ctx#ctx{key=Key}} - catch - throw:{path_error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message("deletion of ~p failed due to ~p", - [Path, Reason], - RD), - {false, Resp, Ctx} + case validate_key(Path, Ctx) of + {ok, Key} -> + {true, RD, Ctx#ctx{key = Key}}; + {error, Reason} -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) end. -spec encodings_provided(#wm_reqdata{}, #ctx{}) -> @@ -234,9 +221,9 @@ post_is_create(RD, Ctx) -> {false, RD, Ctx}. -spec process_post(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). -process_post(RD, #ctx{mod=Mod, - table=Table}=Ctx) -> - try extract_data(RD, Mod) of +process_post(RD, #ctx{mod = Mod, + table = Table} = Ctx) -> + try extract_data(RD, Mod, Table) of Records -> case riak_kv_ts_util:validate_rows(Mod, Records) of [] -> @@ -246,161 +233,188 @@ process_post(RD, #ctx{mod=Mod, Resp = riak_kv_wm_ts_util:set_json_response(Json, RD), {true, Resp, Ctx}; {error, {some_failed, ErrorCount}} -> - Resp = riak_kv_wm_ts_util:set_error_message("failed some puts ~p ~p", - [ErrorCount, Table], - RD), - {{halt, 400}, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({failed_some_puts, ErrorCount, Table}, RD, Ctx) end; BadRowIdxs when is_list(BadRowIdxs) -> - Resp = riak_kv_wm_ts_util:set_error_message("invalid data: ~p", - [BadRowIdxs], - RD), - {{halt, 400}, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({invalid_data, BadRowIdxs}, RD, Ctx) end catch - throw:{data_problem,Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message("wrong body: ~p", [Reason], RD), - {{halt, 400}, Resp, Ctx} + throw:Reason -> + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) end. -spec delete_resource(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|halt()). -delete_resource(RD, #ctx{table=Table, - mod=Mod, - key=Key, - options=Options}=Ctx) -> - try riak_kv_ts_api:delete_data(Key, Table, Mod, Options) of +delete_resource(RD, #ctx{table = Table, + mod = Mod, + key = Key, + options = Options} = Ctx) -> + case riak_kv_ts_api:delete_data(Key, Table, Mod, Options) of ok -> - Json = result_to_json(ok), - Resp = riak_kv_wm_ts_util:set_json_response(Json, RD), - {true, Resp, Ctx}; - {error, notfound} -> - Resp = riak_kv_wm_ts_util:set_error_message( - "resource ~p does not exist - impossible to delete", - [wrq:path(RD)], - RD), - {{halt, 404}, Resp, Ctx} - catch - _:Reason -> - Resp = riak_kv_wm_ts_util:set_error_message("Internal error: ~p", [Reason], RD), - {{halt, 500}, Resp, Ctx} + Json = result_to_json(ok), + Resp = riak_kv_wm_ts_util:set_json_response(Json, RD), + {true, Resp, Ctx}; + {error, notfound} -> + riak_kv_wm_ts_util:handle_error(notfound, RD, Ctx); + {error, Reason} -> + riak_kv_wm_ts_util:handle_error({riak_error, Reason}, RD, Ctx) end. -spec to_json(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(iolist()|halt()). -to_json(RD, #ctx{api_call=get, object=Object}=Ctx) -> +to_json(RD, #ctx{api_call = get, object = Object} = Ctx) -> try Json = mochijson2:encode(Object), {Json, RD, Ctx} catch _:Reason -> - Resp = riak_kv_wm_ts_util:set_error_message("object error ~p", [Reason], RD), - {{halt, 500}, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({riak_error, Reason}, RD, Ctx) end. -spec extract_params([{string(), string()}], #ctx{}) -> #ctx{} . %% @doc right now we only allow a timeout parameter or nothing. extract_params([], Ctx) -> - Ctx#ctx{options=[]}; -extract_params([{"timeout", TimeoutStr}], Ctx) -> - try - Timeout = list_to_integer(TimeoutStr), - Ctx#ctx{timeout = Timeout, - options = [{timeout, Timeout}]} - catch - _:_ -> - Reason = io_lib:format("timeout not an integer value: ~s", [TimeoutStr]), + Ctx; +extract_params([{"timeout", TimeoutStr} | Rest], + Ctx0 = #ctx{options = Options0}) -> + case catch list_to_integer(TimeoutStr) of + Timeout when is_integer(Timeout), Timeout >= 0 -> + Options = lists:keystore(timeout, 1, Options0, {timeout, Timeout}), + extract_params(Rest, + Ctx0#ctx{timeout = Timeout, + options = Options}); + _ -> + Reason = io_lib:format("Bad timeout value: ~s", [TimeoutStr]), throw({parameter_error, Reason}) end; -extract_params(Params, _Ctx) -> - Reason = io_lib:format("incorrect paramters: ~p", [Params]), - throw({parameter_error, Reason}). +extract_params([{UnknownParam, _}|_], _Ctx) -> + throw({parameter_error, io_lib:format("Unknown parameter: ~s", [UnknownParam])}). -validate_key(Path, Mod) -> +validate_key(Path, #ctx{table = Table, mod = Mod}) -> UnquotedPath = lists:map(fun mochiweb_util:unquote/1, Path), - path_elements(Mod, UnquotedPath). + path_elements_to_key(UnquotedPath, Table, Mod, Mod:get_ddl()). -%% extract keys from path elements in the URL (.../K1/V1/K2/V2/... -> -%% [V1, V2, ...]), check with Table's DDL to make sure keys are -%% correct and values are of (convertible to) appropriate types, and -%% return the KV list -%% @private --spec path_elements(module(), [string()]) -> - [riak_pb_ts_codec:ldbvalue()]. -path_elements(Mod, Path) -> - KeyTypes = riak_kv_wm_ts_util:local_key_fields_and_types(Mod), - match_path(Path, KeyTypes). - -match_path([], []) -> - []; -match_path([F,V|Path], [{F, Type}|KeyTypes]) -> - [convert_field_value(Type, V)|match_path(Path, KeyTypes)]; -match_path(Path, _KeyTypes) -> - throw({path_error, io_lib:format("incorrect path ~p", [Path])}). +path_elements_to_key(PEList, Table, Mod, + ?DDL{local_key = #key_v1{ast = LK}}) -> + try + TableKeyLength = length(LK), + if TableKeyLength * 2 == length(PEList) -> + %% values with field names: "f1/v1/f2/v2/f3/v3" + %% 1. check that supplied key fields exist and values + %% supplied are convertible to their types + FVList = + [convert_fv(Table, Mod, K, V) + || {K, V} <- empair(PEList, [])], + %% 2. possibly reorder field-value pairs to match the LK order + case ensure_lk_order_and_strip(LK, FVList) of + %% this will extract only LK-constituent fields, and catch + %% the case of the right number of fields, of which some + %% are not in LK + OrderedKeyValues when length(OrderedKeyValues) == TableKeyLength -> + {ok, OrderedKeyValues}; + _WrongNumberOfLKFields -> + {error, url_has_not_all_lk_fields} + end; + el/=se -> + {error, {url_malformed_key_path, length(PEList), TableKeyLength, Table}} + end + catch + throw:ConvertFailed -> + {error, ConvertFailed} + end. -%% @private -convert_field_value(varchar, V) -> - list_to_binary(V); -convert_field_value(sint64, V) -> - list_to_integer(V); -convert_field_value(double, V) -> +empair([], Q) -> lists:reverse(Q); +empair([K, V | T], Q) -> empair(T, [{K, V}|Q]). + +convert_fv(Table, Mod, FieldRaw, V) -> + Field = [list_to_binary(X) || X <- string:tokens(FieldRaw, ".")], + case Mod:is_field_valid(Field) of + true -> + try + convert_field(Table, Field, Mod:get_field_type(Field), V) + catch + error:badarg -> + %% rethrow with key, for more informative reporting + throw({bad_value, Table, Field}) + end; + false -> + throw({bad_field, Table, Field}) + end. + +convert_field(_T, F, varchar, V) -> + {F, list_to_binary(V)}; +convert_field(_T, F, sint64, V) -> + {F, list_to_integer(V)}; +convert_field(_T, F, double, V) -> + %% list_to_float("42") will fail, so try - list_to_float(V) + {F, list_to_float(V)} catch error:badarg -> - float(list_to_integer(V)) + {F, float(list_to_integer(V))} end; -convert_field_value(timestamp, V) -> +convert_field(T, F, timestamp, V) -> case list_to_integer(V) of - GoodValue when GoodValue > 0 -> - GoodValue; - _ -> - throw({path_error, "incorrect field value"}) + BadValue when BadValue < 1 -> + throw({url_key_bad_value, T, F}); + GoodValue -> + {F, GoodValue} end. -extract_data(RD, Mod) -> +ensure_lk_order_and_strip(LK, FVList) -> + %% exclude fields not in LK + [V || V <- [proplists:get_value(F, FVList) || #param_v1{name = F} <- LK], + V /= undefined]. + + +extract_data(RD, Mod, Table) -> try - JsonStr = binary_to_list(wrq:req_body(RD)), - Json = mochijson2:decode(JsonStr), - DDLFieldTypes = ddl_fields_and_types(Mod), - extract_records(Json, DDLFieldTypes) + Json = binary_to_list(wrq:req_body(RD)), + Batch = ensure_batch( + mochijson2:decode(Json)), + FieldTypes = ddl_fields_and_types(Mod), + [json_struct_to_obj(Rec, FieldTypes) || {struct, Rec} <- Batch] catch - _Error:Reason -> - throw({data_problem, Reason}) + error:_JsonParserError -> + throw(invalid_json); + throw:{Kind, WhichFieldOrType} -> + %% our own custom errors caught in process_post: + %% inject Table and rethrow + throw({Kind, Table, WhichFieldOrType}) end. -extract_records({struct, _}=Struct, Fields) -> - [json_struct_to_obj(Struct, Fields)]; -extract_records(Structs, Fields) when is_list(Structs) -> - [json_struct_to_obj(S, Fields) || S <- Structs]. +ensure_batch({struct, SingleRecord}) -> + [{struct, SingleRecord}]; +ensure_batch(Batch) when is_list(Batch) -> + Batch. -json_struct_to_obj({struct, FieldValueList}, Fields) -> - List = [ extract_field_value(Field, FieldValueList) - || Field <- Fields], +json_struct_to_obj(FieldValueList, Fields) -> + List = [extract_field_value(Field, FieldValueList) + || Field <- Fields], list_to_tuple(List). extract_field_value({Name, Type}, FVList) -> case proplists:get_value(Name, FVList) of undefined -> - throw({data_problem, {missing_field, Name}}); + throw({missing_field, Name}); Value -> - check_field_value(Type, Value) + check_field_value(Name, Type, Value) end. %% @todo: might be better if the DDL helper module had a %% valid_field_value(Field, Value) -> boolean() function. -check_field_value(varchar, V) when is_binary(V) -> V; -check_field_value(sint64, V) when is_integer(V) -> V; -check_field_value(double, V) when is_number(V) -> V; -check_field_value(timestamp, V) when is_integer(V), V>0 -> V; -check_field_value(boolean, V) when is_boolean(V) -> V; -check_field_value(Type, V) -> - throw({data_problem, {wrong_type, Type, V}}). +check_field_value(_Name, varchar, V) when is_binary(V) -> V; +check_field_value(_Name, sint64, V) when is_integer(V) -> V; +check_field_value(_Name, double, V) when is_number(V) -> V; +check_field_value(_Name, timestamp, V) when is_integer(V), V>0 -> V; +check_field_value(_Name, boolean, V) when is_boolean(V) -> V; +check_field_value(Name, Type, _V) -> + throw({bad_value, {Name, Type}}). %% @todo: this should be in the DDL helper module, so that the records don't %% leak out of riak_ql. ddl_fields_and_types(Mod) -> - #ddl_v1{fields=Fields} = Mod:get_ddl(), + ?DDL{fields = Fields} = Mod:get_ddl(), [ {Name, Type} || #riak_field_v1{name=Name, type=Type} <- Fields ]. %% @private diff --git a/src/riak_kv_wm_timeseries_listkeys.erl b/src/riak_kv_wm_timeseries_listkeys.erl index 4ba6fa430a..b4b07b09f6 100644 --- a/src/riak_kv_wm_timeseries_listkeys.erl +++ b/src/riak_kv_wm_timeseries_listkeys.erl @@ -41,7 +41,8 @@ forbidden/2, resource_exists/2, content_types_provided/2, - encodings_provided/2]). + encodings_provided/2 + ]). %% webmachine body-producing functions -export([produce_doc_body/2]). @@ -49,17 +50,17 @@ -include("riak_kv_wm_raw.hrl"). -include_lib("webmachine/include/webmachine.hrl"). --record(ctx, {riak, - security, - client, - table :: undefined | binary(), - mod :: module() - }). +-record(ctx, + { + api_version :: undefined | integer(), + riak, + security, + table :: undefined | binary(), + mod :: module() + }). -type cb_rv_spec(T) :: {T, #wm_reqdata{}, #ctx{}}. --define(DEFAULT_TIMEOUT, 60000). - -spec init(proplists:proplist()) -> {ok, #ctx{}}. %% @doc Initialize this resource. This function extracts the %% 'prefix' and 'riak' properties from the dispatch args. @@ -70,34 +71,35 @@ init(Props) -> {boolean(), #wm_reqdata{}, #ctx{}}. %% @doc Determine whether or not a connection to Riak %% can be established. -service_available(RD, Ctx = #ctx{riak = RiakProps}) -> - case riak_kv_wm_utils:get_riak_client( - RiakProps, riak_kv_wm_utils:get_client_id(RD)) of - {ok, C} -> +service_available(RD, Ctx) -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> Table = riak_kv_wm_ts_util:table_from_request(RD), Mod = riak_ql_ddl:make_module_name(Table), {true, RD, - Ctx#ctx{client = C, - table = Table, + Ctx#ctx{table = Table, mod = Mod}}; - {error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message("Unable to connect to Riak: ~p", [Reason], RD), - {false, Resp, Ctx} + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) end. -is_authorized(RD, #ctx{table=Table}=Ctx) -> +is_authorized(RD, #ctx{table = Table} = Ctx) -> case riak_kv_wm_ts_util:authorize(listkeys, Table, RD) of ok -> {true, RD, Ctx}; {error, ErrorMsg} -> - {ErrorMsg, RD, Ctx}; - {insecure, Halt, Resp} -> - {Halt, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) end. -spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). forbidden(RD, Ctx) -> - Result = riak_kv_wm_utils:is_forbidden(RD), + Result = riak_kv_wm_utils:is_forbidden(RD), {Result, RD, Ctx}. -spec allowed_methods(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([atom()]). @@ -106,15 +108,12 @@ allowed_methods(RD, Ctx) -> {['GET'], RD, Ctx}. -spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). -resource_exists(RD, #ctx{mod=Mod, - table=Table} = Ctx) -> +resource_exists(RD, #ctx{mod = Mod, table = Table} = Ctx) -> case riak_kv_wm_ts_util:table_module_exists(Mod) of true -> {true, RD, Ctx}; false -> - Resp = riak_kv_wm_ts_util:set_error_message( - "table ~p does not exist", [Table], RD), - {false, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) end. -spec encodings_provided(#wm_reqdata{}, #ctx{}) -> @@ -130,10 +129,9 @@ encodings_provided(RD, Ctx) -> content_types_provided(RD, Ctx) -> {[{"text/plain", produce_doc_body}], RD, Ctx}. -produce_doc_body(RD, Ctx = #ctx{table = Table, mod=Mod, - client = Client}) -> +produce_doc_body(RD, Ctx = #ctx{table = Table, mod = Mod}) -> {ok, ReqId} = riak_client:stream_list_keys( - {Table, Table}, undefined, Client), + {Table, Table}, undefined, {riak_client, [node(), undefined]}), {{halt, 200}, wrq:set_resp_body({stream, prepare_stream(ReqId, Table, Mod)}, RD), Ctx}. prepare_stream(ReqId, Table, Mod) -> @@ -205,5 +203,7 @@ value_to_url_string(V, timestamp) -> base_url(Table) -> {ok, [{Server, Port}]} = application:get_env(riak_api, http), - lists:flatten(io_lib:format("http://~s:~B/ts/v1/tables/~s/keys/", - [Server, Port, Table])). + lists:flatten( + io_lib:format( + "http://~s:~B/ts/~s/tables/~s/keys/", + [Server, Port, riak_kv_wm_ts_util:current_api_version_string(), Table])). diff --git a/src/riak_kv_wm_timeseries_query.erl b/src/riak_kv_wm_timeseries_query.erl index 6ebfce1a82..ff2bd7036a 100644 --- a/src/riak_kv_wm_timeseries_query.erl +++ b/src/riak_kv_wm_timeseries_query.erl @@ -44,10 +44,8 @@ process_post/2, content_types_accepted/2, content_types_provided/2, - encodings_provided/2 - ]). - --export([produce_doc_body/2 + encodings_provided/2, + produce_doc_body/2 ]). -include_lib("webmachine/include/webmachine.hrl"). @@ -55,17 +53,19 @@ -include("riak_kv_wm_raw.hrl"). -include("riak_kv_ts.hrl"). --record(ctx, { - table :: 'undefined' | binary(), - mod :: 'undefined' | module(), - method :: atom(), - timeout, %% integer() - passed-in timeout value in ms +-record(ctx, + { + api_version :: undefined | integer(), + table :: undefined | binary(), + mod :: undefined | module(), + method :: atom(), + timeout :: undefined | integer(), %% passed-in timeout value in ms security, %% security context - sql_type, - compiled_query :: undefined | #ddl_v1{} | #riak_sql_describe_v1{} | #riak_select_v1{}, + sql_type :: undefined | riak_kv_qry:query_type(), + compiled_query :: undefined | ?DDL{} | #riak_select_v1{} | + #riak_sql_describe_v1{} | #riak_sql_insert_v1{}, with_props :: undefined | proplists:proplist(), - result :: undefined | ok | {Headers::[binary()], Rows::[ts_rec()]} | - [{entry, proplists:proplist()}] + result :: undefined | ok | {Headers::[binary()], Rows::[ts_rec()]} }). -define(DEFAULT_TIMEOUT, 60000). @@ -84,15 +84,19 @@ init(_Props) -> %% @doc Determine whether or not a connection to Riak %% can be established. service_available(RD, Ctx) -> - case init:get_status() of - {started, _} -> + ApiVersion = riak_kv_wm_ts_util:extract_api_version(RD), + case {riak_kv_wm_ts_util:is_supported_api_version(ApiVersion), + init:get_status()} of + {true, {started, _}} -> + %% always available because no client connection is required {true, RD, Ctx}; - Status -> - Resp = riak_kv_wm_ts_util:set_error_message("Unable to connect to Riak: ~p", - [Status], RD), - {false, Resp, Ctx} + {false, {started, _}} -> + riak_kv_wm_ts_util:handle_error({unsupported_version, ApiVersion}, RD, Ctx); + {_, {InternalStatus, _}} -> + riak_kv_wm_ts_util:handle_error({not_ready, InternalStatus}, RD, Ctx) end. + -spec allowed_methods(#wm_reqdata{}, #ctx{}) -> cb_rv_spec([atom()]). allowed_methods(RD, Ctx) -> {['POST'], RD, Ctx}. @@ -101,35 +105,28 @@ allowed_methods(RD, Ctx) -> malformed_request(RD, Ctx) -> try {SqlType, SQL, WithProps} = query_from_request(RD), - Table = table_from_sql(SQL), + Table = riak_kv_ts_util:queried_table(SQL), Mod = riak_ql_ddl:make_module_name(Table), - {false, RD, Ctx#ctx{sql_type=SqlType, - compiled_query=SQL, - with_props=WithProps, - table=Table, - mod=Mod}} + {false, RD, Ctx#ctx{sql_type = SqlType, + compiled_query = SQL, + with_props = WithProps, + table = Table, + mod = Mod}} catch - throw:{query, Reason} -> - Response = riak_kv_wm_ts_util:set_error_message("bad query: ~s", [Reason], RD), - {true, Response, Ctx}; - throw:{unsupported_sql_type, Type} -> - Response = riak_kv_wm_ts_util:set_error_message( - "The ~p query type is not supported over the HTTP API yet", - [Type], RD), - {{halt, 503}, Response, Ctx} + throw:Condition -> + riak_kv_wm_ts_util:handle_error(Condition, RD, Ctx) end. -spec is_authorized(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|string()|halt()). -is_authorized(RD, #ctx{sql_type=SqlType, table=Table}=Ctx) -> - Call = call_from_sql_type(SqlType), +is_authorized(RD, #ctx{sql_type = SqlType, table = Table} = Ctx) -> + Call = riak_kv_ts_api:api_call_from_sql_type(SqlType), case riak_kv_wm_ts_util:authorize(Call, Table, RD) of ok -> {true, RD, Ctx}; {error, ErrorMsg} -> - ErrorStr = lists:flatten(io_lib:format("~p", [ErrorMsg])), - {ErrorStr, RD, Ctx}; - {insecure, Halt, Resp} -> - {Halt, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({not_permitted, Table, ErrorMsg}, RD, Ctx); + insecure -> + riak_kv_wm_ts_util:handle_error(insecure_connection, RD, Ctx) end. -spec forbidden(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). @@ -162,33 +159,24 @@ content_types_accepted(RD, Ctx) -> -spec resource_exists(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()|halt()). -resource_exists(RD, #ctx{sql_type=ddl, - mod=Mod, - table=Table}=Ctx) -> +resource_exists(RD, #ctx{sql_type = ddl, + mod = Mod, + table = Table} = Ctx) -> case riak_kv_wm_ts_util:table_module_exists(Mod) of false -> {true, RD, Ctx}; true -> - Resp = riak_kv_wm_ts_util:set_error_message("table ~p already exists", - [Table], RD), - {{halt, 409}, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({table_exists, Table}, RD, Ctx) end; -resource_exists(RD, #ctx{sql_type=Type, - mod=Mod, - table=Table}=Ctx) when Type == describe; - Type == select -> +resource_exists(RD, #ctx{sql_type = Type, + mod = Mod, + table = Table} = Ctx) when Type /= ddl -> case riak_kv_wm_ts_util:table_module_exists(Mod) of true -> {true, RD, Ctx}; false -> - Resp = riak_kv_wm_ts_util:set_error_message("table ~p does not exist", - [Table], RD), - {false, Resp, Ctx} - end; -resource_exists(RD, Ctx) -> - Resp = riak_kv_wm_ts_util:set_error_message("no such resource ~p", - wrq:path(RD), RD), - {false, Resp, Ctx}. + riak_kv_wm_ts_util:handle_error({no_such_table, Table}, RD, Ctx) + end. -spec post_is_create(#wm_reqdata{}, #ctx{}) -> cb_rv_spec(boolean()). post_is_create(RD, Ctx) -> @@ -198,35 +186,23 @@ post_is_create(RD, Ctx) -> process_post(RD, #ctx{sql_type = ddl, compiled_query = SQL, with_props = WithProps} = Ctx) -> case create_table(SQL, WithProps) of ok -> - Result = [{success, true}], %% represents ok + Result = [{success, true}], %% represents ok Json = to_json(Result), {true, wrq:append_to_response_body(Json, RD), Ctx}; {error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message("query error: ~p", - [Reason], - RD), - {{halt, 500}, Resp, Ctx} + riak_kv_wm_ts_util:handle_error(Reason, RD, Ctx) end; -process_post(RD, #ctx{sql_type=describe, - compiled_query=SQL, - mod=Mod}=Ctx) -> +process_post(RD, #ctx{sql_type = QueryType, + compiled_query = SQL, + table = Table, + mod = Mod} = Ctx) -> DDL = Mod:get_ddl(), %% might be faster to store this earlier on case riak_kv_ts_api:query(SQL, DDL) of - {ok, Data} -> + {ok, Data} when QueryType == describe -> ColumnNames = [<<"Column">>, <<"Type">>, <<"Is Null">>, <<"Primary Key">>, <<"Local Key">>], Json = to_json({ColumnNames, Data}), {true, wrq:append_to_response_body(Json, RD), Ctx}; - {error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message( - "describe failed: ~p", [Reason], RD), - {{halt, 500}, Resp, Ctx} - end; -process_post(RD, #ctx{sql_type=select, - compiled_query=SQL, - mod=Mod}=Ctx) -> - DDL = Mod:get_ddl(), %% might be faster to store this earlier on - case riak_kv_ts_api:query(SQL, DDL) of {ok, Data} -> {ColumnNames, _ColumnTypes, Rows} = Data, Json = to_json({ColumnNames, Rows}), @@ -235,19 +211,13 @@ process_post(RD, #ctx{sql_type=select, {error, qry_worker_timeout} -> %% the eleveldb process didn't send us any response after %% 10 sec (hardcoded in riak_kv_qry), and probably died - Resp = riak_kv_wm_ts_util:set_error_message( - "qry_worker_timeout", [], RD), - {false, Resp, Ctx}; + riak_kv_wm_ts_util:handle_error(query_worker_timeout, RD, Ctx); {error, backend_timeout} -> %% the eleveldb process did manage to send us a timeout %% response - Resp = riak_kv_wm_ts_util:set_error_message( - "backend_timeout", [], RD), - {false, Resp, Ctx}; + riak_kv_wm_ts_util:handle_error(backend_timeout, RD, Ctx); {error, Reason} -> - Resp = riak_kv_wm_ts_util:set_error_message( - "select query execution error: ~p", [Reason], RD), - {false, Resp, Ctx} + riak_kv_wm_ts_util:handle_error({query_exec_error, QueryType, Table, Reason}, RD, Ctx) end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -260,43 +230,33 @@ query_from_request(RD) -> query_string_from_request(RD) -> case wrq:req_body(RD) of undefined -> - throw({query, "no query in body"}); + throw(no_query_in_body); Str -> binary_to_list(Str) end. compile_query(QueryStr) -> - try riak_ql_parser:ql_parse( - riak_ql_lexer:get_tokens(QueryStr)) of + case catch riak_ql_parser:ql_parse( + riak_ql_lexer:get_tokens(QueryStr)) of + %% parser messages have a tuple for Reason: + {error, {_LineNo, riak_ql_parser, Msg}} when is_integer(_LineNo) -> + throw({query_parse_error, Msg}); + {error, {Token, riak_ql_parser, _}} -> + throw({query_parse_error, io_lib:format("Unexpected token: '~s'", [Token])}); + {'EXIT', {Reason, _StackTrace}} -> %% these come from deep in the lexer + throw({query_parse_error, Reason}); {error, Reason} -> - ErrorMsg = lists:flatten(io_lib:format("parse error: ~p", [Reason])), - throw({query, ErrorMsg}); + throw({query_compile_error, Reason}); {ddl, _DDL, _Props} = Res -> Res; - {Type, Compiled} when Type == select; - Type == describe; - Type == insert -> + {Type, Compiled} -> {ok, SQL} = riak_kv_ts_util:build_sql_record( - Type, Compiled, undefined), - {Type, SQL, undefined}; - {UnsupportedType, _ } -> - throw({unsupported_sql_type, UnsupportedType}) - catch - E:T -> - ErrorMsg = io_lib:format("query error: ~p:~p", [E, T]), - throw({query, ErrorMsg}) + Type, Compiled, undefined), + {Type, SQL, undefined} end. -%% @todo: should really be in riak_ql somewhere -table_from_sql(#ddl_v1{table=Table}) -> Table; -table_from_sql(#riak_select_v1{'FROM'=Table}) -> Table; -table_from_sql(#riak_sql_describe_v1{'DESCRIBE'=Table}) -> Table. - -call_from_sql_type(ddl) -> query_create_table; -call_from_sql_type(select) -> query_select; -call_from_sql_type(describe) -> query_describe. -create_table(DDL = #ddl_v1{table = Table}, Props) -> +create_table(DDL = ?DDL{table = Table}, Props) -> %% would be better to use a function to get the table out. {ok, Props1} = riak_kv_ts_util:apply_timeseries_bucket_props(DDL, Props), Props2 = [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props1], @@ -304,7 +264,7 @@ create_table(DDL = #ddl_v1{table = Table}, Props) -> ok -> wait_until_active(Table, ?TABLE_ACTIVATE_WAIT); {error, Reason} -> - {error,{table_create_fail, Table, Reason}} + {error, {table_create_fail, Table, Reason}} end. wait_until_active(Table, 0) -> diff --git a/src/riak_kv_wm_ts_util.erl b/src/riak_kv_wm_ts_util.erl index 67b13f6e95..a7e1866185 100644 --- a/src/riak_kv_wm_ts_util.erl +++ b/src/riak_kv_wm_ts_util.erl @@ -22,24 +22,45 @@ -module(riak_kv_wm_ts_util). --export([table_from_request/1]). --export([utf8_to_binary/1]). --export([set_text_resp_header/2]). --export([set_error_message/3]). --export([set_json_response/2]). +-export([authorize/3, + current_api_version/0, + current_api_version_string/0, + extract_api_version/1, + handle_error/3, + is_supported_api_version/1, + local_key/1, + local_key_fields_and_types/1, + set_json_response/2, + set_text_resp_header/2, + table_from_request/1, + table_module_exists/1, + utf8_to_binary/1]). --export([authorize/3]). --export([table_module_exists/1]). +-include_lib("webmachine/include/webmachine.hrl"). +-include_lib("riak_ql/include/riak_ql_ddl.hrl"). --export([local_key/1]). +-spec current_api_version() -> non_neg_integer(). +current_api_version() -> + 1. +-spec current_api_version_string() -> string(). +current_api_version_string() -> + "v1". --export([local_key_fields_and_types/1]). +-spec extract_api_version(#wm_reqdata{} | string()) -> integer() | undefined. +extract_api_version(RD = #wm_reqdata{}) -> + extract_api_version( + wrq:path_info(api_version, RD)); +extract_api_version("v1") -> + 1; +extract_api_version(_) -> + undefined. + +is_supported_api_version(Ver) when is_integer(Ver) -> + Ver =< current_api_version(); +is_supported_api_version(_) -> + false. --include_lib("webmachine/include/webmachine.hrl"). --include_lib("riak_ql/include/riak_ql_ddl.hrl"). --include("riak_kv_wm_raw.hrl"). --include("riak_kv_ts.hrl"). %% @private @@ -57,19 +78,13 @@ flat_format(Format, Args) -> set_text_resp_header(IoList, RD) -> wrq:set_resp_header( - "Content-Type", "text/plain", wrq:append_to_response_body(IoList,RD)). - -set_error_message(Format, Args, RD) -> - Str = flat_format(Format, Args), - Json = mochijson2:encode([{error, list_to_binary(Str)}]), - set_json_response(Json, RD). + "Content-Type", "text/plain", wrq:append_to_response_body(IoList, RD)). set_json_response(Json, RD) -> wrq:set_resp_header("Content-Type", "application/json", wrq:append_to_response_body(Json, RD)). - authorize(Call, Table, RD) -> case riak_api_web_security:is_authorized(RD) of false -> @@ -78,17 +93,14 @@ authorize(Call, Table, RD) -> ok; {true, SecContext} -> case riak_core_security:check_permission( - {riak_kv_ts_util:api_call_to_perm(Call), Table}, SecContext) of + {riak_kv_ts_api:api_call_to_perm(Call), Table}, SecContext) of {false, Error, _} -> {error, utf8_to_binary(Error)}; _ -> ok end; insecure -> - ErrorMsg = "Security is enabled and Riak does not" ++ - " accept credentials over HTTP. Try HTTPS instead.", - Resp = set_text_resp_header(ErrorMsg, RD), - {insecure, {halt, 426}, Resp} + insecure end. %% @todo: this should be in riak_ql_ddl and should probably check deeper. @@ -107,9 +119,8 @@ local_key(Mod) -> ddl_local_key(Mod:get_ddl()). %% this should be in the DDL helper module. --spec ddl_local_key(#ddl_v1{}) -> [binary()]. -ddl_local_key(#ddl_v1{local_key=LK}) -> - #key_v1{ast=Ast} = LK, +-spec ddl_local_key(?DDL{}) -> [binary()]. +ddl_local_key(?DDL{local_key = #key_v1{ast = Ast}}) -> [ param_name(P) || P <- Ast]. param_name(#param_v1{name=[Name]}) -> @@ -120,3 +131,115 @@ local_key_fields_and_types(Mod) -> Types = [Mod:get_field_type([F]) || F <- LK ], LKStr = [ binary_to_list(F) || F <- LK ], lists:zip(LKStr, Types). + + +error_out(Type, Fmt, Args, RD, Ctx) -> + {Type, + wrq:set_resp_header( + "Content-Type", "text/plain", wrq:append_to_response_body( + flat_format(Fmt, Args), RD)), + Ctx}. + +-spec handle_error(atom()|tuple(), #wm_reqdata{}, Ctx::tuple()) -> + {tuple(), #wm_reqdata{}, Ctx::tuple()}. +handle_error(Error, RD, Ctx) -> + case Error of + {not_ready, State} -> + error_out(false, + "Not ready (~s)", [State], RD, Ctx); + insecure_connection -> + error_out({halt, 426}, + "Security is enabled and Riak does not" + " accept credentials over HTTP. Try HTTPS instead.", [], RD, Ctx); + {unsupported_version, BadVersion} -> + error_out({halt, 412}, + "Unsupported API version ~s", [BadVersion], RD, Ctx); + {not_permitted, Table, ErrMsg} -> + error_out({halt, 401}, + "Access to table \"~ts\" not allowed (~s)", [Table, ErrMsg], RD, Ctx); + {malformed_request, Method} -> + error_out({halt, 400}, + "Malformed ~s request", [Method], RD, Ctx); + {url_key_bad_method, Method} -> + error_out({halt, 400}, + "Inappropriate ~s request", [Method], RD, Ctx); + {bad_parameter, Param} -> + error_out({halt, 400}, + "Bad value for parameter \"~s\"", [Param], RD, Ctx); + {no_such_table, Table} -> + error_out({halt, 404}, + "Table \"~ts\" does not exist", [Table], RD, Ctx); + {table_exists, Table} -> + error_out({halt, 409}, + "Table \"~ts\" already exists", [Table], RD, Ctx); + {failed_some_puts, NoOfFailures, Table} -> + error_out({halt, 400}, + "Failed to put ~b records to table \"~ts\"", [NoOfFailures, Table], RD, Ctx); + {invalid_data, BadRowIdxs} -> + error_out({halt, 400}, + "Invalid record #~s", [hd(BadRowIdxs)], RD, Ctx); + invalid_json -> + error_out({halt, 400}, + "Invalid json in body", [], RD, Ctx); + {key_element_count_mismatch, Got, Need} -> + error_out({halt, 400}, + "Incorrect number of elements (~b) for key of length ~b", [Need, Got], RD, Ctx); + {bad_field, Table, Field} -> + error_out({halt, 400}, + "Table \"~ts\" has no field named \"~s\"", [Table, Field], RD, Ctx); + {missing_field, Table, Field} -> + error_out({halt, 400}, + "Missing field \"~s\" for key in table \"~ts\"", [Field, Table], RD, Ctx); + {bad_value, Table, {Field, Type}} -> + error_out({halt, 400}, + "Bad value for field \"~s\" of type ~s in table \"~ts\"", + [Field, Type, Table], RD, Ctx); + {url_malformed_key_path, NPathElements, KeyLength, Table} -> + error_out({halt, 400}, + "Need ~b field/value pairs for key in table \"~ts\", got ~b path elements", + [KeyLength, Table, NPathElements], RD, Ctx); + url_has_not_all_lk_fields -> + error_out({halt, 400}, + "Not all key-constituent fields given on URL", [], RD, Ctx); + notfound -> + error_out({halt, 404}, + "Key not found", [], RD, Ctx); + no_query_in_body -> + error_out({halt, 400}, + "No query in request body", [], RD, Ctx); + {query_parse_error, Details} -> + error_out({halt, 400}, + "Query error: ~s", [Details], RD, Ctx); + {query_compile_error, Details} -> + error_out({halt, 400}, + "~s", [Details], RD, Ctx); + {table_create_fail, Table, Reason} -> + error_out({halt, 500}, + "Failed to create table \"~ts\": ~p", [Table, Reason], RD, Ctx); + {table_activate_fail, Table} -> + error_out({halt, 500}, + "Failed to activate bucket type of table \"~ts\"", [Table], RD, Ctx); + {table_created_missing, Table} -> + error_out({halt, 500}, + "Table \"~ts\" has been created but disappeared", [Table], RD, Ctx); + {query_exec_error, Type, Table, {_Kind, Explained}} -> + error_out({halt, 500}, + "Execution of ~s query failed on table \"~ts\" (~s)", + [Type, Table, Explained], RD, Ctx); + {query_exec_error, Type, Table, ErrMsg} -> + error_out({halt, 500}, + "Execution of ~s query failed on table \"~ts\" (~p)", + [Type, Table, ErrMsg], RD, Ctx); + query_worker_timeout -> + error_out({halt, 503}, + "Query worker timeout", [], RD, Ctx); + backend_timeout -> + error_out({halt, 503}, + "Storage backend timeout", [], RD, Ctx); + {parameter_error, Message} -> + error_out({halt, 400}, + "~s", [Message], RD, Ctx); + {riak_error, Detailed} -> + error_out({halt, 500}, + "Internal riak error: ~p", [Detailed], RD, Ctx) + end.