Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge riak/2.1 branch to master #977

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
81d74cd
yokozuna_rt additions for types and better testing w/ allow-mult=true…
zeeshanlakhani Oct 9, 2015
6f0e534
Merge pull request #886 from basho/backport/zl/yz_extractors+rt-fixum…
fadushin Oct 27, 2015
a255cc1
Add debugging to rt:do_commit/1 to track nothing_planned
Nov 10, 2015
1b34707
Merge pull request #925 from basho/bugfix/bch/add-cluster-commit-logging
Nov 10, 2015
78096fa
Merge branch 'riak/2.0' into merge/bch/2.0to2.1-cluster-commit-debug
Nov 10, 2015
f65bf6f
Merge pull request #926 from basho/merge/bch/2.0to2.1-cluster-commit-…
Nov 10, 2015
ce8ef79
Updated to attempt to make sure we get the "right" vnode PID.
Nov 16, 2015
5c2c474
Merge pull request #930 from basho/dr/check_vnode_for_liveliness
JeetKunDoug Nov 16, 2015
867dc1a
sure up possible race between upgrade and first check
zeeshanlakhani Nov 18, 2015
cc05f86
Merge pull request #932 from basho/release-time/zl/sure-up-yz_extractors
zeeshanlakhani Nov 18, 2015
d855c1f
Log the start of each test in repl_aae_fullsync
nickelization Nov 24, 2015
60577a5
Merge pull request #939 from basho/repl_aae_fullsync-logging
JeetKunDoug Nov 24, 2015
619b24e
Updated tests to explicitly set allow_mult and dvv_enabled, as
Nov 24, 2015
8e5b9a8
Fix misplaced character in ensemble_util config
nickelization Nov 24, 2015
288fc66
Merge pull request #942 from basho/dr/bugfix/update_tests_for_differe…
JeetKunDoug Nov 25, 2015
57659d3
Forward port changes to proxy_overload_recovery to make it pass in the
Dec 1, 2015
010c35a
Merge pull request #947 from basho/dr/forward/proxy_overload_recovery…
JeetKunDoug Dec 1, 2015
2c2e57d
Move wait_for_vnode_change inside the -ifdef(EQC) clause.
tburghart Dec 9, 2015
d76c933
Merge pull request #956 from basho/bugfix/trb/fix-build-without-eqc
JeetKunDoug Dec 9, 2015
eec0b42
Merge branch 'riak/2.1' into merge/dr/merge_2.1_to_master
Jan 22, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/rt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ staged_join(Node, PNode) ->

plan_and_commit(Node) ->
timer:sleep(500),
lager:info("planning and commiting cluster join"),
lager:info("planning cluster join"),
case rpc:call(Node, riak_core_claimant, plan, []) of
{error, ring_not_ready} ->
lager:info("plan: ring not ready"),
Expand All @@ -461,6 +461,7 @@ plan_and_commit(Node) ->
end.

do_commit(Node) ->
lager:info("planning cluster commit"),
case rpc:call(Node, riak_core_claimant, commit, []) of
{error, plan_changed} ->
lager:info("commit: plan changed"),
Expand All @@ -472,8 +473,9 @@ do_commit(Node) ->
timer:sleep(100),
maybe_wait_for_changes(Node),
do_commit(Node);
{error,nothing_planned} ->
{error, nothing_planned} ->
%% Assume plan actually committed somehow
lager:info("commit: nothing planned"),
ok;
ok ->
ok
Expand Down Expand Up @@ -719,7 +721,13 @@ wait_until_no_pending_changes(Nodes) ->
rpc:multicall(Nodes, riak_core_vnode_manager, force_handoffs, []),
{Rings, BadNodes} = rpc:multicall(Nodes, riak_core_ring_manager, get_raw_ring, []),
Changes = [ riak_core_ring:pending_changes(Ring) =:= [] || {ok, Ring} <- Rings ],
BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes)
case BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes) of
true -> true;
false ->
NodesWithChanges = [Node || {Node, false} <- lists:zip(Nodes -- BadNodes, Changes)],
lager:info("Changes not yet complete, or bad nodes. BadNodes=~p, Nodes with Pending Changes=~p~n", [BadNodes, NodesWithChanges]),
false
end
end,
?assertEqual(ok, wait_until(F)),
ok.
Expand Down
6 changes: 5 additions & 1 deletion tests/bucket_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ confirm() ->
lager:info("Deploy some nodes"),
Nodes = rt:build_cluster(4, [], [
{riak_core, [{default_bucket_props,
[{n_val, 2}]}]}]),
[
{n_val, 2},
{allow_mult, true},
{dvv_enabled, true}
]}]}]),
Node = hd(Nodes),

