diff --git a/eqc/ec_eqc.erl b/eqc/ec_eqc.erl
index df99c54d0..63cc87007 100644
--- a/eqc/ec_eqc.erl
+++ b/eqc/ec_eqc.erl
@@ -671,13 +671,15 @@ get_fsm_proc(ReqId, #params{n = N, r = R}) ->
     DeletedVclock = true,
     ExpectedVclock = false,
     NodeConfirms = 0,
+    ReturnBody = true,
     GetCore = riak_kv_get_core:init(N, R, 0, %% SLF hack
                                     FailThreshold,
                                     NotFoundOk, AllowMult, DeletedVclock,
                                     [{Idx, primary} || Idx <- lists:seq(1, N)], %% SLF hack
                                     ExpectedVclock,
-                                    NodeConfirms
+                                    NodeConfirms,
+                                    ReturnBody
                                     ),
     #proc{name = {get_fsm, ReqId}, handler = get_fsm,
           procst = #getfsmst{getcore = GetCore}}.
diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema
index 1786a4e68..513dcaddd 100644
--- a/priv/riak_kv.schema
+++ b/priv/riak_kv.schema
@@ -579,6 +579,74 @@
 hidden
 ]}.
 
+%% @doc Mode for handling conditional checks on PUTs
+%% When handling if-not-modified (vclock-based) and if-none-match conditionals
+%% on PUTs, there are three possible modes:
+%% - api_only (default)
+%% - prefer_token
+%% - mandate_token (not currently implemented)
+%%
+%% In the api_only mode, a read will be done before the write within the API,
+%% and if the read passes the condition the PUT will be allowed (even though
+%% a parallel conditional PUT may be in-flight).
+%% In the prefer_token mode a token must be requested for the key to be
+%% updated, and the read and the write will be managed within the token
+%% session. Only a single update will normally have access to the token;
+%% other requests will queue for use of the token. When a token cannot be
+%% secured within a timeout, then the api_only method will be used.
+%%
+%% Future releases may support a mandate_token mode which will error on failure
+%% to get a token, rather than proceed and accept eventual consistency (as with
+%% prefer_token).
+%%
+%% Only conditional PUTs are impacted by this setting; non-conditional PUTs
+%% sent in parallel to conditional PUTs may still cause siblings. The
+%% standard HTTP headers If-Unmodified-Since and If-Match are always
+%% applied as api_only checks. The riak-specific vector-clock based
+%% if-not-modified header and the standard HTTP if-none-match header are the
+%% only conditionals that will be subject to stronger, token-based
+%% restrictions.
+{mapping, "conditional_put_mode", "riak_kv.conditional_put_mode", [
+  {default, api_only},
+  {datatype, {enum, [api_only, prefer_token]}}
+]}.
+
+%% @doc Set the level of verification required on token access
+%% When requesting access to a token, this can be done in three different
+%% verification modes:
+%% - head_only
+%% - basic_consensus
+%% - primary_consensus (default)
+%%
+%% The head_only mode will make the node currently at the head of each preflist
+%% responsible for granting tokens in isolation. This is intended to meet the
+%% required constraints only in healthy clusters (or single-node clusters).
+%%
+%% In a consensus mode, the token will be granted by the node at the head of
+%% the preflist, and the issuance will be validated by up to two "downstream"
+%% nodes. This means that when a node recovers from failure and becomes head
+%% of the preflist, it is prevented from making grants which are duplicates of
+%% ones made by a downstream node during the failure.
+%%
+%% There are two forms of consensus - basic and primary. With basic
+%% consensus, any available unique nodes in the preflist (either primary or
+%% fallback) can be used for consensus. With primary consensus, the nodes
+%% must be 3 of 5 primary nodes (and hence a target_n_val of at least 5 is
+%% required in this mode).
+%%
+%% With basic_consensus, tokens can still be granted in a wide range of failure
+%% scenarios, but with a risk of duplicate grants, in particular should a
+%% cluster be partitioned.
+%%
+%% No mode provides strict guarantees, including primary_consensus, especially
+%% in complex partition scenarios where different nodes have alternative views
+%% of node reachability.
+{mapping, "token_request_mode", "riak_kv.token_request_mode", [
+  {default, primary_consensus},
+  {datatype, {enum, [head_only, basic_consensus, primary_consensus]}}
+]}.
+
+
 %% @doc Controls which binary representation of a riak value is stored
 %% on disk.
 %% * 0: Original erlang:term_to_binary format. Higher space overhead.
diff --git a/src/riak_client.erl b/src/riak_client.erl
index 860949284..18ed1aa0b 100644
--- a/src/riak_client.erl
+++ b/src/riak_client.erl
@@ -58,7 +58,6 @@
 %% @type default_timeout() = 60000
 -define(DEFAULT_TIMEOUT, 60000).
 -define(DEFAULT_FOLD_TIMEOUT, 3600000).
--define(DEFAULT_ERRTOL, 0.00003).
 
 %% TODO: This type needs to be better specified and validated against
 %% any dependents on riak_kv.
diff --git a/src/riak_kv_get_core.erl b/src/riak_kv_get_core.erl
index 811bfa57f..132b74d79 100644
--- a/src/riak_kv_get_core.erl
+++ b/src/riak_kv_get_core.erl
@@ -20,7 +20,7 @@
 %%
 %% -------------------------------------------------------------------
 -module(riak_kv_get_core).
--export([init/10, update_init/2, head_merge/1,
+-export([init/11, update_init/2, head_merge/1,
          add_result/4, update_result/5, result_shortcode/1,
          enough/1, response/1, has_all_results/1, final_action/1, info/1]).
 -export_type([getcore/0, result/0, reply/0, final_action/0]).
@@ -76,6 +76,7 @@
                 head_merge = false :: boolean(),
                 expected_fetchclock = false :: boolean()|vclock:vclock(),
                 node_confirms = 0 :: non_neg_integer(),
+                return_body = true :: boolean(),
                 confirmed_nodes = []}).
 -opaque getcore() :: #getcore{}.
@@ -89,26 +90,30 @@
            AllowMult::boolean(),
            DeletedVClock::boolean(), IdxType::idx_type(),
            ExpClock::false|vclock:vclock(),
-           NodeConfirms::non_neg_integer()) -> getcore().
+           NodeConfirms::non_neg_integer(),
+           ReturnBody::boolean()
+        ) -> getcore().
 init(N, R, PR, FailThreshold, NotFoundOk, AllowMult,
-        DeletedVClock, IdxType, ExpClock, NodeConfirms) ->
-    #getcore{n = N,
-             r = case ExpClock of false -> R; _ -> N end,
-             pr = PR,
-             ur = 0,
-             fail_threshold = FailThreshold,
-             notfound_ok = NotFoundOk,
-             allow_mult = AllowMult,
-             deletedvclock = DeletedVClock,
-             idx_type = IdxType,
-             expected_fetchclock = ExpClock,
-             node_confirms = NodeConfirms}.
+        DeletedVClock, IdxType, ExpClock, NodeConfirms, ReturnBody) ->
+    #getcore{
+        n = N,
+        r = case ExpClock of false -> R; _ -> N end,
+        pr = PR,
+        ur = 0,
+        fail_threshold = FailThreshold,
+        notfound_ok = NotFoundOk,
+        allow_mult = AllowMult,
+        deletedvclock = DeletedVClock,
+        idx_type = IdxType,
+        expected_fetchclock = ExpClock,
+        node_confirms = NodeConfirms,
+        return_body = ReturnBody
+    }.
 
 %% Re-initialise a get to a restricted number of vnodes (that must all respond)
 -spec update_init(N::pos_integer(), getcore()) -> getcore().
 update_init(N, PrevGetCore) ->
-    PrevGetCore#getcore{ur = N,
-                        head_merge = true}.
+    PrevGetCore#getcore{ur = N, head_merge = true}.
%% Convert the get so that it is expecting to potentially receive the %% responses to head requests (though for backwards compatibility these may @@ -279,7 +284,7 @@ response(#getcore{r = R, num_ok = NumOK, pr= PR, num_pok = NumPOK, when (NumOK >= R andalso NumPOK >= PR) orelse ExpClock == true -> #getcore{results = Results, allow_mult=AllowMult, deletedvclock = DeletedVClock} = GetCore, - Merged = merge_heads(Results, AllowMult), + Merged = merge_heads(Results, AllowMult, GetCore#getcore.return_body), case Merged of {ok, _MergedObj} -> {Merged, GetCore#getcore{merged = Merged}}; % {ok, MObj} @@ -430,11 +435,11 @@ merge(Replies, AllowMult) -> %% a backend not supporting HEAD was called, or the operation was an UPDATE), %% or body-less objects. %% --spec merge_heads(list(result()), boolean()) -> +-spec merge_heads(list(result()), boolean(), boolean()) -> {notfound, undefined}| {tombstone, riak_object:riak_object()}|{ok, riak_object:riak_object()}| {fetch, list(non_neg_integer())}. -merge_heads(Replies, AllowMult) -> +merge_heads(Replies, AllowMult, ReturnBody) -> % Replies should be a list of [{Idx, {ok, RObj}] IdxObjs = [{I, {ok, RObj}} || {I, {ok, RObj}} <- Replies], % Those that don't pattern match will be not_found @@ -443,17 +448,16 @@ merge_heads(Replies, AllowMult) -> {notfound, undefined}; _ -> {BestReplies, FetchIdxObjL} = riak_object:find_bestobject(IdxObjs), - case FetchIdxObjL of - [] -> + case {FetchIdxObjL, ReturnBody} of + {_, false} -> + merge(BestReplies ++ FetchIdxObjL, AllowMult); + {[], true} -> merge(BestReplies, AllowMult); - IdxL -> + {IdxL, true} -> {fetch, lists:map(fun({Idx, _Rsp}) -> Idx end, IdxL)} end end. - - - %% @private Checks IdxType to see if Idx is a primary. %% If the Idx is not in the IdxType the world must be %% resizing (ring expanding). In that case, Idx is @@ -474,36 +478,6 @@ num_pr(GetCore = #getcore{num_pok=NumPOK, idx_type=IdxType}, Idx) -> GetCore end. -%% @private Print a warning if objects are not equal. Only called on case of no read-repair -%% This situation could happen with pre 2.1 vclocks in very rare cases. Fixing the object -%% requires the user to rewrite the object in 2.1+ of Riak. Logic is enabled when capabilities -%% returns a version(all nodes at least 2.2) and the entropy_manager is not yet version 0 -% maybe_log_old_vclock(Results) -> -% case riak_core_capability:get({riak_kv, object_hash_version}, legacy) of -% legacy -> -% ok; -% 0 -> -% Version = riak_kv_entropy_manager:get_version(), -% case [RObj || {_Idx, {ok, RObj}} <- Results] of -% [] -> -% ok; -% [_] -> -% ok; -% _ when Version == 0 -> -% ok; -% [R1|Rest] -> -% case [RObj || RObj <- Rest, not riak_object:equal(R1, RObj)] of -% [] -> -% ok; -% _ -> -% object:warning("Bucket: ~p Key: ~p should be rewritten to guarantee -% compatability with AAE version 0", -% [riak_object:bucket(R1),riak_object:key(R1)]) -% end -% end; -% _ -> -% ok -% end. -ifdef(TEST). 
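As an illustrative sketch only (the function name example_getcore and the quorum values are assumed, mirroring the eqc harness above), the new init/11 arity is driven as follows, with ReturnBody = false expressing a head-style read where bodies need not be fetched:

    example_getcore(N, R) ->
        IdxType = [{Idx, primary} || Idx <- lists:seq(1, N)],
        riak_kv_get_core:init(
            N, R,
            0,          %% PR
            N div 2,    %% FailThreshold (illustrative)
            true,       %% NotFoundOk
            true,       %% AllowMult
            false,      %% DeletedVClock
            IdxType,
            false,      %% ExpClock
            0,          %% NodeConfirms
            false       %% ReturnBody - merge heads without a body fetch
        ).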
diff --git a/src/riak_kv_get_fsm.erl b/src/riak_kv_get_fsm.erl index da53cbb91..cc06edf32 100644 --- a/src/riak_kv_get_fsm.erl +++ b/src/riak_kv_get_fsm.erl @@ -346,11 +346,16 @@ validate(timeout, StateData=#state{from = {raw, ReqId, _Pid}, options = Options, NFOk0 = get_option(notfound_ok, Options, default), NotFoundOk = riak_kv_util:expand_value(notfound_ok, NFOk0, BucketProps), DeletedVClock = get_option(deletedvclock, Options, false), - GetCore = riak_kv_get_core:init(N, R, PR, FailThreshold, - NotFoundOk, AllowMult, - DeletedVClock, IdxType, - ExpClock, - NodeConfirms), + ReturnBody = get_option(return_body, Options, true), + GetCore = + riak_kv_get_core:init( + N, R, PR, FailThreshold, + NotFoundOk, AllowMult, + DeletedVClock, IdxType, + ExpClock, + NodeConfirms, + ReturnBody + ), new_state_timeout(execute, StateData#state{get_core = GetCore, timeout = Timeout, req_id = ReqId}); diff --git a/src/riak_kv_pb_object.erl b/src/riak_kv_pb_object.erl index 48ea82aaa..7c986b5da 100644 --- a/src/riak_kv_pb_object.erl +++ b/src/riak_kv_pb_object.erl @@ -52,7 +52,8 @@ -module(riak_kv_pb_object). -include_lib("riak_pb/include/riak_kv_pb.hrl"). --include_lib("riak_pb/include/riak_pb_kv_codec.hrl"). +-include_lib("kernel/include/logger.hrl"). + -ifdef(TEST). -compile([export_all, nowarn_export_all]). @@ -69,12 +70,13 @@ -import(riak_pb_kv_codec, [decode_quorum/1]). --record(state, {client, % local client - req, % current request (for multi-message requests like list keys) - req_ctx, % context to go along with request (partial results, request ids etc) - is_consistent = false :: boolean(), - client_id = <<0,0,0,0>>, % emulate legacy API when vnode_vclocks is true - repl_compress = false :: boolean()}). +-record(state, + { + client, % local client + client_id = <<0,0,0,0>>, + % emulate legacy API when vnode_vclocks is true + repl_compress = false :: boolean() + }). %% @doc init/0 callback. Returns the service internal start %% state. 
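For orientation before the rewritten PUT path below, a hedged sketch (the head_read name and client id are assumed) of the body-less read that the new return_body option supports; the get options match those used in the token path in the next hunk:

    head_read(Bucket, Key) ->
        C = riak_client:new(node(), head_read),
        GetOpts =
            [{basic_quorum, true}, {return_body, false}, {deleted_vclock, true}],
        %% Returns clocks and metadata without requiring full object bodies
        riak_client:get(Bucket, Key, GetOpts, C).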
@@ -273,53 +275,18 @@ process(#rpbputreq{key = <<>>}, State) -> {error, "Key cannot be zero-length", State}; process(#rpbputreq{type = <<>>}, State) -> {error, "Type cannot be zero-length", State}; -process(#rpbputreq{bucket=B0, type=T, key=K, vclock=PbVC, - if_not_modified=NotMod, if_none_match=NoneMatch, - n_val=N_val, sloppy_quorum=SloppyQuorum} = Req, - #state{client=C} = State) when NotMod; NoneMatch -> - GetOpts = make_option(n_val, N_val) ++ - make_option(sloppy_quorum, SloppyQuorum), - B = maybe_bucket_type(T, B0), - Result = - case riak_kv_util:consistent_object(B) of - true -> - consistent; - false -> - riak_client:get(B, K, GetOpts, C) - end, - case Result of - consistent -> - process(Req#rpbputreq{if_not_modified=undefined, - if_none_match=undefined}, - State#state{is_consistent = true}); - {ok, _} when NoneMatch -> - {error, "match_found", State}; - {ok, O} when NotMod -> - case erlify_rpbvc(PbVC) == riak_object:vclock(O) of - true -> - process(Req#rpbputreq{if_not_modified=undefined, - if_none_match=undefined}, - State); - _ -> - {error, "modified", State} - end; - {error, _} when NoneMatch -> - process(Req#rpbputreq{if_not_modified=undefined, - if_none_match=undefined}, - State); - {error, notfound} when NotMod -> - {error, "notfound", State}; - {error, Reason} -> - {error, {format, Reason}, State} - end; - -process(#rpbputreq{bucket=B0, type=T, key=K, vclock=PbVC, content=RpbContent, - w=W0, dw=DW0, pw=PW0, return_body=ReturnBody, - return_head=ReturnHead, timeout=Timeout, asis=AsIs, - n_val=N_val, sloppy_quorum=SloppyQuorum, - node_confirms=NodeConfirms0}, - #state{client=C} = State0) -> +process( + #rpbputreq{ + bucket=B0, type=T, key=K, vclock=PbVC, + content=RpbContent, + w=W0, dw=DW0, pw=PW0, + n_val=N_val, sloppy_quorum=SloppyQuorum, node_confirms=NodeConfirms0, + return_body=ReturnBody, return_head=ReturnHead, + timeout=Timeout, + asis=AsIs, if_not_modified=IfNotModified, if_none_match=IfNoneMatch}, + #state{client=C} = State) -> + B = maybe_bucket_type(T, B0), case K of undefined -> %% Generate a key, the user didn't supply one @@ -330,66 +297,148 @@ process(#rpbputreq{bucket=B0, type=T, key=K, vclock=PbVC, content=RpbContent, %% Don't return the key since we're not generating one ReturnKey = undefined end, - B = maybe_bucket_type(T, B0), - O0 = riak_object:new(B, Key, <<>>), - O1 = update_rpbcontent(O0, RpbContent), - O = update_pbvc(O1, PbVC), - %% erlang_protobuffs encodes as 1/0/undefined - W = decode_quorum(W0), - DW = decode_quorum(DW0), - PW = decode_quorum(PW0), - NodeConfirms = decode_quorum(NodeConfirms0), - B = maybe_bucket_type(T, B0), - Options = case ReturnBody of - 1 -> [returnbody]; - true -> [returnbody]; - _ -> - case ReturnHead of - true -> [returnbody]; - _ -> [] - end - end, - {Options2, State} = - case State0#state.is_consistent of - true -> - {[{if_none_match, true}|Options], - State0#state{is_consistent = false}}; - _ -> - {Options, State0} + IsConsistent = riak_kv_util:consistent_object(B), + CondPutMode = + application:get_env(riak_kv, conditional_put_mode, api_only), + MakeTokenRequest = CondPutMode =/= api_only, + + {CheckResult, CondPutOpts, SessionToken} = + case {IfNotModified, IfNoneMatch, IsConsistent, MakeTokenRequest} of + {_, true, true, _} -> + {ok, [{if_none_match, true}], none}; + {undefined, undefined, _, _} -> + {ok, [], none}; + {NotMod, NoneMatch, false, true} -> + GetOpts = + make_options( + [ + {basic_quorum, true}, + {return_body, false}, + {deleted_vclock, true} + ]), + TokenResult = + 
riak_kv_token_session:session_request_retry({B, K}), + case TokenResult of + {true, Token} -> + Condition = + case NotMod of + undefined -> + {undefined, true, GetOpts}; + _ -> + InClock = erlify_rpbvc(PbVC), + {{true, InClock}, undefined, GetOpts} + end, + {ok, [{condition_check, Condition}], Token}; + _ -> + ?LOG_WARNING( + "Fallback to weak check as no token available " + "for ~p ~p", + [B, K] + ), + {CheckR, PutOpts} = + riak_kv_put_fsm:conditional_check( + riak_client:get(B, K, GetOpts, C), + {NotMod, erlify_rpbvc(PbVC)}, + NoneMatch + ), + {CheckR, PutOpts, none} + end; + {NotMod, NoneMatch, false, false} -> + GetOpts = + make_option(n_val, N_val) ++ + make_option(sloppy_quorum, SloppyQuorum) ++ + make_option(timeout, Timeout), + {CheckR, PutOpts} = + riak_kv_put_fsm:conditional_check( + riak_client:get(B, K, GetOpts, C), + {NotMod, erlify_rpbvc(PbVC)}, + NoneMatch + ), + {CheckR, PutOpts, none} end, - Opts = - make_options([{w, W}, {dw, DW}, {pw, PW}, - {node_confirms, NodeConfirms}, - {timeout, Timeout}, {asis, AsIs}, - {n_val, N_val}, - {sloppy_quorum, SloppyQuorum}]) ++ Options2, - case riak_client:put(O, Opts, C) of - ok when is_binary(ReturnKey) -> - PutResp = #rpbputresp{key = ReturnKey}, - {reply, PutResp, State}; - ok -> - {reply, #rpbputresp{}, State}; - {ok, Obj} -> - Contents = riak_object:get_contents(Obj), - PbContents = case ReturnHead of - true -> - %% Remove all the 'value' fields from the contents - %% This is a rough equivalent of a REST HEAD - %% request - BlankContents = [{MD, <<>>} || {MD, _} <- Contents], - riak_pb_kv_codec:encode_contents(BlankContents); - _ -> - riak_pb_kv_codec:encode_contents(Contents) - end, - PutResp = #rpbputresp{content = PbContents, - vclock = pbify_rpbvc(riak_object:vclock(Obj)), - key = ReturnKey - }, - {reply, PutResp, State}; - {error, notfound} -> - {reply, #rpbputresp{}, State}; + case CheckResult of {error, Reason} -> - {error, {format, Reason}, State} + riak_kv_token_session:session_release(SessionToken), + {error, Reason, State}; + ok -> + O0 = riak_object:new(B, Key, <<>>), + O1 = update_rpbcontent(O0, RpbContent), + O = update_pbvc(O1, PbVC), + %% erlang_protobuffs encodes as 1/0/undefined + W = decode_quorum(W0), + DW = decode_quorum(DW0), + PW = decode_quorum(PW0), + NodeConfirms = decode_quorum(NodeConfirms0), + BodyOptions = + case ReturnBody of + 1 -> [returnbody]; + true -> [returnbody]; + _ -> + case ReturnHead of + true -> [returnbody]; + _ -> [] + end + end, + Opts = + CondPutOpts ++ + BodyOptions ++ + make_options( + [{w, W}, {dw, DW}, {pw, PW}, + {node_confirms, NodeConfirms}, + {timeout, Timeout}, {asis, AsIs}, + {n_val, N_val}, + {sloppy_quorum, SloppyQuorum}]), + PutRsp = + case SessionToken of + none -> + riak_client:put(O, Opts, C); + _ -> + riak_kv_token_session:session_use( + SessionToken, put, [O, Opts] + ) + end, + riak_kv_token_session:session_release(SessionToken), + case PutRsp of + ok when is_binary(ReturnKey) -> + PutResp = #rpbputresp{key = ReturnKey}, + {reply, PutResp, State}; + ok -> + {reply, #rpbputresp{}, State}; + {ok, Obj} -> + Contents = riak_object:get_contents(Obj), + PbContents = + case ReturnHead of + true -> + %% Remove all the 'value' fields from the + %% contents. 
This is a rough equivalent of + %% a REST HEAD request + BlankContents = + [{MD, <<>>} || {MD, _} <- Contents], + riak_pb_kv_codec:encode_contents( + BlankContents); + _ -> + riak_pb_kv_codec:encode_contents(Contents) + end, + PutResp = + #rpbputresp{ + content = PbContents, + vclock = pbify_rpbvc(riak_object:vclock(Obj)), + key = ReturnKey}, + {reply, PutResp, State}; + {error, notfound} -> + {reply, #rpbputresp{}, State}; + {error, CondRsp} + when + CondRsp == "match_found"; + CondRsp == "modified"; + CondRsp == "notfound" -> + %% Conditional PUTs which fail at the PUT_FSM + %% Otherwise formatting will be different from condition + %% failing at the API + {error, CondRsp, State}; + {error, PutError} -> + {error, {format, PutError}, State} + end end; process(#rpbdelreq{bucket=B0, type=T, key=K, vclock=PbVc, @@ -433,6 +482,7 @@ process_stream(_,_,State) -> %% Internal functions %% =================================================================== + -spec encode_nextgenrepl_response( intenal|internal_aaehash, #rpbfetchresp{}, @@ -599,4 +649,4 @@ new_connection(Options) -> Port = app_helper:get_env(riak_api, pb_port), gen_tcp:connect(Host, Port, [binary, {active, false},{nodelay, true}|Options]). --endif. +-endif. \ No newline at end of file diff --git a/src/riak_kv_put_fsm.erl b/src/riak_kv_put_fsm.erl index 88cd8455a..1d89f08dd 100644 --- a/src/riak_kv_put_fsm.erl +++ b/src/riak_kv_put_fsm.erl @@ -27,7 +27,6 @@ -include_lib("eunit/include/eunit.hrl"). -endif. -include_lib("riak_kv_vnode.hrl"). --include("riak_kv_wm_raw.hrl"). -include("riak_kv_types.hrl"). -compile({nowarn_deprecated_function, @@ -49,6 +48,7 @@ waiting_local_vnode/2, waiting_remote_vnode/2, postcommit/2, finish/2]). +-export([conditional_check/3]). -include_lib("kernel/include/logger.hrl"). @@ -113,7 +113,6 @@ dw :: non_neg_integer() | undefined, pw :: non_neg_integer() | undefined, node_confirms :: non_neg_integer() | undefined, - sync_on_write :: atom(), coord_pl_entry :: {integer(), atom()} | undefined, preflist2 :: riak_core_apl:preflist_ann() | undefined, bkey :: {riak_object:bucket(), riak_object:key()}, @@ -123,13 +122,11 @@ timeout = infinity :: pos_integer()|infinity, tref :: reference() | undefined, vnode_options=[] :: list(), - returnbody = false :: boolean(), allowmult = true :: boolean(), precommit=[] :: list(), postcommit=[] :: list(), bucket_props :: list() | undefined, putcore :: riak_kv_put_core:putcore() | undefined, - put_usecs :: undefined | non_neg_integer(), timing = [] :: [{atom(), {non_neg_integer(), non_neg_integer(), non_neg_integer()}}], reply, % reply sent to client, @@ -317,7 +314,37 @@ prepare(timeout, State = #state{robj = RObj, options=Options}) -> BucketProps = get_bucket_props(Bucket), StatTracked = get_option(stat_tracked, BucketProps, false), N = get_n_val(Options, BucketProps), - get_preflist(N, State#state{tracked_bucket=StatTracked, bucket_props=BucketProps}). + ConditionCheck = get_option(condition_check, Options, false), + CheckR = + case ConditionCheck of + false -> + ok; + {NotMod, NoneMatch, GetOpts} -> + Key = riak_object:key(RObj), + {GetCheck, _PutOpts} = + riak_kv_put_fsm:conditional_check( + riak_client:get( + Bucket, + Key, + GetOpts, + riak_client:new(node(), condition_check) + ), + NotMod, + NoneMatch + ), + GetCheck + end, + case CheckR of + ok -> + get_preflist( + N, + State#state{ + tracked_bucket=StatTracked, bucket_props=BucketProps + } + ); + Error -> + process_reply(Error, State) + end. 
 %% @private
 validate(timeout, StateData0 = #state{from = {raw, ReqId, _Pid},
@@ -1282,7 +1309,26 @@ get_soft_limit_option(Options) ->
         get_option(mbox_check, Options, SoftLimitSupported),
     SoftLimitedWanted.
 
-
+-spec conditional_check(
+    {ok, riak_object:riak_object()}|{error, term()},
+    {boolean()|undefined, vclock:vclock()}|undefined,
+    boolean()|undefined) -> {ok|{error, term()}, list()}.
+conditional_check({ok, _}, _NotMod, NoneMatch) when NoneMatch ->
+    {{error, "match_found"}, []};
+conditional_check({ok, PreFetchO}, {NotMod, InClock}, _NoneMatch) when NotMod ->
+    CurrClock = riak_object:vclock(PreFetchO),
+    case vclock:equal(InClock, CurrClock) of
+        true ->
+            {ok, [{if_not_modified, InClock}]};
+        _ ->
+            {{error, "modified"}, []}
+    end;
+conditional_check({error, notfound}, _NotMod, NoneMatch) when NoneMatch ->
+    {ok, [{if_none_match, true}]};
+conditional_check({error, notfound}, {NotMod, _}, _NoneMatch) when NotMod ->
+    {{error, "notfound"}, []};
+conditional_check({error, PreFetchError}, _NotMod, _NoneMatch) ->
+    {{error, {format, PreFetchError}}, []}.
 
 %% @private the local node is not in the preflist, or is overloaded,
 %% forward to another node
diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl
index 0819ba271..ed23dbc3c 100644
--- a/src/riak_kv_stat.erl
+++ b/src/riak_kv_stat.erl
@@ -270,6 +270,31 @@ do_update({index_fsm_time, Microsecs, ResultCount}) ->
     ok = exometer:update([P, ?APP, index, fsm, complete], 1),
     ok = exometer:update([P, ?APP, index, fsm, results], ResultCount),
     ok = exometer:update([P, ?APP, index, fsm, time], Microsecs);
+do_update({token_session_time, Microsecs}) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, complete], 1),
+    ok = exometer:update([P, ?APP, token, session, duration], Microsecs);
+do_update(token_session_timeout) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, timeout], 1);
+do_update(token_session_refusal) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, refusal], 1);
+do_update(token_session_unreachable) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, unreachable], 1);
+do_update(token_session_request_timeout) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, request_timeout], 1);
+do_update(token_session_preflist_short) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, preflist_short], 1);
+do_update(token_session_renewal) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, renewal], 1);
+do_update(token_session_error) ->
+    P = ?PFX,
+    ok = exometer:update([P, ?APP, token, session, error], 1);
 do_update({read_repairs, Preflist}) ->
     ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1),
     do_repairs(Preflist);
@@ -798,6 +823,18 @@ stats() ->
      {[list, fsm, create, error], spiral, [], [{one , list_fsm_create_error},
                                                {count, list_fsm_create_error_total}]},
      {[list, fsm, active], counter, [], [{value, list_fsm_active}]},
+
+     {[token, session, complete], spiral, [], [{one, token_session_complete}]},
+     {[token, session, timeout], spiral, [], [{one, token_session_timeout}]},
+     {[token, session, refusal], spiral, [], [{one, token_session_refusal}]},
+     {[token, session, unreachable], spiral, [], [{one, token_session_unreachable}]},
+     {[token, session, preflist_short], spiral, [], [{one, token_session_preflist_short}]},
+     {[token, session, request_timeout], spiral, [], [{one, token_session_request_timeout}]},
+     {[token, session, renewal], spiral, [], [{one, token_session_renewal}]},
+     {[token, session, error], spiral, [], [{one, token_session_error}]},
+     {[token, session, duration], histogram, [], [{mean, token_session_time_mean},
+                                                  {max, token_session_time_100}]},
+
      {[clusteraae, fsm, create], spiral, [], [{one, clusteraae_fsm_create}]},
      {[clusteraae, fsm, create, error], spiral, [], [{one, clusteraae_fsm_create_error}]},
      {[clusteraae, fsm, active], counter, [], [{value, clusteraae_fsm_active}]},
diff --git a/src/riak_kv_sup.erl b/src/riak_kv_sup.erl
index 872937940..6b4014ef2 100644
--- a/src/riak_kv_sup.erl
+++ b/src/riak_kv_sup.erl
@@ -98,10 +98,14 @@ init([]) ->
                 {riak_kv_reader, start_link, []},
                 permanent, 30000, worker, [riak_kv_reader]},
+    TokenManager = {riak_kv_token_manager,
+                {riak_kv_token_manager, start_link, []},
+                permanent, 3000, worker, [riak_kv_token_manager]},
+
     EnsemblesKV = {riak_kv_ensembles,
                 {riak_kv_ensembles, start_link, []},
                 permanent, 30000, worker, [riak_kv_ensembles]},
-
+
     % Figure out which processes we should run...
     HasStorageBackend = (app_helper:get_env(riak_kv, storage_backend) /= undefined),
@@ -115,6 +119,7 @@ init([]) ->
         Reaper,
         Eraser,
         Reader,
+        TokenManager,
         ?IF(HasStorageBackend, VMaster, []),
         FastPutSup,
         DeleteSup,
diff --git a/src/riak_kv_token_manager.erl b/src/riak_kv_token_manager.erl
new file mode 100644
index 000000000..ff584d054
--- /dev/null
+++ b/src/riak_kv_token_manager.erl
@@ -0,0 +1,1010 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_token_manager: Process for granting access to tokens
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Grant requests for a token, when no other request is active.
+%%
+%% Every node in a Riak cluster is expected to have one, and only one,
+%% riak_kv_token_manager. Client actions requiring tokens may start a
+%% riak_kv_token_session process on a node, and request a token from the local
+%% riak_kv_token_manager. If a token is granted, the session may then be used
+%% to marshal riak_client functions, under the "ownership" of that token.
+%%
+%% A riak_kv_token_session process will cast a request for the grant of a given
+%% token, and await an async receive of either a `granted` or `refused` message
+%% in response.
+%%
+%% Tokens may be a {Bucket, Key} pair, a {{Type, Bucket}, Key} tuple or a
+%% {token, TokenName} tuple where TokenName is any binary name.
+%%
+%% The session process should monitor the token_manager to which it makes a
+%% request, and fail itself on the failure of that token_manager. Requests
+%% should only be made from a local session (i.e. a remote process should start
+%% a local session to make the request). The session process should close as
+%% soon as the token is no longer required, or otherwise on a timeout. The
+%% token manager will monitor the session pid and release the grant on closure
+%% of the session for any reason. Sessions which never end will never have
+%% their grants released.
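+%%
+%% As an illustrative sketch (node and manager names are examples only), a
+%% grant with a verify list of two nodes flows as follows:
+%%
+%%   Session  -> ManagerA : {request, TokenID, [NodeB, NodeC], SessionPid}
+%%   ManagerA -> B and C  : {downstream_check, TokenID, {ManagerA, SessionPid}}
+%%   B and C  -> ManagerA : {downstream_reply, TokenID, SessionPid, true|false}
+%%   ManagerA -> Session  : granted (all replies true) | refused (any false)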
+%%
+%% A request may be granted immediately, if the token is available. If the
+%% token is not available, then the response may be deferred and the request
+%% queued, to be granted when the holding session is released (as well as any
+%% session requests queued earlier). The queue is FIFO.
+%%
+%% A request may optionally pass a "verify list", a list of nodes with whom the
+%% request should be confirmed before granting. Within riak the verify list is
+%% aligned with the preflist of the token key being requested (i.e. it is the
+%% list of nodes associated with the vnodes in the preflist). If a verify list
+%% is present, those "downstream" token managers should confirm that they have
+%% not granted the token before the `granted` message is returned (and record
+%% the existence of the upstream grant in their map of grants). Confirmation
+%% is achieved through the passing of async downstream check/reply messages.
+%%
+%% Not passing a consistent verify list for token requests will increase the
+%% number of failure scenarios where duplicate tokens could be concurrently
+%% granted within the cluster - but reduce the latency and cost of granting a
+%% token.
+%%
+%% The grant may be refused immediately, rather than queued, if a previous
+%% token has been granted but to a different verify list. In this case, the
+%% session should back-off and retry. Once any tokens requested under
+%% different conditions have been released, new tokens under the changed
+%% conditions may be granted. A grant will always be refused immediately if
+%% a grant for that token is active upstream.
+%%
+%% When a request is de-queued, it bypasses the wait to confirm downstream
+%% nodes do not have a grant (given that they already confirmed for the grant
+%% made at the head of the queue). A downstream renew is sent in the
+%% background, rather than the downstream check message (and a renew message
+%% does not require a reply).
+%%
+%% Each Token Manager should monitor every Token Manager from which it
+%% receives a downstream check. If a remote token manager goes down, then all
+%% notifications of upstream grants associated with that manager should be
+%% cleared.
+%%
+%% The token manager monitors every local token session to which it grants a
+%% token, and on receipt of a 'DOWN' notification, it will clear the grant (or
+%% queued request) and association for that session - and inform the verify
+%% list of managers using a downstream release message.
+%%
+%% This overall mechanism is to provide "stronger" but not "strong"
+%% consistency. The aim is to have a sub-system whereby in healthy clusters
+%% and in common failure scenarios tokens can be requested without conflict -
+%% in the first case to allow for conditional logic on PUTs to detect conflict
+%% with greater reliability.
+%%
+%% It is accepted that there will be partition scenarios, and scenarios of
+%% rapid changes in up/down state, where guarantees cannot be met. The
+%% intention is that eventual consistency will be the fallback. The
+%% application/operator may have to deal with siblings to protect against
+%% data loss - but the frequency with which those siblings occur can be
+%% reduced through the use of these tokens.
+%%
+%% The end-to-end system (Riak) is still intended to be used as an eventually
+%% consistent database.
+
+-module(riak_kv_token_manager).
+
+-behavior(gen_server).
+
+-include_lib("kernel/include/logger.hrl").
+
+-export(
+    [
+        start_link/0,
+        request_token/2,
+        associated/1,
+        associated/2,
+        stats/0,
+        grants/0
+    ]
+).
+
+-export(
+    [
+        init/1,
+        handle_call/3,
+        handle_cast/2,
+        handle_info/2,
+        terminate/2,
+        code_change/3
+    ]
+).
+
+-ifdef(TEST).
+-define(SWEEP_DELAY, 1000).
+-define(ASSOCIATION_CHECK_TIMEOUT, 1000).
+-else.
+
+-define(SWEEP_DELAY, 10000).
+-define(ASSOCIATION_CHECK_TIMEOUT, 5000).
+
+-endif.
+
+-record(state, {grants = maps:new() :: grant_map(),
+                queues = maps:new() :: request_queues(),
+                associations = maps:new() :: #{session_pid() => token_id()},
+                monitored_managers = [] :: list(manager_mon()),
+                last_sweep_grants = sets:new([{version, 2}]) ::
+                    sets:set({token_id(), granted_session()})}).
+
+-type verify_count() :: non_neg_integer().
+-type granted_session()
+    :: {local, session_pid(), verify_list(), verify_count()}|
+        {upstream, upstream_ref()}.
+-type grant_map() :: #{token_id() => granted_session()}.
+-type request_queues()
+    :: #{token_id() => list({pid(), verify_list()})}.
+-type token_id() :: {token, binary()}|{riak_object:bucket(), riak_object:key()}.
+-type verify_list() :: [downstream_node()].
+-type session_pid() :: pid().
+-type manager_pid() :: pid().
+-type manager_mon() :: {manager_pid(), reference()}.
+-type upstream_ref() :: {manager_pid(), session_pid()}.
+-type downstream_node() :: node()|pid().
+    % in tests will be a pid() not a node()
+
+-export_type([token_id/0, verify_list/0]).
+
+%%%============================================================================
+%%% API
+%%%============================================================================
+
+-spec start_link() -> {ok, manager_pid()} | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% @doc request_token/2
+%% Request a token (TokenID) from this node's Token Manager, requiring the
+%% request to be verified on the nodes provided in the VerifyList.
+%%
+%% An async response message of either `granted` or `refused` will be returned.
+%% Any session process making a request should monitor the riak_kv_token_manager
+%% and terminate should the manager terminate.
+%%
+%% A token is released when the riak_kv_token_manager receives a 'DOWN' message
+%% from a session process associated with a grant.
+-spec request_token(token_id(), [node()]) -> ok.
+request_token(TokenID, VerifyList) ->
+    gen_server:cast(
+        ?MODULE, {request, TokenID, VerifyList, self()}).
+
+%% @doc associated/1
+%% Confirms if this token manager has an association with the PID of a session.
+%% Associations are kept for local sessions only (where this manager has
+%% granted the token).
+%%
+%% The associated check is used by a session process, before calls to use the
+%% session, to confirm that the session is still active; and also by downstream
+%% token managers to confirm that lingering upstream sessions are still active.
+-spec associated(session_pid()) -> ok.
+associated(SessionPid) ->
+    gen_server:cast(?MODULE, {associated, SessionPid, self()}).
+
+-spec associated(node(), session_pid()) -> ok.
+associated(Node, SessionPid) ->
+    gen_server:cast({?MODULE, Node}, {associated, SessionPid, self()}).
+
+%%%============================================================================
+%%% API Remote - Downstream/Upstream messages between token managers
+%%%============================================================================
+
+%% @doc downstream_check/3
+%% Call to a remote node to confirm that it does not currently have
+%% information of a grant from another node.
+%% If it does have such a grant, the token request will be refused (the
+%% downstream reply will be false).
+%%
+%% If there is no grant registered, or if the grant that is registered is from
+%% the same remote manager, then the downstream_check will pass and the grant
+%% information will be updated.
+-spec downstream_check(downstream_node(), token_id(), pid()) -> ok.
+downstream_check(TestPid, TokenID, SessionPid) when is_pid(TestPid) ->
+    gen_server:cast(
+        TestPid, {downstream_check, TokenID, {self(), SessionPid}});
+downstream_check(ToNode, TokenID, SessionPid) ->
+    gen_server:cast(
+        {?MODULE, ToNode}, {downstream_check, TokenID, {self(), SessionPid}}).
+
+%% @doc downstream_renew/3
+%% If a token has been released, and a queued token is now to be granted, the
+%% granting is not blocked by the use of downstream_check/3; it is assumed
+%% that a notification is present due to the original grant.
+-spec downstream_renew(verify_list(), token_id(), pid()) -> ok.
+downstream_renew([], _TokenID, _SessionPid) ->
+    ok;
+downstream_renew([TestPid|Rest], TokenID, SessionPid) when is_pid(TestPid) ->
+    gen_server:cast(
+        TestPid,
+        {downstream_renew, TokenID, {self(), SessionPid}}
+    ),
+    downstream_renew(Rest, TokenID, SessionPid);
+downstream_renew([N|Rest], TokenID, SessionPid) ->
+    gen_server:cast(
+        {?MODULE, N}, {downstream_renew, TokenID, {self(), SessionPid}}),
+    downstream_renew(Rest, TokenID, SessionPid).
+
+%% @doc downstream_release/3
+%% If a token has been released, and there is no queued token, release the
+%% downstream blocks.
+-spec downstream_release(verify_list(), token_id(), pid()) -> ok.
+downstream_release([], _TokenID, _SessionPid) ->
+    ok;
+downstream_release([TestPid|Rest], TokenID, SessionPid) when is_pid(TestPid) ->
+    gen_server:cast(
+        TestPid,
+        {downstream_release, TokenID, {self(), SessionPid}}
+    ),
+    downstream_release(Rest, TokenID, SessionPid);
+downstream_release([N|Rest], TokenID, SessionPid) ->
+    gen_server:cast(
+        {?MODULE, N}, {downstream_release, TokenID, {self(), SessionPid}}),
+    downstream_release(Rest, TokenID, SessionPid).
+
+%%%============================================================================
+%%% API - Operations (helper functions)
+%%%============================================================================
+
+%% @doc stats/0
+%% Return three counts - the count of grants, the count of queued requests and
+%% the count of associations (whenever a grant is made or queued, an
+%% association is retained to map the PID to the token, as a helper should the
+%% PID go down).
+-spec stats() -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
+stats() ->
+    gen_server:call(?MODULE, stats).
+
+%% @doc grants/0
+%% A map of the current grants that have been made by this token_manager
+-spec grants() -> grant_map().
+grants() ->
+    gen_server:call(?MODULE, grants).
+
+%%%============================================================================
+%%% Callback functions
+%%%============================================================================
+
+init(_Args) ->
+    erlang:send_after(?SWEEP_DELAY, self(), sweep),
+    {ok, #state{}}.
+
+handle_call(stats, _From, State) ->
+    Grants = maps:size(State#state.grants),
+    QueuedRequests =
+        maps:fold(
+            fun(_T, Q, Acc) -> length(Q) + Acc end,
+            0,
+            State#state.queues
+        ),
+    Associations = maps:size(State#state.associations),
+    {reply, {Grants, QueuedRequests, Associations}, State};
+handle_call(grants, _From, State) ->
+    {reply, State#state.grants, State}.
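+
+%% Illustrative console usage of the two inspection calls above (the returned
+%% values shown are examples only):
+%%   > riak_kv_token_manager:stats().
+%%   {2, 1, 3}  %% {Grants, QueuedRequests, Associations}
+%%   > riak_kv_token_manager:grants().
+%%   #{{<<"B">>, <<"K">>} => {local, <0.531.0>, ['n2@host', 'n3@host'], 2}}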
+ +handle_cast({request, TokenID, VerifyList, Session}, State) -> + case maps:get(TokenID, State#state.grants, not_found) of + not_found -> + lists:foreach( + fun(N) -> downstream_check(N, TokenID, Session) end, + VerifyList + ), + UpdAssocs = + maps:put(Session, TokenID, State#state.associations), + _SidRef = monitor(process, Session), + UpdGrants = + maps:put( + TokenID, + {local, Session, VerifyList, 0}, + State#state.grants + ), + case VerifyList of + [] -> + Session ! granted; + _ -> + %% Need to wait for downstream replies + ok + end, + {noreply, + State#state{ + grants = UpdGrants, associations = UpdAssocs + } + }; + {local, _OtherSession, CurrentVerifyList, _VerifyCount} + when CurrentVerifyList == VerifyList -> + %% Can queue this session, it has the same verifylist as the + %% existing grant + UpdAssocs = + maps:put(Session, TokenID, State#state.associations), + _SidRef = monitor(process, Session), + TokenQueue = maps:get(TokenID, State#state.queues, []), + UpdQueue = + maps:put( + TokenID, + [{Session, VerifyList}|TokenQueue], + State#state.queues + ), + {noreply, + State#state{ + queues = UpdQueue, + associations = UpdAssocs + } + }; + _ -> + Session ! refused, + {noreply, State} + end; +handle_cast({downstream_renew, TokenID, {Manager, Session}}, State) -> + case maps:get(TokenID, State#state.grants, not_found) of + not_found -> + UpdGrants = + maps:put( + TokenID, + {upstream, {Manager, Session}}, + State#state.grants + ), + {noreply, + State#state{ + grants = UpdGrants, + monitored_managers = + monitor_manager( + Manager, + State#state.monitored_managers + ) + } + }; + {upstream, {CurrentManager, _OldSession}} + when Manager == CurrentManager -> + UpdGrants = + maps:put( + TokenID, + {upstream, {Manager, Session}}, + State#state.grants + ), + {noreply, State#state{grants = UpdGrants}}; + ExistingGrant -> + ?LOG_WARNING( + "Potential conflict ignored on token ~w between ~w and ~w", + [TokenID, ExistingGrant, {Manager, Session}] + ), + {noreply, State} + end; +handle_cast({downstream_release, TokenID, {Manager, Session}}, State) -> + case maps:get(TokenID, State#state.grants, not_found) of + not_found -> + {noreply, State}; + {upstream, {CurrentManager, _OldSession}} + when Manager == CurrentManager -> + UpdGrants = maps:remove(TokenID, State#state.grants), + {noreply, State#state{grants = UpdGrants}}; + ExistingGrant -> + ?LOG_WARNING( + "Potential conflict ignored on token ~w between ~w and ~w", + [TokenID, ExistingGrant, {Manager, Session}] + ), + {noreply, State} + end; +handle_cast({downstream_check, TokenID, {Manager, Session}}, State) -> + case maps:get(TokenID, State#state.grants, not_found) of + not_found -> + gen_server:cast( + Manager, {downstream_reply, TokenID, Session, true} + ), + UpdGrants = + maps:put( + TokenID, + {upstream, {Manager, Session}}, + State#state.grants + ), + {noreply, + State#state{ + grants = UpdGrants, + monitored_managers = + monitor_manager( + Manager, + State#state.monitored_managers + ) + } + }; + {upstream, {BlockingMgr, _PrevSession}} when BlockingMgr == Manager -> + %% Trust the upstream manager knows what they're doing. 
This is
+            %% likely to be a result of message misordering
+            gen_server:cast(
+                Manager, {downstream_reply, TokenID, Session, true}
+            ),
+            UpdGrants =
+                maps:put(
+                    TokenID,
+                    {upstream, {Manager, Session}},
+                    State#state.grants
+                ),
+            {noreply, State#state{grants = UpdGrants}};
+        _Block ->
+            gen_server:cast(
+                Manager, {downstream_reply, TokenID, Session, false}
+            ),
+            {noreply, State}
+    end;
+handle_cast({downstream_reply, TokenID, Session, true}, State) ->
+    case maps:get(TokenID, State#state.grants, not_found) of
+        {local, ThisSession, VerifyList, VerifyCount}
+                when ThisSession == Session ->
+            case length(VerifyList) of
+                VL when VL == (VerifyCount + 1) ->
+                    Session ! granted;
+                _ ->
+                    ok
+            end,
+            {noreply,
+                State#state{
+                    grants =
+                        maps:put(
+                            TokenID,
+                            {local, Session, VerifyList, VerifyCount + 1},
+                            State#state.grants
+                        )
+                }
+            };
+        _ ->
+            {noreply, State}
+    end;
+handle_cast({downstream_reply, TokenID, Session, false}, State) ->
+    case maps:get(TokenID, State#state.grants, not_found) of
+        {local, ThisSession, _VerifyList, _VerifyCount}
+                when Session == ThisSession ->
+            Session ! refused,
+            {noreply,
+                State#state{grants = maps:remove(TokenID, State#state.grants)}
+            };
+        _ ->
+            {noreply, State}
+    end;
+handle_cast({associated, SessionPid, FromPid}, State) ->
+    FromPid !
+        {maps:is_key(SessionPid, State#state.associations),
+            {self(), SessionPid}
+        },
+    {noreply, State}.
+
+
+handle_info({'DOWN', Ref, process, Pid, Reason}, State) ->
+    case maps:take(Pid, State#state.associations) of
+        {TokenID, UpdAssocs} ->
+            %% An association exists, so this is assumed to be a session
+            %% process which has gone down. This might be in a queue, or
+            %% in receipt of a grant
+            Queues =
+                clear_session_from_queues(
+                    State#state.queues,
+                    TokenID,
+                    Pid
+                ),
+            case maps:get(TokenID, State#state.grants, not_found) of
+                {local, ActivePid, VerifyList, _VerifyCount}
+                        when ActivePid == Pid ->
+                    %% There is a grant; is there a queued request for that
+                    %% same token that we can now grant?
+                    case return_session_from_queues(Queues, TokenID) of
+                        {none, UpdQueues} ->
+                            ok =
+                                downstream_release(
+                                    VerifyList,
+                                    TokenID,
+                                    Pid
+                                ),
+                            {noreply,
+                                State#state{
+                                    associations = UpdAssocs,
+                                    queues = UpdQueues,
+                                    grants =
+                                        maps:remove(
+                                            TokenID,
+                                            State#state.grants
+                                        )
+                                }
+                            };
+                        {{NextSession, NextVerifyList}, UpdQueues}
+                                when NextVerifyList == VerifyList ->
+                            %% Only requests with the same VerifyList should be
+                            %% queued
+                            UpdGrants =
+                                maps:put(
+                                    TokenID,
+                                    {local, NextSession, VerifyList, 0},
+                                    State#state.grants
+                                ),
+                            NextSession !
granted, + %% Check that the downstream nodes are still aware + %% of this grant (in case, for example, they have + %% restarted in between) + ok = + downstream_renew( + VerifyList, + TokenID, + NextSession + ), + {noreply, + State#state{ + associations = UpdAssocs, + queues = UpdQueues, + grants = UpdGrants + } + } + end; + _ -> + %% The session may have been queued and not had a grant + %% It may be an upstream grant which has changed before the + %% 'DOWN' message was received + {noreply, + State#state{associations = UpdAssocs, queues = Queues} + } + end; + _ -> + case lists:member({Pid, Ref}, State#state.monitored_managers) of + true -> + ?LOG_WARNING( + "Remote Token Manager ~w reported down due to ~p", + [Pid, Reason] + ), + FilterFun = + fun({_Tid, G}) -> + case G of + {upstream, {UpstreamPid, _Session}} + when UpstreamPid == Pid -> + false; + _ -> + true + end + end, + UpdGrants = + maps:from_list( + lists:filter( + FilterFun, + maps:to_list(State#state.grants) + ) + ), + UpdMonitors = + lists:delete( + {Pid, Ref}, + State#state.monitored_managers + ), + {noreply, + State#state{ + grants = UpdGrants, + monitored_managers = UpdMonitors + } + }; + false -> + ?LOG_INFO( + "Session ~w cleared for ~w but not present", + [Pid, Reason] + ), + {noreply, State} + end + end; +handle_info(sweep, State) -> + erlang:send_after( + rand:uniform(?SWEEP_DELAY) + ?SWEEP_DELAY div 2, + self(), + sweep + ), + LastSweep = State#state.last_sweep_grants, + ThisSweep = + sets:from_list(maps:to_list(State#state.grants), [{version, 2}]), + StillPresent = sets:to_list(sets:intersection(LastSweep, ThisSweep)), + check_active(StillPresent, self()), + {noreply, State#state{last_sweep_grants = ThisSweep}}; +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +-spec clear_session_from_queues( + request_queues(), token_id(), pid()) -> request_queues(). +clear_session_from_queues(Queues, TokenID, Session) -> + Queue = maps:get(TokenID, Queues, []), + case lists:filter(fun({QS, _VL}) -> QS =/= Session end, Queue) of + [] -> + maps:remove(TokenID, Queues); + UpdTQueue -> + maps:put(TokenID, UpdTQueue, Queues) + end. + +-spec return_session_from_queues( + request_queues(), token_id()) -> + {{pid(), verify_list()}|none, request_queues()}. +return_session_from_queues(Queues, TokenID) -> + case maps:get(TokenID, Queues, []) of + [] -> + {none, Queues}; + QueuedRequests -> + case lists:reverse(QueuedRequests) of + [{NextSession, VerifyList}] -> + {{NextSession, VerifyList}, maps:remove(TokenID, Queues)}; + [{NextSession, VerifyList}|QueueRem] -> + {{NextSession, VerifyList}, + maps:put(TokenID, lists:reverse(QueueRem), Queues)} + end + end. + + +-spec monitor_manager( + manager_pid(), list(manager_mon())) -> list(manager_mon()). +monitor_manager(Manager, MonitoredManagers) -> + case lists:keymember(Manager, 1, MonitoredManagers) of + true -> + MonitoredManagers; + false -> + MgrRef = monitor(process, Manager), + [{Manager, MgrRef}|MonitoredManagers] + end. + +-spec check_active(list({token_id(), granted_session()}), pid()) -> ok. 
+check_active([], _Mgr) -> + ok; +check_active([{_TokenID, {local, Session, _VL, _VC}}|Rest], Mgr) -> + case is_process_alive(Session) of + true -> + check_active(Rest, Mgr); + false -> + _NewRef = monitor(process, Session), + check_active(Rest, Mgr) + end; +check_active([{TokenID, {upstream, {UpstreamMgr, UpstreamSess}}}|Rest], Mgr) -> + _P = check_upstream_async(TokenID, UpstreamMgr, UpstreamSess, Mgr), + check_active(Rest, Mgr). + + +-spec check_upstream_async( + token_id(), manager_pid(), session_pid(), manager_pid()) -> pid(). +check_upstream_async(TokenID, RemoteManager, Session, Mgr) -> + F = fun() -> check_upstream(TokenID, RemoteManager, Session, Mgr) end, + spawn(F). + +-spec check_upstream( + token_id(), manager_pid(), session_pid(), manager_pid()) -> ok. +check_upstream(TokenID, RemoteManager, Session, Mgr) -> + RemoteNode = node(RemoteManager), + {UpstreamAssociated, Reason} = + case RemoteNode of + nonode@nohost -> + {false, nonode@nohost}; + _ -> + associated(RemoteNode, Session), + receive + Reply -> + Reply + after + ?ASSOCIATION_CHECK_TIMEOUT -> + {false, timeout} + end + end, + case UpstreamAssociated of + true -> + ok; + false -> + ?LOG_WARNING( + "Upstream association check to ~w prompted release " + "of TokenID ~w due to reason ~p", + [RemoteNode, TokenID, Reason] + ), + gen_server:cast( + Mgr, + {downstream_release, TokenID, {RemoteManager, Session}} + ) + end. + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +gc_test_() -> + {timeout, 60, fun gc_tester/0}. + +gc_tester() -> + {ok, Mgr1} = gen_server:start(?MODULE, [], []), + {ok, Mgr2} = gen_server:start(?MODULE, [], []), + {ok, Mgr3} = gen_server:start(?MODULE, [], []), + Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]), + Req2 = requestor_fun(self(), <<"T2">>, Mgr2, [Mgr3]), + S1 = spawn(Req1), + receive Gr1 -> ?assertMatch({granted, S1}, Gr1) end, + S2 = spawn(Req2), + receive Gr2 -> ?assertMatch({granted, S2}, Gr2) end, + S3 = spawn(Req2), + F1 = fun() -> {1, 0, 1} == gen_server:call(Mgr1, stats) end, + F2 = fun() -> {2, 1, 2} == gen_server:call(Mgr2, stats) end, + F3 = fun() -> {2, 0, 0} == gen_server:call(Mgr3, stats) end, + wait_until(F1, 10, 1), + wait_until(F2, 10, 1), + wait_until(F3, 10, 1), + + timer:sleep(?SWEEP_DELAY + 1), % First sweep at fixed time + State1 = sys:get_state(Mgr1), + ?assertMatch(1, sets:size(State1#state.last_sweep_grants)), + State2 = sys:get_state(Mgr2), + ?assertMatch(2, sets:size(State2#state.last_sweep_grants)), + State3 = sys:get_state(Mgr3), + ?assertMatch(2, sets:size(State3#state.last_sweep_grants)), + ?assert(F1()), + ?assert(F2()), + ?assert(F3()), + + Mgr1 ! {'DOWN', self(), process, S1, inactive}, + F1A = fun() -> {0, 0, 0} == gen_server:call(Mgr1, stats) end, + F2A = fun() -> {1, 1, 2} == gen_server:call(Mgr2, stats) end, + F3A = fun() -> {1, 0, 0} == gen_server:call(Mgr3, stats) end, + wait_until(F1A, 10, 1), + wait_until(F2A, 10, 1), + wait_until(F3A, 10, 1), + S1 ! terminate, + F1A(), + F2A(), + F3A(), + Mgr2 ! {'DOWN', self(), process, S2, inactive}, + receive Gr3 -> ?assertMatch({granted, S3}, Gr3) end, + F2B = fun() -> {1, 0, 1} == gen_server:call(Mgr2, stats) end, + F3B = fun() -> {1, 0, 0} == gen_server:call(Mgr3, stats) end, + wait_until(F2B, 10, 1), + wait_until(F3B, 10, 1), + S2 ! terminate, + ?assert(receive _ -> false after 1 -> true end), + F2B(), + F3B(), + S3 ! 
terminate,
+    ?assert(receive _ -> false after 1 -> true end),
+
+    timer:sleep(?SWEEP_DELAY + ?SWEEP_DELAY div 2), % should be nothing to sweep
+    StateZ1 = sys:get_state(Mgr1),
+    ?assertMatch(0, sets:size(StateZ1#state.last_sweep_grants)),
+    StateZ2 = sys:get_state(Mgr2),
+    ?assertMatch(0, sets:size(StateZ2#state.last_sweep_grants)),
+    StateZ3 = sys:get_state(Mgr3),
+    ?assertMatch(0, sets:size(StateZ3#state.last_sweep_grants)),
+
+    SA1 = spawn(Req1),
+    receive GrA1 -> ?assertMatch({granted, SA1}, GrA1) end,
+    SA2 = spawn(Req2),
+    receive GrA2 -> ?assertMatch({granted, SA2}, GrA2) end,
+    SA3 = spawn(Req2),
+    wait_until(F1, 10, 1),
+    wait_until(F2, 10, 1),
+    wait_until(F3, 10, 1),
+
+    timer:sleep(3 * ?SWEEP_DELAY),
+    % Should prompt sweep, and remove remotes as nonode@nohost
+    ?assertMatch({1, 0, 1}, gen_server:call(Mgr1, stats)),
+    ?assertMatch({1, 1, 2}, gen_server:call(Mgr2, stats)),
+    ?assertMatch({0, 0, 0}, gen_server:call(Mgr3, stats)),
+    timer:sleep(3 * ?SWEEP_DELAY),
+    % Previous grants should remain stable
+    StateX1 = sys:get_state(Mgr1),
+    ?assertMatch(1, sets:size(StateX1#state.last_sweep_grants)),
+    StateX2 = sys:get_state(Mgr2),
+    ?assertMatch(1, sets:size(StateX2#state.last_sweep_grants)),
+    StateX3 = sys:get_state(Mgr3),
+    ?assertMatch(0, sets:size(StateX3#state.last_sweep_grants)),
+    ?assertMatch({1, 0, 1}, gen_server:call(Mgr1, stats)),
+    ?assertMatch({1, 1, 2}, gen_server:call(Mgr2, stats)),
+    ?assertMatch({0, 0, 0}, gen_server:call(Mgr3, stats)),
+
+    SA1 ! terminate,
+    SA2 ! terminate,
+    receive GrA3 -> ?assertMatch({granted, SA3}, GrA3) end,
+    SA3 ! terminate,
+    ?assert(receive _ -> false after 1 -> true end),
+
+    gen_server:stop(Mgr1),
+    gen_server:stop(Mgr2),
+    gen_server:stop(Mgr3).
+
+already_dead_test() ->
+    {ok, Mgr1} = gen_server:start(?MODULE, [], []),
+    DeadPid1 = spawn(fun() -> ok end),
+    DeadPid2 = spawn(fun() -> ok end),
+    T1 = <<"T1">>,
+    VL = [],
+    ok = gen_server:cast(Mgr1, {request, T1, VL, DeadPid1}),
+    ok = gen_server:cast(Mgr1, {request, T1, VL, DeadPid2}),
+    F = fun() -> {0, 0, 0} == gen_server:call(Mgr1, stats) end,
+    ?assert(wait_until(F, ?SWEEP_DELAY div 2, 1)),
+    gen_server:stop(Mgr1).
+
+
+wait_until(F, Wait, _RetrySleep) when Wait =< 0 ->
+    F();
+wait_until(F, Wait, RetrySleep) ->
+    timer:sleep(RetrySleep),
+    case F() of
+        true -> true;
+        false -> wait_until(F, Wait - RetrySleep, RetrySleep)
+    end.
+
+manager_simple_test() ->
+    {ok, Mgr1} = gen_server:start(?MODULE, [], []),
+    {ok, Mgr2} = gen_server:start(?MODULE, [], []),
+    {ok, Mgr3} = gen_server:start(?MODULE, [], []),
+    Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]),
+    Req2 = requestor_fun(self(), <<"T1">>, Mgr2, [Mgr3]),
+    S1 = spawn(Req1),
+    receive Gr1 -> ?assertMatch({granted, S1}, Gr1) end,
+    S2 = spawn(Req1),
+    timer:sleep(1),
+    % Make sure first request received
+    % Avoid race over spawn time
+    S3 = spawn(Req1),
+    S4 = spawn(Req2),
+    receive Rf1 -> ?assertMatch({refused, S4}, Rf1) end,
+    S1 ! terminate,
+    receive Gr2 -> ?assertMatch({granted, S2}, Gr2) end,
+    S2 ! terminate,
+    receive Gr3 -> ?assertMatch({granted, S3}, Gr3) end,
+    S3 ! terminate,
+    ?assert(receive _ -> false after 10 -> true end),
+    gen_server:stop(Mgr1),
+    gen_server:stop(Mgr2),
+    gen_server:stop(Mgr3).
+ + +manager_multitoken_test() -> + {ok, Mgr1} = gen_server:start(?MODULE, [], []), + {ok, Mgr2} = gen_server:start(?MODULE, [], []), + {ok, Mgr3} = gen_server:start(?MODULE, [], []), + Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]), + Req2 = requestor_fun(self(), <<"T2">>, Mgr2, [Mgr1, Mgr3]), + Req3 = requestor_fun(self(), <<"T3">>, Mgr2, [Mgr3, Mgr1]), + Req4 = requestor_fun(self(), <<"T4">>, Mgr3, [Mgr1, Mgr2]), + S1R1 = spawn(Req1), + S2R1 = spawn(Req2), + S3R1 = spawn(Req3), + S4R1 = spawn(Req4), + timer:sleep(1), + % Make sure first round of requests received + % Avoid race over spawn time + S1R2 = spawn(Req1), + S2R2 = spawn(Req2), + S3R2 = spawn(Req3), + S4R2 = spawn(Req4), + ok = receive {granted, S1R1} -> ok end, + ok = receive {granted, S2R1} -> ok end, + ok = receive {granted, S3R1} -> ok end, + ok = receive {granted, S4R1} -> ok end, + S1R1 ! terminate, + receive Gr1 -> ?assertMatch({granted, S1R2}, Gr1) end, + S4R1 ! terminate, + receive Gr2 -> ?assertMatch({granted, S4R2}, Gr2) end, + S3R3 = spawn(Req3), + S3R2 ! terminate, + S2R1 ! terminate, + receive Gr3 -> ?assertMatch({granted, S2R2}, Gr3) end, + S3R1 ! terminate, + receive Gr4 -> ?assertMatch({granted, S3R3}, Gr4) end, + ?assert(receive _ -> false after 10 -> true end), + ?assertMatch({4, 0, 1}, gen_server:call(Mgr1, stats)), + ?assertMatch({4, 0, 2}, gen_server:call(Mgr2, stats)), + ?assertMatch({4, 0, 1}, gen_server:call(Mgr3, stats)), + S1R2 ! terminate, + S4R2 ! terminate, + S2R2 ! terminate, + S3R3 ! terminate, + ?assert(receive _ -> false after 10 -> true end), + % This also allows time for Mgrs to receive 'DOWN' + ?assertMatch({0, 0, 0}, gen_server:call(Mgr1, stats)), + ?assertMatch({0, 0, 0}, gen_server:call(Mgr2, stats)), + ?assertMatch({0, 0, 0}, gen_server:call(Mgr3, stats)), + ?assertMatch(#{}, gen_server:call(Mgr1, grants)), + ?assertMatch(#{}, gen_server:call(Mgr2, grants)), + ?assertMatch(#{}, gen_server:call(Mgr3, grants)), + gen_server:stop(Mgr1), + gen_server:stop(Mgr2), + gen_server:stop(Mgr3). + + +manager_downstream_failure_test() -> + {ok, Mgr1} = gen_server:start(?MODULE, [], []), + {ok, Mgr2} = gen_server:start(?MODULE, [], []), + {ok, Mgr3} = gen_server:start(?MODULE, [], []), + Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]), + S1 = spawn(Req1), + receive Gr1 -> ?assertMatch({granted, S1}, Gr1) end, + S2 = spawn(Req1), + timer:sleep(1), + % Make sure first request received + % Avoid race over spawn time + S3 = spawn(Req1), + ok = gen_server:stop(Mgr2), + {ok, Mgr2A} = gen_server:start(?MODULE, [], []), + ?assert(is_process_alive(S1)), + Req2 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2A, Mgr3]), + % Will not queue a request if the verify list has changed + S4 = spawn(Req2), + receive Rf1 -> ?assertMatch({refused, S4}, Rf1) end, + S1 ! terminate, + receive Gr2 -> ?assertMatch({granted, S2}, Gr2) end, + S2 ! terminate, + receive Gr3 -> ?assertMatch({granted, S3}, Gr3) end, + S3 ! terminate, + ?assert(receive _ -> false after 10 -> true end), + % This also allows time for Mgrs to receive 'DOWN' + S5 = spawn(Req2), + receive Gr4 -> ?assertMatch({granted, S5}, Gr4) end, + S5 ! terminate, + ?assert(receive _ -> false after 10 -> true end), + gen_server:stop(Mgr1), + gen_server:stop(Mgr2A), + gen_server:stop(Mgr3). 
+
+manager_primary_failure_test() ->
+    {ok, Mgr1} = gen_server:start(?MODULE, [], []),
+    {ok, Mgr2} = gen_server:start(?MODULE, [], []),
+    {ok, Mgr3} = gen_server:start(?MODULE, [], []),
+    Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]),
+    S1 = spawn(Req1),
+    receive Gr1 -> ?assertMatch({granted, S1}, Gr1) end,
+    S2 = spawn(Req1),
+    S3 = spawn(Req1),
+    gen_server:stop(Mgr1),
+    Req2 = requestor_fun(self(), <<"T1">>, Mgr2, [Mgr3]),
+    S4 = spawn(Req2),
+    receive Gr2 -> ?assertMatch({granted, S4}, Gr2) end,
+    {ok, Mgr1A} = gen_server:start(?MODULE, [], []),
+    Req3 = requestor_fun(self(), <<"T1">>, Mgr1A, [Mgr2, Mgr3]),
+    S5 = spawn(Req3),
+    receive Rf3 -> ?assertMatch({refused, S5}, Rf3) end,
+    S4 ! terminate,
+    ?assert(receive _ -> false after 10 -> true end),
+    % This also allows time for Mgrs to receive 'DOWN'
+    S6 = spawn(Req3),
+    receive Gr4 -> ?assertMatch({granted, S6}, Gr4) end,
+    lists:foreach(fun(P) -> P ! terminate end, [S1, S2, S3, S6]),
+    gen_server:stop(Mgr1A),
+    gen_server:stop(Mgr2),
+    gen_server:stop(Mgr3).
+
+
+requestor_fun(ReturnPid, Token, Mgr, VerifyList) ->
+    fun() ->
+        gen_server:cast(Mgr, {request, Token, VerifyList, self()}),
+        requestor_receive_loop(ReturnPid)
+    end.
+
+
+requestor_receive_loop(ReturnPid) ->
+    receive
+        granted ->
+            ReturnPid ! {granted, self()},
+            requestor_receive_loop(ReturnPid);
+        refused ->
+            ReturnPid ! {refused, self()},
+            requestor_receive_loop(ReturnPid);
+        _ ->
+            ok
+    end.
+
+-endif.
\ No newline at end of file
diff --git a/src/riak_kv_token_session.erl b/src/riak_kv_token_session.erl
new file mode 100644
index 000000000..9f2c97c4e
--- /dev/null
+++ b/src/riak_kv_token_session.erl
@@ -0,0 +1,478 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_token_session:
+%% Process for managing a session associated with a token
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Receive a session request, and attempt to gain an exclusive token for
+%% that session.  If one has been gained, allow for riak_client functions to
+%% be managed via the session
+
+-module(riak_kv_token_session).
+
+-behavior(gen_server).
+
+-include_lib("kernel/include/logger.hrl").
+
+-export(
+    [
+        session_request/4,
+        session_request_retry/1,
+        session_use/3,
+        session_release/1,
+        session_renew/1
+    ]
+).
+
+-export(
+    [
+        session_local_request/4,
+        session_local_use/4,
+        session_local_release/2,
+        session_local_renew/2
+    ]
+).
+
+-export(
+    [
+        init/1,
+        handle_call/3,
+        handle_cast/2,
+        handle_info/2,
+        terminate/2,
+        code_change/3
+    ]
+).
+
+-define(TOKEN_REQUEST_TIMEOUT, 12000).
+-define(MINIMUM_SINGLE_REQUEST_TIMEOUT, 500).
+-define(TOKEN_SESSION_TIMEOUT, 30000).
+-define(TOKEN_RETRY_COUNT, 12).
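+
+%% A worked example of the retry budget (a sketch derived from
+%% session_request_retry/6 below): with the defaults above a request has a
+%% 12000ms budget spread over at most 12 attempts.  Each attempt waits
+%% max(500, Budget div 4) ms for a grant - 3000ms on the first attempt -
+%% then sleeps a random interval of up to
+%% max(500, Budget div (Retry - Attempts)) ms before retrying with the
+%% budget reduced by the time already spent; the final attempt is given
+%% whatever budget remains.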
+
+-record(state, {token_id :: token_id(),
+                token_timeout :: pos_integer(),
+                client :: riak_client:riak_client()|undefined,
+                session_id :: session_id()|undefined,
+                start_time = os:system_time(microsecond) :: pos_integer()}).
+
+-type token_id() :: riak_kv_token_manager:token_id().
+-type verify_list() :: riak_kv_token_manager:verify_list().
+-type session_ref() :: binary().
+-type session_id() :: non_neg_integer().
+-type timeout_ms() :: pos_integer().
+-type erpc_session_error() ::
+    {error, session_noconnection}|{error, session_remote_exit}.
+-type token_request_mode() :: head_only|basic_consensus|primary_consensus.
+
+-export_type([session_ref/0]).
+
+%%%============================================================================
+%%% External API
+%%%============================================================================
+
+-spec session_request_retry(
+    token_id()) -> {true, session_ref()}|{false, none}|erpc_session_error().
+session_request_retry(TokenID) ->
+    TokenRequestMode =
+        application:get_env(
+            riak_kv,
+            token_request_mode,
+            primary_consensus
+        ),
+    session_request_retry(
+        TokenID,
+        TokenRequestMode,
+        ?TOKEN_REQUEST_TIMEOUT,
+        ?TOKEN_SESSION_TIMEOUT,
+        ?TOKEN_RETRY_COUNT
+    ).
+
+-spec session_request_retry(
+    token_id(),
+    token_request_mode(),
+    timeout_ms(),
+    timeout_ms(),
+    pos_integer()) -> {true, session_ref()}|{false, none}|erpc_session_error().
+session_request_retry(TokenID, TRM, RequestTimeout, TokenTimeout, Retry) ->
+    session_request_retry(
+        TokenID, TRM, RequestTimeout, TokenTimeout, Retry, 0
+    ).
+
+session_request_retry(_TokenID, _TRM, 0, _TokenTO, _Retry, _Attempts) ->
+    {false, none};
+session_request_retry(TokenID, TRM, RequestTO, TokenTO, Retry, Retry) ->
+    session_request(TokenID, TRM, RequestTO, TokenTO);
+session_request_retry(TokenID, TRM, RequestTO, TokenTO, Retry, Attempts) ->
+    RT0 = os:system_time(millisecond),
+    NextTimeOut = max(?MINIMUM_SINGLE_REQUEST_TIMEOUT, RequestTO div 4),
+    case session_request(TokenID, TRM, NextTimeOut, TokenTO) of
+        {true, SessionRef} ->
+            {true, SessionRef};
+        _Error ->
+            timer:sleep(
+                rand:uniform(
+                    max(
+                        ?MINIMUM_SINGLE_REQUEST_TIMEOUT,
+                        RequestTO div (Retry - Attempts)
+                    )
+                )
+            ),
+            session_request_retry(
+                TokenID,
+                TRM,
+                max(0, RequestTO + (RT0 - os:system_time(millisecond))),
+                TokenTO,
+                Retry,
+                Attempts + 1
+            )
+    end.
+
+-spec session_request(
+    token_id(),
+    token_request_mode(),
+    timeout_ms(),
+    timeout_ms()) -> {true, session_ref()}|{false, none}|erpc_session_error().
+session_request(TokenID, TRM, RequestTimeout, TokenTimeout) ->
+    DocIdx = chash_key(TokenID),
+    {TargetNodeCount, MinimumNodeCount, Partitions} =
+        case TRM of
+            head_only ->
+                {1, 1, riak_core_apl:get_primary_apl(DocIdx, 3, riak_kv)};
+            basic_consensus ->
+                {
+                    3,
+                    1,
+                    riak_core_apl:get_apl_ann(
+                        DocIdx, 3, riak_core_node_watcher:nodes(riak_kv))
+                };
+            primary_consensus ->
+                {3, 3, riak_core_apl:get_primary_apl(DocIdx, 5, riak_kv)}
+        end,
+    PrimNodes =
+        lists:sublist(
+            uniq(lists:map(fun({{_Idx, N}, _}) -> N end, Partitions)),
+            TargetNodeCount
+        ),
+    case PrimNodes of
+        [Head|VerifyList] when length(PrimNodes) >= MinimumNodeCount ->
+            case Head of
+                ThisNode when ThisNode == node() ->
+                    session_local_request(
+                        TokenID, VerifyList, RequestTimeout, TokenTimeout);
+                _ ->
+                    try
+                        erpc:call(
+                            Head,
+                            ?MODULE,
+                            session_local_request,
+                            [TokenID, VerifyList, RequestTimeout, TokenTimeout]
+                        )
+                    catch
+                        error:{erpc,noconnection}:_ ->
+                            ?LOG_WARNING(
+                                "Connection error"
+                                " accessing token_manager on ~w",
+                                [Head]),
+                            riak_kv_stat:update(token_session_unreachable),
+                            {false, none}
+                    end
+            end;
+        _ ->
+            ok = riak_kv_stat:update(token_session_preflist_short),
+            {false, none}
+    end.
+
+-spec session_use(session_ref(), atom(), list()) -> any().
+session_use(SessionReference, FuncName, Args) ->
+    {N, P, ID} = decode_session_reference(SessionReference),
+    case node() of
+        ThisNode when ThisNode == N ->
+            session_local_use(P, FuncName, Args, ID);
+        _ ->
+            safe_erpc(N, ?MODULE, session_local_use, [P, FuncName, Args, ID])
+    end.
+
+-spec session_release(session_ref()|none) -> ok|erpc_session_error().
+session_release(none) ->
+    ok;
+session_release(SessionReference) ->
+    {N, P, ID} = decode_session_reference(SessionReference),
+    case node() of
+        ThisNode when ThisNode == N ->
+            session_local_release(P, ID);
+        _ ->
+            safe_erpc(N, ?MODULE, session_local_release, [P, ID])
+    end.
+
+
+-spec session_renew(session_ref()) -> ok|erpc_session_error().
+session_renew(SessionReference) ->
+    {N, P, ID} = decode_session_reference(SessionReference),
+    case node() of
+        ThisNode when ThisNode == N ->
+            session_local_renew(P, ID);
+        _ ->
+            safe_erpc(N, ?MODULE, session_local_renew, [P, ID])
+    end.
+
+%%%============================================================================
+%%% Local API
+%%%============================================================================
+
+-spec safe_erpc(node(), atom(), atom(), list()) -> any().
+safe_erpc(Node, Module, Function, Args) ->
+    try
+        erpc:call(Node, Module, Function, Args)
+    catch
+        error:{erpc, noconnection}:_ ->
+            {error, session_noconnection};
+        exit:_:_ ->
+            {error, session_remote_exit}
+    end.
+
+-spec session_local_request(
+    token_id(), verify_list(), timeout_ms(), timeout_ms()
+    ) -> {boolean(), session_ref()}.
+session_local_request(TokenID, VerifyList, RequestTimeout, TokenTimeout) ->
+    {ok, Session} =
+        gen_server:start(?MODULE, [{TokenID, TokenTimeout}], []),
+    gen_server:call(Session, {request, RequestTimeout, VerifyList}, infinity).
+
+-spec session_local_use(pid(), atom(), list(), session_id()) -> any().
+session_local_use(Pid, FuncName, Args, ID) ->
+    ok = riak_kv_token_manager:associated(Pid),
+    receive
+        {true, _Upstream} ->
+            gen_server:call(
+                Pid,
+                {use_session, FuncName, Args, ID},
+                infinity);
+        _ ->
+            {error, session_down}
+    after
+        ?TOKEN_SESSION_TIMEOUT ->
+            {error, session_down}
+    end.
+
+-spec session_local_release(pid(), session_id()) -> ok.
+session_local_release(Pid, ID) ->
+    case is_process_alive(Pid) of
+        true ->
+            gen_server:call(Pid, {release, ID}, infinity);
+        false ->
+            ok
+    end.
+
+-spec session_local_renew(pid(), session_id()) -> ok.
+session_local_renew(Pid, ID) ->
+    gen_server:call(Pid, {renew, ID}, infinity).
+
+%%%============================================================================
+%%% Callback functions
+%%%============================================================================
+
+init([{TokenID, TokenTimeout}]) ->
+    _Ref = erlang:monitor(process, whereis(riak_kv_token_manager)),
+    {ok, #state{token_id = TokenID, token_timeout = TokenTimeout}}.
+
+handle_call({request, RequestTimeout, VerifyList}, _From, State) ->
+    ok = riak_kv_token_manager:request_token(State#state.token_id, VerifyList),
+    receive
+        granted ->
+            <<SessionID:32/integer>> = crypto:strong_rand_bytes(4),
+            SessionRef = encode_session_reference(SessionID),
+            Client = riak_client:new(node(), State#state.token_id),
+            {reply,
+                {true, SessionRef},
+                State#state{client = Client, session_id = SessionID},
+                State#state.token_timeout};
+        refused ->
+            ok = riak_kv_stat:update(token_session_refusal),
+            {stop, normal, {false, none}, State}
+    after
+        RequestTimeout ->
+            ok = riak_kv_stat:update(token_session_request_timeout),
+            {stop, normal, {false, none}, State}
+    end;
+handle_call({use_session, Function, Args, ID}, _From, State)
+        when ID == State#state.session_id ->
+    C = State#state.client,
+    R = apply(riak_client, Function, lists:append(Args, [C])),
+    {reply, R, State, State#state.token_timeout};
+handle_call({release, ID}, _From, State) when ID == State#state.session_id ->
+    Duration = os:system_time(microsecond) - State#state.start_time,
+    ok = riak_kv_stat:update({token_session_time, Duration}),
+    {stop, normal, ok, State};
+handle_call({renew, ID}, _From, State) when ID == State#state.session_id ->
+    ok = riak_kv_stat:update(token_session_renewal),
+    {reply, ok, State, State#state.token_timeout}.
+
+handle_cast(_Msg, State) ->
+    {stop, normal, State}.
+
+handle_info({'DOWN', _Ref, process, Manager, Reason}, State) ->
+    ok = riak_kv_stat:update(token_session_error),
+    ?LOG_WARNING(
+        "Token session ~w terminated as manager ~w down for Reason ~w",
+        [State#state.token_id, Manager, Reason]
+    ),
+    {stop, normal, State};
+handle_info(timeout, State) ->
+    ok = riak_kv_stat:update(token_session_timeout),
+    ?LOG_INFO(
+        "Token session ~w terminated due to timeout",
+        [State#state.token_id]
+    ),
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%============================================================================
+%%% Internal functions
+%%%============================================================================
+
+-spec chash_key(token_id()) -> chash:index().
+chash_key({token, TokenID}) ->
+    chash:key_of(TokenID);
+chash_key(BucketKey) ->
+    riak_core_util:chash_key(BucketKey).
+
+-spec encode_session_reference(session_id()) -> session_ref().
+encode_session_reference(SessionID) ->
+    base64:encode(
+        term_to_binary({node(), self(), SessionID})
+    ).
+
+-spec decode_session_reference(session_ref()) -> {node(), pid(), session_id()}.
+decode_session_reference(SessionReference) ->
+    binary_to_term(base64:decode(SessionReference)).
+
+
+-if(?OTP_RELEASE < 25).
+-spec uniq(List1) -> List2 when
+      List1 :: [T],
+      List2 :: [T],
+      T :: term().
+
+uniq(L) ->
+    uniq_1(L, #{}).
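+
+%% uniq_1/2 is the local fallback for lists:uniq/1 (OTP 25+): it keeps the
+%% first occurrence of each element, preserving order, with a map as the
+%% seen-set, e.g. uniq([n1, n2, n1, n3]) returns [n1, n2, n3].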
+
+uniq_1([X | Xs], M) ->
+    case is_map_key(X, M) of
+        true ->
+            uniq_1(Xs, M);
+        false ->
+            [X | uniq_1(Xs, M#{X => true})]
+    end;
+uniq_1([], _) ->
+    [].
+-else.
+uniq(L) -> lists:uniq(L).
+-endif.
+
+%%%============================================================================
+%%% Test
+%%%============================================================================
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+basic_session_request_test_() ->
+    {timeout, 10, fun basic_session_request_tester/0}.
+
+basic_session_request_tester() ->
+    _TokenManager = riak_kv_token_manager:start_link(),
+    {true, SessionRef1} =
+        session_local_request({token, <<"Batch1">>}, [], 1000, 2000),
+    timer:sleep(1000),
+    session_renew(SessionRef1),
+    {1, 0, 1} = riak_kv_token_manager:stats(),
+    timer:sleep(1001),
+    {1, 0, 1} = riak_kv_token_manager:stats(),
+    timer:sleep(1000),
+    {0, 0, 0} = riak_kv_token_manager:stats(),
+
+    {true, SessionRef2} =
+        session_local_request({token, <<"Batch2">>}, [], 1000, 2000),
+    {true, SessionRef3} =
+        session_local_request({token, <<"Batch3">>}, [], 1000, 2000),
+    Me = self(),
+    SpawnFun =
+        fun(TokenID) ->
+            fun() ->
+                SessionRsp =
+                    session_local_request({token, TokenID}, [], 1000, 2000),
+                Me ! SessionRsp
+            end
+        end,
+    SpawnForQueueKillFun =
+        fun(TokenID) ->
+            fun() ->
+                {ok, SPid} =
+                    gen_server:start(?MODULE, [{{token, TokenID}, 2000}], []),
+                Me ! {ok, SPid},
+                SessionRsp =
+                    gen_server:call(SPid, {request, 1000, []}, infinity),
+                Me ! SessionRsp
+            end
+        end,
+    spawn(SpawnFun(<<"Batch3">>)),
+    timer:sleep(100),
+    {2, 1, 3} = riak_kv_token_manager:stats(),
+    session_release(SessionRef3),
+    receive
+        {true, SessionRef4} ->
+            ok
+    end,
+    P4 = element(2, decode_session_reference(SessionRef4)),
+    {2, 0, 2} = riak_kv_token_manager:stats(),
+    spawn(SpawnFun(<<"Batch3">>)),
+    spawn(SpawnFun(<<"Batch3">>)),
+    spawn(SpawnForQueueKillFun(<<"Batch3">>)),
+    receive {ok, P7} -> ok end,
+    timer:sleep(100),
+    {2, 3, 5} = riak_kv_token_manager:stats(),
+    exit(P4, kill),
+    receive
+        {true, SessionRef5} ->
+            ok
+    end,
+    {2, 2, 4} = riak_kv_token_manager:stats(),
+    session_release(SessionRef2),
+    {1, 2, 3} = riak_kv_token_manager:stats(),
+    exit(P7, kill),
+    timer:sleep(100),
+    {1, 1, 2} = riak_kv_token_manager:stats(),
+    session_release(SessionRef5),
+    receive
+        {true, _SessionRef6} ->
+            ok
+    end,
+    {1, 0, 1} = riak_kv_token_manager:stats(),
+
+    gen_server:stop(riak_kv_token_manager).
+
+
+-endif.
\ No newline at end of file
diff --git a/src/riak_kv_wm_object.erl b/src/riak_kv_wm_object.erl
index dfc66d43b..8db2d24b4 100644
--- a/src/riak_kv_wm_object.erl
+++ b/src/riak_kv_wm_object.erl
@@ -155,36 +155,39 @@
          delete_resource/2
         ]).
 
--record(ctx, {api_version, %% integer() - Determine which version of the API to use.
-              bucket_type,  %% binary() - Bucket type (from uri)
-              type_exists,  %% bool() - Type exists as riak_core_bucket_type
-              bucket,       %% binary() - Bucket name (from uri)
-              key,          %% binary() - Key (from uri)
-              client,       %% riak_client() - the store client
-              r,            %% integer() - r-value for reads
-              w,            %% integer() - w-value for writes
-              dw,           %% integer() - dw-value for writes
-              rw,           %% integer() - rw-value for deletes
-              pr,           %% integer() - number of primary nodes required in preflist on read
-              pw,           %% integer() - number of primary nodes required in preflist on write
-              node_confirms,%% integer() - number of physically diverse nodes required in preflist on write
-              basic_quorum, %% boolean() - whether to use basic_quorum
-              notfound_ok,  %% boolean() - whether to treat notfounds as successes
-              asis,         %% boolean() - whether to send the put without modifying the vclock
-              sync_on_write,%% string() - sync on write behaviour to pass to backend
-              prefix,       %% string() - prefix for resource uris
-              riak,         %% local | {node(), atom()} - params for riak client
-              doc,          %% {ok, riak_object()}|{error, term()} - the object found
-              vtag,         %% string() - vtag the user asked for
-              bucketprops,  %% proplist() - properties of the bucket
-              links,        %% [link()] - links of the object
-              index_fields, %% [index_field()]
-              method,       %% atom() - HTTP method for the request
-              ctype,        %% string() - extracted content-type provided
-              charset,      %% string() | undefined - extracted character set provided
-              timeout,      %% integer() - passed-in timeout value in ms
-              security      %% security context
-             }).
+-record(ctx,
+    {
+        api_version,  %% integer() - Determine which version of the API to use.
+        bucket_type,  %% binary() - Bucket type (from uri)
+        type_exists,  %% bool() - Type exists as riak_core_bucket_type
+        bucket,       %% binary() - Bucket name (from uri)
+        key,          %% binary() - Key (from uri)
+        client,       %% riak_client() - the store client
+        r,            %% integer() - r-value for reads
+        w,            %% integer() - w-value for writes
+        dw,           %% integer() - dw-value for writes
+        rw,           %% integer() - rw-value for deletes
+        pr,           %% integer() - number of primary nodes required in preflist on read
+        pw,           %% integer() - number of primary nodes required in preflist on write
+        node_confirms,%% integer() - number of physically diverse nodes required in preflist on write
+        basic_quorum, %% boolean() - whether to use basic_quorum
+        notfound_ok,  %% boolean() - whether to treat notfounds as successes
+        asis,         %% boolean() - whether to send the put without modifying the vclock
+        sync_on_write,%% string() - sync on write behaviour to pass to backend
+        prefix,       %% string() - prefix for resource uris
+        riak,         %% local | {node(), atom()} - params for riak client
+        doc,          %% {ok, riak_object()}|{error, term()} - the object found
+        vtag,         %% string() - vtag the user asked for
+        links,        %% [link()] - links of the object
+        index_fields, %% [index_field()]
+        method,       %% atom() - HTTP method for the request
+        ctype,        %% string() - extracted content-type provided
+        charset,      %% string() | undefined - extracted character set provided
+        timeout,      %% integer() - passed-in timeout value in ms
+        security,     %% security context
+        not_modified  %% decoded vector clock to be used in not_modified check
+    }
+).
 
 -ifdef(namespaced_types).
 -type riak_kv_wm_object_dict() :: dict:dict().
@@ -204,7 +207,6 @@
 
 -type link() :: {{Bucket::binary(), Key::binary()}, Tag::binary()}.
 
--define(DEFAULT_TIMEOUT, 60000).
 -define(V1_BUCKET_REGEX, "/([^/]+)>; ?rel=\"([^\"]+)\"").
 -define(V1_KEY_REGEX, "/([^/]+)/([^/]+)>; ?riaktag=\"([^\"]+)\"").
 -define(V2_BUCKET_REGEX, "; ?rel=\"([^\"]+)\"").
@@ -324,7 +326,7 @@ validate_resource(RD, Ctx, _Perm) ->
 %% @doc Detects whether fetching the requested object results in an
 %% error.
 validate_doc(RD, Ctx) ->
-    DocCtx = ensure_doc(Ctx),
+    DocCtx = ensure_doc(RD, Ctx),
     case DocCtx#ctx.doc of
         {error, Reason} ->
             handle_common_error(Reason, RD, DocCtx);
@@ -637,7 +639,7 @@ content_types_provided(RD, Ctx=#ctx{method=Method}) when Method =:= 'DELETE' ->
     {[{"text/html", to_html}], RD, Ctx};
 content_types_provided(RD, Ctx0) ->
-    DocCtx = ensure_doc(Ctx0),
+    DocCtx = ensure_doc(RD, Ctx0),
     %% we can assume DocCtx#ctx.doc is {ok,Doc} because of malformed_request
     case select_doc(DocCtx) of
         {MD, V} ->
@@ -666,7 +668,7 @@ charsets_provided(RD, Ctx=#ctx{method=Method}) when Method =:= 'DELETE' ->
     {no_charset, RD, Ctx};
 charsets_provided(RD, Ctx0) ->
-    DocCtx = ensure_doc(Ctx0),
+    DocCtx = ensure_doc(RD, Ctx0),
     case DocCtx#ctx.doc of
         {ok, _} ->
             case select_doc(DocCtx) of
@@ -691,13 +693,7 @@ charsets_provided(RD, Ctx0) ->
 %% used in the PUT request that stored the document in Riak, or
 %% "identity" and "gzip" if no encoding was specified at PUT-time.
 encodings_provided(RD, Ctx0) ->
-    DocCtx =
-        case Ctx0#ctx.method of
-            UpdM when UpdM =:= 'PUT'; UpdM =:= 'POST'; UpdM =:= 'DELETE' ->
-                Ctx0;
-            _ ->
-                ensure_doc(Ctx0)
-        end,
+    DocCtx = ensure_doc(RD, Ctx0),
     case DocCtx#ctx.doc of
         {ok, _} ->
             case select_doc(DocCtx) of
@@ -753,18 +749,10 @@ content_types_accepted(RD, Ctx) ->
 %% Documents exists if a read request to Riak returns {ok, riak_object()},
 %% and either no vtag query parameter was specified, or the value of the
 %% vtag param matches the vtag of some value of the Riak object.
-resource_exists(RD, Ctx0) ->
-    Method = Ctx0#ctx.method,
-    ToFetch =
-        case Method of
-            UpdM when UpdM =:= 'PUT'; UpdM =:= 'POST'; UpdM =:= 'DELETE' ->
-                conditional_headers_present(RD) == true;
-            _ ->
-                true
-        end,
-    case ToFetch of
+resource_exists(RD, Ctx0) ->
+    case element(1, doc_required(RD, Ctx0)) of
         true ->
-            DocCtx = ensure_doc(Ctx0),
+            DocCtx = ensure_doc(RD, Ctx0),
             case DocCtx#ctx.doc of
                 {ok, Doc} ->
                     case DocCtx#ctx.vtag of
@@ -789,7 +777,7 @@
         false ->
             % Fake it - rather than fetch to see. If we're deleting we assume
            % it does exist, and if PUT/POST, assume it doesn't
-            case Method of
+            case Ctx0#ctx.method of
                 'DELETE' ->
                     {true, RD, Ctx0};
                 _ ->
@@ -797,6 +785,16 @@
             end
     end.
 
+-spec doc_required(request_data(), context()) -> {boolean(), boolean()}.
+doc_required(RD, Context) ->
+    case Context#ctx.method of
+        UpdM when UpdM =:= 'PUT'; UpdM =:= 'POST'; UpdM =:= 'DELETE' ->
+            {conditional_headers_present(RD) == true, false};
+        _ ->
+            {true, true}
+    end.
+
+
 -spec is_conflict(request_data(), context()) ->
     {boolean(), request_data(), context()}.
 is_conflict(RD, Ctx) ->
@@ -811,7 +809,10 @@
                         base64:decode(NotModifiedClock)),
                 CurrentClock = riak_object:vclock(Obj),
-                {not vclock:equal(InClock, CurrentClock), RD, Ctx};
+                {not vclock:equal(InClock, CurrentClock),
+                    RD,
+                    Ctx#ctx{not_modified = InClock}
+                };
             _ ->
                 {true, RD, Ctx}
     end;
@@ -873,7 +874,9 @@ accept_doc_body(
     Ctx=#ctx{
         bucket_type=T, bucket=B, key=K, client=C, links=L,
         ctype=CType, charset=Charset,
-        index_fields=IF}) ->
+        index_fields=IF,
+        not_modified = IfNotModified
+        }) ->
     Doc0 = riak_object:new(riak_kv_wm_utils:maybe_bucket_type(T,B), K, <<>>),
     VclockDoc = riak_object:set_vclock(Doc0, decode_vclock_header(RD)),
     UserMeta = extract_user_meta(RD),
@@ -902,14 +905,68 @@
         _ ->
            []
     end,
     Options = make_options(Options0, Ctx),
-    NoneMatch = (wrq:get_req_header("If-None-Match", RD) =/= undefined),
-    Options2 = case riak_kv_util:consistent_object(B) and NoneMatch of
-                   true ->
-                       [{if_none_match, true}|Options];
-                   false ->
-                       Options
-               end,
-    case riak_client:put(Doc, Options2, C) of
+    IfNoneMatch = (wrq:get_req_header("If-None-Match", RD) =/= undefined),
+    IsConsistent = riak_kv_util:consistent_object(B),
+    CondPutMode =
+        application:get_env(riak_kv, conditional_put_mode, api_only),
+    MakeTokenRequest = CondPutMode =/= api_only,
+
+    {CondPutOptions, SessionToken} =
+        case {IfNotModified, IfNoneMatch, IsConsistent, MakeTokenRequest} of
+            {_, true, true, _} ->
+                {[{if_none_match, true}], none};
+            {undefined, false, false, _} ->
+                {[], none};
+            {NotMod, NoneMatch, _, true} ->
+                TokenResult =
+                    riak_kv_token_session:session_request_retry({B, K}),
+                case TokenResult of
+                    {true, Token} ->
+                        GetOpts =
+                            [
+                                {basic_quorum, true},
+                                {return_body, false},
+                                {deleted_vclock, true}
+                            ],
+                        Condition =
+                            case NotMod of
+                                undefined ->
+                                    {undefined, true, GetOpts};
+                                InClock ->
+                                    {{true, InClock}, undefined, GetOpts}
+                            end,
+                        {[{condition_check, Condition}], Token};
+                    _ ->
+                        %% Pass the condition downstream, but currently that
+                        %% condition is ignored
+                        case {NotMod, NoneMatch} of
+                            {_, true} ->
+                                {[{if_none_match, true}], none};
+                            {InClock, _} ->
+                                {[{if_not_modified, InClock}], none}
+                        end
+                end;
+            {NotMod, NoneMatch, _, false} ->
+                %% Pass the condition downstream, but currently that
+                %% condition is ignored
+                case {NotMod, NoneMatch} of
+                    {_, true} ->
+                        {[{if_none_match, true}], none};
+                    {InClock, _} ->
+                        {[{if_not_modified, InClock}], none}
+                end
+        end,
+    PutRsp =
+        case SessionToken of
+            none ->
+                riak_client:put(Doc, CondPutOptions ++ Options, C);
+            _ ->
+                riak_kv_token_session:session_use(
+                    SessionToken, put, [Doc, CondPutOptions ++ Options]
+                )
+        end,
+    riak_kv_token_session:session_release(SessionToken),
+    case PutRsp of
         {error, Reason} ->
             handle_common_error(Reason, RD, Ctx);
         ok ->
@@ -1146,29 +1203,37 @@ decode_vclock_header(RD) ->
         Head -> riak_object:decode_vclock(base64:decode(Head))
     end.
 
--spec ensure_doc(context()) -> context().
+-spec ensure_doc(request_data(), context()) -> context().
 %% @doc Ensure that the 'doc' field of the context() has been filled
 %%      with the result of a riak_client:get request.  This is a
 %%      convenience for memoizing the result of a get so it can be
 %%      used in multiple places in this resource, without having to
 %%      worry about the order of executing of those places.
-ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
+ensure_doc(_RD, Ctx=#ctx{doc=undefined, key=undefined}) ->
     Ctx#ctx{doc={error, notfound}};
-ensure_doc(Ctx=#ctx{doc=undefined, bucket_type=T, bucket=B, key=K, client=C,
+ensure_doc(RD, Ctx=#ctx{doc=undefined, bucket_type=T, bucket=B, key=K, client=C,
                     basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
     case Ctx#ctx.type_exists of
         true ->
-            Options0 =
-                [deletedvclock,
-                 {basic_quorum, Quorum},
-                 {notfound_ok, NotFoundOK}],
-            Options = make_options(Options0, Ctx),
-            BT = riak_kv_wm_utils:maybe_bucket_type(T,B),
-            Ctx#ctx{doc=riak_client:get(BT, K, Options, C)};
+            case doc_required(RD, Ctx) of
+                {true, BodyRequired} ->
+                    Options0 =
+                        [
+                            deletedvclock,
+                            {basic_quorum, Quorum},
+                            {return_body, BodyRequired},
+                            {notfound_ok, NotFoundOK}
+                        ],
+                    Options = make_options(Options0, Ctx),
+                    BT = riak_kv_wm_utils:maybe_bucket_type(T,B),
+                    Ctx#ctx{doc=riak_client:get(BT, K, Options, C)};
+                _ ->
+                    Ctx
+            end;
         false ->
             Ctx#ctx{doc={error, bucket_type_unknown}}
     end;
-ensure_doc(Ctx) -> Ctx.
+ensure_doc(_RD, Ctx) -> Ctx.
 
 -spec delete_resource(#wm_reqdata{}, context()) ->
     {true, #wm_reqdata{}, context()}.
@@ -1419,6 +1484,11 @@ handle_common_error(Reason, RD, Ctx) ->
             {{halt, 503}, wrq:append_to_response_body(Msg, RD), Ctx};
         {error, failed} ->
             {{halt, 412}, RD, Ctx};
+        {error, "match_found"} ->
+            {{halt, 412}, RD, Ctx};
+        {error, "modified"} ->
+            {{halt, 409}, RD, Ctx};
+
         {error, Err} ->
             {{halt, 500},
                 wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
diff --git a/src/riak_object.erl b/src/riak_object.erl
index e348d304e..d0b7dc1e2 100644
--- a/src/riak_object.erl
+++ b/src/riak_object.erl
@@ -264,9 +264,23 @@ find_bestobject(FetchedItems) ->
     % the first received (the fastest responder) - as if there is a need
     % for a follow-up fetch, we should prefer the vnode that had responded
     % fastest to he HEAD (this may be local).
-    ObjNotJustHeadFun = fun({_Idx, Rsp}) -> not is_head(Rsp) end,
-    {Objects, Heads} = lists:partition(ObjNotJustHeadFun, FetchedItems),
+    ObjNotJustHeadFun = fun({_Idx, Rsp}) -> not is_head(Rsp) end,
+    {Objects, MaybeDupHeads} =
+        lists:partition(ObjNotJustHeadFun, FetchedItems),
+
+    %% If we've fetched the object, ignore the head
+    Heads =
+        lists:foldl(
+            fun({Idx, _}, UpdHeads) ->
+                lists:keydelete(Idx, 1, UpdHeads)
+            end,
+            MaybeDupHeads,
+            Objects
+        ),
+    %% prefer full objects to heads
+    %%
+    %%
     FoldList = Heads ++ Objects,
     DescendsFun =
@@ -1793,19 +1807,34 @@ convert_object_to_headonly(B, K, Object) ->
         VclockLen:32/integer, VclockBin:VclockLen/binary,
         SibCount:32/integer, SibsBin/binary>> = Binary,
+    io:format("SibCount ~w SibsBin size ~w~n", [SibCount, byte_size(SibsBin)]),
+    ConvertedSibsBin = convert_sibling_data(SibCount, SibsBin, <<>>),
+    HeadBin =
+        <<?MAGIC:8/integer, ?V1_VERS:8/integer,
+            VclockLen:32/integer, VclockBin:VclockLen/binary,
+            SibCount:32/integer, ConvertedSibsBin/binary>>,
+    from_binary(B, K, HeadBin).
+
+convert_sibling_data(1, SibBin, AccBin) ->
+    {SibBin0, <<>>} = convert_individual_sibling(SibBin),
+    <<AccBin/binary, SibBin0/binary>>;
+convert_sibling_data(N, SibsBin, AccBin) ->
+    {SibBin0, RestBin} = convert_individual_sibling(SibsBin),
+    convert_sibling_data(N - 1, RestBin, <<AccBin/binary, SibBin0/binary>>).
+
+convert_individual_sibling(SibsBin) ->
     <<ValLen:32/integer, _ValBin:ValLen/binary,
         MetaLen:32/integer,
-        MetaBinRest:MetaLen/binary>> = SibsBin,
-    SibsBin0 = <<0:32/integer,
-                 MetaLen:32/integer,
-                 MetaBinRest:MetaLen/binary>>,
-    HeadBin = <<?MAGIC:8/integer, ?V1_VERS:8/integer,
-                VclockLen:32/integer, VclockBin:VclockLen/binary,
-                SibCount:32/integer, SibsBin0/binary>>,
-    from_binary(B, K, HeadBin).
+        MetaBinRest:MetaLen/binary,
+        OtherSiblings/binary>> = SibsBin,
+    SibBin0 =
+        <<0:32/integer,
+        MetaLen:32/integer,
+        MetaBinRest:MetaLen/binary>>,
+    {SibBin0, OtherSiblings}.
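+
+%% A sketch of the v1 sibling encoding handled above (as implied by the
+%% binary patterns in this function): each sibling is laid out as
+%%   <<ValLen:32/integer, ValBin:ValLen/binary,
+%%       MetaLen:32/integer, MetaBin:MetaLen/binary>>
+%% convert_individual_sibling/1 re-encodes one sibling with ValLen set to 0
+%% and the value dropped, and returns it with the remaining siblings so that
+%% convert_sibling_data/3 can fold over all SibCount entries.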
 
 val_decoding_headresponse_test() ->
     % An empty binary as a value results in the content value being marked as
@@ -1851,6 +1880,25 @@ find_bestobject_equal_test() ->
                         {1, {ok, Obj1}},
                         {3, {ok, Obj3}}])).
 
+find_bestobject_headget_confusion_reconcile() ->
+    B = <<"buckets_are_binaries">>,
+    K = <<"keys are binaries">>,
+    {_Obj1, UpdO} = update_test(),
+    Obj2 = riak_object:increment_vclock(UpdO, one_pid),
+    Obj3 = riak_object:increment_vclock(UpdO, alt_pid),
+
+    ReplyH1 = {1, {ok, convert_object_to_headonly(B, K, Obj2)}},
+    ReplyB1 = {1, {ok, Obj3}},
+    ReplyB2 = {2, {ok, Obj2}},
+    ReplyB3 = {3, {ok, Obj3}},
+
+    Replies = [ReplyB2, ReplyB1, ReplyB2, ReplyH1, ReplyB3],
+    ?assertMatch(
+        {[ReplyB2, ReplyB1, ReplyB2, ReplyB3], []},
+        find_bestobject(Replies)
+    ).
+
+
 find_bestobject_ancestor() ->
     % one object is behind, and one of the dominant objects is head_only
     B = <<"buckets_are_binaries">>,
@@ -1962,7 +2010,8 @@ bucket_prop_needers_test_() ->
         fun(_) ->
             meck:unload(riak_core_bucket)
         end,
-        [{"Ancestor", fun ancestor/0},
+        [
+            {"Ancestor", fun ancestor/0},
             {"Ancestor Weird Clocks", fun ancestor_weird_clocks/0},
             {"Reconcile", fun reconcile/0},
             {"Merge 1", fun merge1/0},
@@ -1979,8 +2028,12 @@
             {"Mixed Merge 2", fun mixed_merge2/0},
             {"Find Object Ancestor", fun find_bestobject_ancestor/0},
             {"Find Object Reconcile", fun find_bestobject_reconcile/0},
+            {"Find Object Head-Get Confusion Reconcile",
+                fun find_bestobject_headget_confusion_reconcile/0},
             {"Test Summary Bin Extract", fun summary_binary_extract/0},
-            {"Next Gen Repl Encode/Decode", fun nextgenrepl/0}]
+            {"Next Gen Repl Encode/Decode", fun nextgenrepl/0},
+            {"Simple Head/Get merge", fun simple_merge_head_and_get/0}
+        ]
     }.
 
 ancestor() ->
@@ -2477,6 +2530,19 @@ summary_binary_extract() ->
     ?assertMatch(true, element(1, is_aae_object_deleted(ObjBinD, true))),
     ?assertMatch(true, element(1, is_aae_object_deleted(ObjBinE, true))).
 
+simple_merge_head_and_get() ->
+    B = <<"HeadTestB">>,
+    K = <<"HeadTestK">>,
+    Obj1 = new(B, K, <<"{\"a\":1}">>, "application/json"),
+    Obj2 = increment_vclock(Obj1, one_pid),
+    Obj3 = increment_vclock(Obj2, alt_pid),
+    Obj4 = convert_object_to_headonly(B, K, Obj2),
+    ObjMerge1 = merge(merge(Obj3, Obj4), Obj1),
+    ObjMerge2 = merge(Obj1, merge(Obj4, Obj3)),
+    ObjHead1 = convert_object_to_headonly(B, K, ObjMerge1),
+    ObjHead2 = convert_object_to_headonly(B, K, ObjMerge2),
+    ?assert(ObjHead1 == ObjHead2).
+
 trim_value_frombinary(<