Merge pull request #1371 from lehoff/feature-th-http_client
Feature th http client for RTS-258

Reviewed-by: hmmr
borshop committed Apr 5, 2016
2 parents 4e20c20 + 383e049 commit 2985758
Showing 23 changed files with 2,133 additions and 1,480 deletions.
20 changes: 14 additions & 6 deletions include/riak_kv_ts.hrl
@@ -26,15 +26,15 @@
%% For dialyzer types
-include_lib("riak_ql/include/riak_ql_ddl.hrl").

-%% the result type of a query, rows means to return all mataching rows, aggregate
+%% the result type of a query, rows means to return all matching rows, aggregate
%% returns one row calculated from the result set for the query.
-type select_result_type() :: rows | aggregate.

-record(riak_sel_clause_v1,
{
calc_type = rows :: select_result_type(),
initial_state = [] :: [any()],
-col_return_types = [] :: [field_type()],
+col_return_types = [] :: [riak_ql_ddl:field_type()],
col_names = [] :: [binary()],
clause = [] :: [riak_kv_qry_compiler:compiled_select()],
finalisers = [] :: [skip | function()]
@@ -44,9 +44,9 @@
{
'SELECT' :: #riak_sel_clause_v1{},
'FROM' = <<>> :: binary() | {list, [binary()]} | {regex, list()},
-'WHERE' = [] :: [filter()],
-'ORDER BY' = [] :: [sorter()],
-'LIMIT' = [] :: [limit()],
+'WHERE' = [] :: [riak_ql_ddl:filter()],
+'ORDER BY' = [] :: [riak_kv_qry_compiler:sorter()],
+'LIMIT' = [] :: [riak_kv_qry_compiler:limit()],
helper_mod :: atom(),
%% will include groups when we get that far
partition_key = none :: none | #key_v1{},
@@ -59,7 +59,15 @@

-record(riak_sql_describe_v1,
{
'DESCRIBE' = <<>> :: binary()
}).

+-record(riak_sql_insert_v1,
+{
+'INSERT' = <<>> :: binary(),
+fields :: [riak_ql_ddl:field_identifier()],
+values :: [[riak_ql_ddl:data_value()]],
+helper_mod :: atom()
+}).

-define(SQL_SELECT, #riak_select_v1).
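For orientation, a minimal sketch (not part of the commit) of how the new #riak_sql_insert_v1{} record might be populated for a simple INSERT statement. The {identifier, ...} and typed-value shapes for field_identifier() and data_value(), and the use of riak_ql_ddl:make_module_name/1 for helper_mod, are assumptions:

    %% Assumes -include("riak_kv_ts.hrl") so the record is in scope.
    %% Hypothetical record for: INSERT INTO GeoCheckin (series, time)
    %%                          VALUES ('s1', 1443806600000);
    Insert = #riak_sql_insert_v1{
                'INSERT'   = <<"GeoCheckin">>,
                fields     = [{identifier, [<<"series">>]},
                              {identifier, [<<"time">>]}],
                values     = [[{varchar, <<"s1">>},
                               {timestamp, 1443806600000}]],
                helper_mod = riak_ql_ddl:make_module_name(<<"GeoCheckin">>)}.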
13 changes: 13 additions & 0 deletions include/riak_kv_vnode.hrl
@@ -19,6 +19,17 @@
type :: primary | fallback
}).

+%% Currently only for timeseries batches
+-record(riak_kv_w1c_batch_put_req_v1, {
+objs :: list({{binary(), binary()}, binary()}),
+type :: primary | fallback
+}).
+
+-record(riak_kv_w1c_batch_put_reply_v1, {
+reply :: ok | {error, term()},
+type :: primary | fallback
+}).

-record(riak_kv_get_req_v1, {
bkey :: {binary(), binary()},
req_id :: non_neg_integer()}).
@@ -75,6 +86,8 @@
-define(KV_PUT_REQ, #riak_kv_put_req_v1).
-define(KV_W1C_PUT_REQ, #riak_kv_w1c_put_req_v1).
-define(KV_W1C_PUT_REPLY, #riak_kv_w1c_put_reply_v1).
+-define(KV_W1C_BATCH_PUT_REQ, #riak_kv_w1c_batch_put_req_v1).
+-define(KV_W1C_BATCH_PUT_REPLY, #riak_kv_w1c_batch_put_reply_v1).
-define(KV_GET_REQ, #riak_kv_get_req_v1).
-define(KV_LISTBUCKETS_REQ, #riak_kv_listbuckets_req_v1).
-define(KV_LISTKEYS_REQ, #riak_kv_listkeys_req_v4).
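A hedged sketch of how these additions fit together: the {{Bucket, Key}, EncodedVal} object shape comes from the objs field above, while the dispatch through riak_core_vnode_master:command/4 mirrors how other vnode requests are routed and is illustrative rather than lifted from this commit (Preflist and Sender are placeholders):

    %% Assumes -include("riak_kv_vnode.hrl") for the record macros.
    Objs = [{{<<"ts_table">>, <<"key1">>}, <<"encoded-val-1">>},
            {{<<"ts_table">>, <<"key2">>}, <<"encoded-val-2">>}],
    Req  = ?KV_W1C_BATCH_PUT_REQ{objs = Objs, type = primary},
    %% Dispatch to the owning vnodes; the response would come back as a
    %% ?KV_W1C_BATCH_PUT_REPLY{} record.
    riak_core_vnode_master:command(Preflist, Req, Sender, riak_kv_vnode_master).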
11 changes: 10 additions & 1 deletion src/riak_kv.app.src
@@ -84,6 +84,15 @@

{timeseries_query_max_quanta_span, 5},

-{timeseries_max_concurrent_queries, 3}
+{timeseries_max_concurrent_queries, 3},

+%% Max batch size (in bytes) of data distributed between
+%% nodes during a put operation. It is highly recommended
+%% that you not increase this above 1MB.
+%%
+%% This is not a hard cap; the number of records that fits
+%% in a batch under this value is estimated from the size
+%% of the first record.
+{timeseries_max_batch_size, 1048576}
]}
]}.
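To make the estimation remark concrete, a sketch under assumptions: only the timeseries_max_batch_size key comes from this file; the function name and the use of erlang:external_size/1 as the size proxy are illustrative, not the commit's actual implementation.

    %% Derive a per-batch record count from the serialized size of the
    %% first record, never returning fewer than one record per batch.
    records_per_batch([First | _] = _Records) ->
        MaxBytes = app_helper:get_env(riak_kv, timeseries_max_batch_size,
                                      1048576),
        ApproxSize = erlang:external_size(First),
        max(1, MaxBytes div ApproxSize).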
4 changes: 4 additions & 0 deletions src/riak_kv_app.erl
@@ -155,6 +155,10 @@ start(_Type, _StartArgs) ->
[true, false],
false),

+riak_core_capability:register({riak_kv, w1c_batch_vnode},
+[true, false],
+false),

%% mapred_system should remain until no nodes still exist
%% that would propose 'legacy' as the default choice
riak_core_capability:register({riak_kv, mapred_system},
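Once registered, the capability can be queried at runtime so batching is used only when the whole cluster supports it. A sketch with hypothetical helper functions; riak_core_capability:get/2 returns the supplied default (false here) until negotiation settles:

    case riak_core_capability:get({riak_kv, w1c_batch_vnode}, false) of
        true  -> send_batch_put(Objs);              %% hypothetical helpers
        false -> [send_single_put(O) || O <- Objs]
    end.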
2 changes: 1 addition & 1 deletion src/riak_kv_bucket.erl
@@ -2,7 +2,7 @@
%%
%% riak_kv_bucket: bucket validation functions
%%
-%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
88 changes: 58 additions & 30 deletions src/riak_kv_console.erl
@@ -522,8 +522,19 @@ bucket_type_create(CreateTypeFn, Type, {struct, Fields}) ->
[{<<"props", _/binary>>, {struct, Props1}}] ->
case catch riak_kv_ts_util:maybe_parse_table_def(Type, Props1) of
{ok, Props2} ->
-Props3 = [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props2],
-CreateTypeFn(Props3);
+case catch [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props2] of
+{bad_linkfun_modfun, {M, F}} ->
+io:format("Invalid link mod or fun in bucket type properties: ~p:~p\n", [M, F]),
+error;
+{bad_linkfun_bkey, {B, K}} ->
+io:format("Malformed bucket/key for anon link fun in bucket type properties: ~p/~p\n", [B, K]),
+error;
+{bad_chash_keyfun, {M, F}} ->
+io:format("Invalid chash mod or fun in bucket type properties: ~p:~p\n", [M, F]),
+error;
+Props3 ->
+CreateTypeFn(Props3)
+end;
{error, ErrorMessage} when is_list(ErrorMessage) orelse is_binary(ErrorMessage) ->
bucket_type_print_create_result_error_header(Type),
io:format("~ts~n", [ErrorMessage]),
@@ -566,8 +577,13 @@ bucket_type_update([TypeStr, PropsStr]) ->
bucket_type_update(Type, {struct, Fields}) ->
case proplists:get_value(<<"props">>, Fields) of
{struct, Props} ->
-ErlProps = [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props],
-bucket_type_print_update_result(Type, riak_core_bucket_type:update(Type, ErlProps));
+case catch [riak_kv_wm_utils:erlify_bucket_prop(P) || P <- Props] of
+{bad_bucket_property, BadProp} ->
+io:format("Invalid bucket type property: ~ts\n", [BadProp]),
+error;
+ErlProps ->
+bucket_type_print_update_result(Type, riak_core_bucket_type:update(Type, ErlProps))
+end;
_ ->
io:format("Cannot create bucket type ~ts: no props field found in json~n", [Type]),
error
@@ -875,6 +891,7 @@ bucket_error_xlate(X) ->

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include_lib("riak_ql/include/riak_ql_ddl.hrl").

json_props(Props) ->
lists:flatten(mochijson2:encode([{props, Props}])).
@@ -900,18 +917,29 @@ bucket_type_create_with_timeseries_table_test() ->
"user varchar not null, ",
"time timestamp not null, ",
"PRIMARY KEY ((series, user, quantum(time, 15, m)), "
"series, user, time))">>,
JSON = json_props([{bucket_type, my_type},
{table_def, TableDef}]),
"series, user, time))"
" with (n_val=42)">>,
JSON = json_props([{bucket_type, my_type},
{table_def, TableDef},
{n_val, 41}]),
bucket_type_create(
fun(Props) -> put(Ref, Props) end,
<<"my_type">>,
mochijson2:decode(JSON)
),
?assertMatch(
-[{ddl, _}, {bucket_type, <<"my_type">>} | _],
-get(Ref)
-).
+{n_val, 42}, %% 42 set in query via 'with'
+%% takes precedence over 41 from sidecar properties
+lists:keyfind(n_val, 1, get(Ref))
+),
+?assertMatch(
+{ddl, _},
+lists:keyfind(ddl, 1, get(Ref))
+),
+?assertMatch(
+{bucket_type, <<"my_type">>},
+lists:keyfind(bucket_type, 1, get(Ref))
+).

bucket_type_create_with_timeseries_table_is_write_once_test() ->
Ref = make_ref(),
@@ -977,23 +1005,24 @@ bucket_type_create_with_timeseries_table_error_when_write_once_set_to_false_test
)
).

-bucket_type_create_with_timeseries_table_error_with_short_primary_key_test() ->
-Ref = make_ref(),
+bucket_type_create_with_timeseries_table_with_two_element_key_test() ->
TableDef =
<<"CREATE TABLE my_type (",
"user varchar not null, ",
"time timestamp not null, ",
"PRIMARY KEY ((user, quantum(time, 15, m)), "
"user, time))">>,
"user varchar not null, ",
"time timestamp not null, ",
"PRIMARY KEY ((user, quantum(time, 15, m)), user, time))">>,
JSON = json_props([{bucket_type, my_type},
{table_def, TableDef}]),
-?assertEqual(
-error,
-bucket_type_create(
-fun(Props) -> put(Ref, Props) end,
-<<"my_type">>,
-mochijson2:decode(JSON)
-)
+Result = bucket_type_create(
+fun(Props) -> Props end,
+<<"my_type">>,
+mochijson2:decode(JSON)
+),
+% just assert that this returns a ddl prop
+HaveDDL = proplists:get_value(ddl, Result),
+?assertMatch(
+?DDL{},
+HaveDDL
).

bucket_type_create_with_timeseries_table_error_with_misplaced_quantum_test() ->
@@ -1024,10 +1053,9 @@ bucket_type_and_table_error_local_key_test() ->
"user varchar not null, ",
"time timestamp not null, ",
"other varchar not null, ",
"PRIMARY KEY ((series, user, quantum(time, 15, m)), "
"series, user, time, other))">>,
"PRIMARY KEY ((series, user, quantum(time, 15, m)), seriesTYPO, user, time))">>,
JSON = json_props([{bucket_type, my_type},
{table_def, TableDef}]),
?assertEqual(
error,
bucket_type_create(
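The error-handling idiom this file adopts, in miniature: riak_kv_wm_utils:erlify_bucket_prop/1 throws tagged tuples on bad input, and wrapping the comprehension in catch turns the first throw into the expression's value. A standalone sketch with a hypothetical validator standing in for erlify_bucket_prop/1:

    %% `catch` yields either the fully validated list or the first thrown
    %% tagged tuple, which the case clauses tell apart by shape.
    validate_props(Props) ->
        case catch [validate_prop(P) || P <- Props] of
            {bad_bucket_property, Bad} -> {error, {bad_bucket_property, Bad}};
            Valid when is_list(Valid)  -> {ok, Valid}
        end.

    validate_prop({<<"n_val">>, N}) when is_integer(N), N > 0 ->
        {n_val, N};
    validate_prop({Name, _Value}) ->
        throw({bad_bucket_property, Name}).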