RMD = riak_test_runner:metadata(),
Expand Down
13 changes: 9 additions & 4 deletions tests/ensemble_byzantine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,20 @@ confirm() ->
test_lose_minority_synctrees(PBC, Bucket, Key, Val, PL),
test_lose_majority_synctrees(PBC, Bucket, Key, Val, PL),
test_lose_minority_synctrees_one_node_partitioned(PBC, Bucket, Key, Val,
PL, Nodes),
PL, Nodes),
test_lose_all_data_and_trees_except_one_node(PBC, Bucket, Key, Val, PL),
{ok, _NewVal} = test_backup_restore_data_not_trees(Bucket, Key, Val, PL),
test_lose_all_data(PBC, Bucket, Key, PL),

pass.

config() ->
[{riak_core, [{default_bucket_props, [{n_val, 5}]},
[{riak_core, [{default_bucket_props,
[
{n_val, 5},
{allow_mult, true},
{dvv_enabled, true}
]},
{vnode_management_timer, 1000},
{ring_creation_size, 16},
{enable_consensus, true},
Expand All @@ -79,7 +84,7 @@ test_lose_majority_synctrees(PBC, Bucket, Key, Val, PL) ->
assert_lose_synctrees_and_recover(PBC, Bucket, Key, Val, PL, Majority).

test_lose_minority_synctrees_one_node_partitioned(PBC, Bucket, Key, Val, PL,
Nodes) ->
Nodes) ->
Minority = minority_vnodes(PL),
{{Idx0, Node0}, primary} = hd(PL),
Ensemble = {kv, Idx0, 5},
Expand Down Expand Up @@ -251,7 +256,7 @@ kill_peers(Ensemble, Nodes) ->
Peers = [P || P={_Id, N} <- View, lists:member(N, Nodes)],
lager:info("Killing Peers: ~p", [Peers]),
Pids = [rpc:call(Node, riak_ensemble_manager, get_peer_pid,
[Ensemble, Peer]) || Peer <- Peers],
[Ensemble, Peer]) || Peer <- Peers],
[exit(Pid, kill) || Pid <- Pids, Pid =/= undefined].

wipe_partitions(PL) ->
Expand Down
7 changes: 6 additions & 1 deletion tests/ensemble_ring_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@
-define(RING_SIZE, 16).

config() ->
[{riak_core, [{default_bucket_props, [{n_val, 5}]},
[{riak_core, [{default_bucket_props,
[
{n_val, 5},
{allow_mult, true},
{dvv_enabled, true}
]},
{vnode_management_timer, 1000},
{ring_creation_size, ?RING_SIZE},
{enable_consensus, true},
Expand Down
13 changes: 9 additions & 4 deletions tests/ensemble_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ fast_config(Nval, EnableAAE) when is_boolean(EnableAAE) ->

fast_config(NVal, RingSize, EnableAAE) ->
[config_aae(EnableAAE),
{riak_core, [{default_bucket_props, [{n_val, NVal}]},
{vnode_management_timer, 1000},
{ring_creation_size, RingSize},
{enable_consensus, true}]}].
{riak_core, [{default_bucket_props,
[
{n_val, NVal},
{allow_mult, true},
{dvv_enabled, true}
]},
{vnode_management_timer, 1000},
{ring_creation_size, RingSize},
{enable_consensus, true}]}].

config_aae(true) ->
{riak_kv, [{anti_entropy_build_limit, {100, 1000}},
Expand Down
6 changes: 5 additions & 1 deletion tests/http_bucket_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ confirm() ->
lager:info("Deploy some nodes"),
Nodes = rt:build_cluster(4, [], [
{riak_core, [{default_bucket_props,
[{n_val, 2}]}]}]),
[
{n_val, 2},
{allow_mult, true},
{dvv_enabled, true}
]}]}]),
Node = hd(Nodes),

RMD = riak_test_runner:metadata(),
Expand Down
2 changes: 1 addition & 1 deletion tests/http_security.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ confirm() ->
PrivDir = rt:priv_dir(),
Conf = [
{riak_core, [
{default_bucket_props, [{allow_mult, true}]},
{default_bucket_props, [{allow_mult, true}, {dvv_enabled, true}]},
{ssl, [
{certfile, filename:join([CertDir,
"site3.basho.com/cert.pem"])},
Expand Down
7 changes: 6 additions & 1 deletion tests/overload.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ default_config(#config{
fsm_limit = FsmLimit
}) ->
[{riak_core, [{ring_creation_size, 8},
{default_bucket_props, [{n_val, 5}]},
{default_bucket_props,
[
{n_val, 5},
{allow_mult, true},
{dvv_enabled, true}
]},
{vnode_management_timer, 1000},
{enable_health_checks, false},
{enable_consensus, true},
Expand Down
2 changes: 1 addition & 1 deletion tests/pb_security.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ confirm() ->
PrivDir = rt:priv_dir(),
Conf = [
{riak_core, [
{default_bucket_props, [{allow_mult, true}]},
{default_bucket_props, [{allow_mult, true}, {dvv_enabled, true}]},
{ssl, [
{certfile, filename:join([CertDir,"site3.basho.com/cert.pem"])},
{keyfile, filename:join([CertDir, "site3.basho.com/key.pem"])},
Expand Down
86 changes: 58 additions & 28 deletions tests/proxy_overload_recovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
%% cleared and the model is reset. The main goal is to advance the proxy
%% into interesting new states.
%%
%% This test can be run outside of riak_test while working on it.
%% This test can be run outside of riak_test while working on it.
%% Symlink the source into a release build and run
%% c(proxy_overload_recovery).
%% c(proxy_overload_recovery).
%% proxy_overload_recovery:run(300). % Run for 5 mins
%%
%% On failure you can re-run counter examples *and* print out the internal
%% state with the run.
%% proxy_overload_recovery:check().
%% proxy_overload_recovery:check().
%%
%% TODO/Questions:
%% 1) Is there a better way to do the initialization step?
Expand Down Expand Up @@ -137,7 +137,7 @@ prop_proxy_recovery() ->
[catch msgq_len(VPid)])
end
end,
measure(duration, Msecs,
measure(duration, Msecs,
aggregate(with_title("Commands"), command_names(Cmds),
pretty_commands(?MODULE, Cmds, {H, S, Res},
Res == ok))))
Expand Down Expand Up @@ -173,13 +173,13 @@ precondition_common(#tstate{rt = undefined}, {call, _M, F, _A}) ->
precondition_common(_, {call, _M, F, _A}) ->
F /= prepare.

%% %% Make sure we're still running what we think we're running - uncomment
%% %% Make sure we're still running what we think we're running - uncomment
%% %% if having process death issues
%% invariant(#tstate{rt = undefined}) ->
%% true;
%% invariant(#tstate{rt = #rt{id = Index, ppid = PPid, vpid = VPid}}) ->
%% RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
%% PPid = whereis(RegName), % Check process we think it is.
%% PPid = whereis(RegName), % Check process we think it is.
%% true = is_process_alive(PPid),
%% true = is_process_alive(VPid),
%% true.
Expand Down Expand Up @@ -208,13 +208,14 @@ prepare(ThresholdSeed) ->
{ok, VPid0} = riak_core_vnode_manager:get_vnode_pid(Id, riak_kv_vnode),
sys:resume(VPid0),
ok = supervisor:terminate_child(riak_core_vnode_sup, VPid0),
false = is_process_alive(VPid0),

%% Reset the proxy pid to make sure it resets state and picks up the new
%% environment variables
ok = supervisor:terminate_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),
RegName = riak_core_vnode_proxy:reg_name(riak_kv_vnode, Index),
undefined = whereis(RegName),
VPid1 = wait_for_vnode_change(VPid0, Index),

{ok, PPid} = supervisor:restart_child(riak_core_vnode_proxy_sup, {riak_kv_vnode, Id}),

%% Find the proxy pid and check it's alive and matches the supervisor
Expand All @@ -225,6 +226,7 @@ prepare(ThresholdSeed) ->
%% and return the Pid so we know we have the same Pid.
{ok, VPid} = riak_core_vnode_proxy:command_return_vnode(
{riak_kv_vnode,Index,node()}, timeout),
?assertEqual(VPid, VPid1),

true = is_process_alive(PPid),
true = is_process_alive(VPid),
Expand Down Expand Up @@ -264,14 +266,14 @@ resume_args(#tstate{rt = RT}) ->
resume(#rt{ppid = PPid, vpid = VPid}) ->
sys:resume(VPid),
%% Use the sys:get_status call to force a synchronous call
%% against the vnode proxy to ensure all messages sent by
%% against the vnode & the proxy to ensure all messages sent by
%% this process have been serviced and there are no pending
%% 'ping's in the vnode before we continue.
%% Then drain the vnode to make sure any pending pongs have
%% been sent.
ok = drain(VPid),
%% been sent, and ensure the proxy has
_ = sys:get_status(PPid),
_ = sys:get_status(VPid),
_ = sys:get_status(PPid).
ok = drain([VPid, PPid]).

resume_next(S, _V, _A) ->
S#tstate{vnode_running = true, proxy_msgs = 0, direct_msgs = 0}.
Expand Down Expand Up @@ -324,28 +326,28 @@ overloaded_args(#tstate{vnode_running = Running, rt = RT}) ->
overloaded(Running, #rt{ppid = PPid, vpid = VPid}) ->
case Running of
true ->
ok = drain(PPid), % make sure all proxy msgs processed/dropped
ok = drain(VPid); % make sure any pending ping/pongs are processed
ok = drain([PPid, VPid]);
_ ->
ok
end,
{riak_core_vnode_proxy:overloaded(PPid),
msgq_len(VPid), % just for debug so we can review in log output
sys:get_status(PPid)}. % ditto
{messages, PMsgs} = process_info(PPid, messages),
{messages, VMsgs} = process_info(VPid, messages),
Overloaded = riak_core_vnode_proxy:overloaded(PPid),
{Overloaded, {VMsgs, PMsgs}, sys:get_status(PPid)}.

overloaded_post(#tstate{threshold = undefined}, _A,
{R, _VnodeQ, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If there are no thresholds there should never be an overload
eq(R, false);
overloaded_post(#tstate{vnode_running = true}, _A,
{R, _VnodeQ = 0, _ProxyStatus}) ->
{R, _Messages, _ProxyStatus}) ->
%% If the vnode is running, we have cleared queues so
%% should not be in overload.
eq(R, false);
overloaded_post(#tstate{vnode_running = false,
proxy_msgs = ProxyMsgs,
threshold = Threshold}, _A,
{ResultOverload, _VnodeQ, _ProxyStatus}) ->
{ResultOverload, _Messages, _ProxyStatus}) ->
%% Either
%% mailbox is completely an estimate based on proxy msgs
%% or mailbox is a check + estimate since
Expand Down Expand Up @@ -392,16 +394,33 @@ prep_env(Var, Val) ->
%% Wait until all messages are drained by the Pid. No guarantees
%% about future messages being sent, or that responses for the
%% last message consumed have been transmitted.
%%
drain(Pid) ->
case erlang:process_info(Pid, message_queue_len) of
{message_queue_len, 0} ->
%% NOTE: The "drain 3 times in a row" was determined empirically,
%% and may not be sufficient (2 was not). Given time constraints,
%% living with it for now. If this fails, we should really add some
%% tracing code around the send of messages to Vnode and Proxy to
%% determine where extra messages are coming from rather than just
%% make this "try 4 times"
%%
drain(Pid) when is_pid(Pid) ->
drain([Pid], {-1, -1});

drain(Pids) when is_list(Pids) ->
drain(Pids, {-1, -1}).
drain(Pids, {PrevPrev, Prev}) ->
_ = [sys:suspend(Pid) || Pid <- Pids],
Len = lists:foldl(fun(Pid, Acc0) ->
{message_queue_len, Len} = erlang:process_info(Pid, message_queue_len),
Acc0 + Len
end, 0, Pids),
_ = [sys:resume(Pid) || Pid <- Pids],
case {PrevPrev, Prev, Len} of
{0, 0, 0} ->
ok;
{message_queue_len, L} when L > 0 ->
timer:sleep(1), % give it a millisecond to drain
drain(Pid);
ER ->
ER
_ ->
%% Attempt to ensure something else is scheduled before we try to drain again
erlang:yield(),
timer:sleep(1),
drain(Pids, {Prev, Len})
end.

%% Return the length of the message queue (or crash if proc dead)
Expand Down Expand Up @@ -448,6 +467,17 @@ add_eqc_apps(Nodes) ->
end || App <- Apps, Node <- Nodes],
ok.


wait_for_vnode_change(VPid0, Index) ->
{ok, VPid1} = riak_core_vnode_manager:get_vnode_pid(Index, riak_kv_vnode),
case VPid1 of
VPid0 ->
timer:sleep(1),
wait_for_vnode_change(VPid0, Index);
_ ->
VPid1
end.

-else. %% no EQC

-export([confirm/0]).
Expand Down
Loading