Skip to content

Commit

Permalink
Add AAE
Browse files Browse the repository at this point in the history
  • Loading branch information
martinsumner committed Feb 14, 2023
1 parent 617dc7e commit feaddca
Showing 1 changed file with 90 additions and 63 deletions.
153 changes: 90 additions & 63 deletions tests/nextgenrepl_bouncingtomb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

-define(INIT_MAX_DELAY, 10).
-define(STND_MAX_DELAY, 3600).
-define(BIG_REPL_SLEEP, 4000).
-define(DELETE_TIMEOUT, 1000).
-define(BIG_REPL_SLEEP, ?DELETE_TIMEOUT + 200).

-define(LOOP_COUNT, 5).
-define(STATS_WAIT, 1000).
-define(LOOP_COUNT, 8).
-define(STATS_WAIT, 500).


-define(CORE_CONFIG(RingSize, NVal),[
Expand All @@ -51,8 +52,9 @@
% Necessary to count tombstones using reap_tombs query
{tictacaae_rebuildwait, 4},
{tictacaae_rebuilddelay, 3600},
{tictacaae_exchangetick, 3600000}, % don't tick for an hour!
{tictacaae_rebuildtick, 3600000} % don't tick for an hour!
{tictacaae_exchangetick, 15000},
{tictacaae_rebuildtick, 3600000}, % don't tick for an hour!
{tictacaae_suspend, true}
]).

-define(CONFIG(RingSize, NVal, Q, DeleteMode),
Expand Down Expand Up @@ -102,7 +104,7 @@ confirm() ->

[ClusterA1, ClusterB1] =
rt:deploy_clusters([
{3, ?CONFIG(?A_RING, ?A_NVAL, cluster_b, 1000)},
{3, ?CONFIG(?A_RING, ?A_NVAL, cluster_b, ?DELETE_TIMEOUT)},
{3, ?CONFIG(?B_RING, ?B_NVAL, cluster_a, keep)}]),

pass = with_insomnia_test(ClusterA1, ClusterB1),
Expand All @@ -117,7 +119,7 @@ confirm() ->
[ClusterA2, ClusterB2, ClusterC2] =
rt:deploy_clusters([
{2, ?CONFIG(?A_RING, ?A_NVAL, cluster_b, keep)},
{2, ?CONFIG(?B_RING, ?B_NVAL, cluster_a, 1000)},
{2, ?CONFIG(?B_RING, ?B_NVAL, cluster_a, ?DELETE_TIMEOUT)},
{1, ?CONFIG(?C_RING, ?C_NVAL, q1_ttaaefs, keep)}]),

no_insomnia_test(ClusterA2, ClusterB2, ClusterC2).
Expand All @@ -144,10 +146,18 @@ no_insomnia_test(ClusterA, ClusterB, ClusterC) ->
{IP, Port}
end,

reset_peer_config(NodeA1, cluster_a, ?A_RING, ?A_NVAL, cluster_b, PeerConfigFun(NodeB1), 1000),
reset_peer_config(NodeA2, cluster_a, ?A_RING, ?A_NVAL, cluster_b, PeerConfigFun(NodeB2), 1000),
reset_peer_config(NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA1), keep),
reset_peer_config(NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA2), keep),
reset_peer_config(
NodeA1, cluster_a, ?A_RING, ?A_NVAL, cluster_b,
PeerConfigFun(NodeB1), ?DELETE_TIMEOUT),
reset_peer_config(
NodeA2, cluster_a, ?A_RING, ?A_NVAL, cluster_b,
PeerConfigFun(NodeB2), ?DELETE_TIMEOUT),
reset_peer_config(
NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA1), keep),
reset_peer_config(
NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA2), keep),

lager:info("Waiting for convergence."),
rt:wait_until_ring_converged(ClusterA),
Expand Down Expand Up @@ -178,8 +188,12 @@ no_insomnia_test(ClusterA, ClusterB, ClusterC) ->
riakc_pb_socket:stop(SrcC),

lager:info("Resetting delete_mode on Cluster B"),
reset_peer_config(NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA1), 1000),
reset_peer_config(NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA2), 1000),
reset_peer_config(
NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA1), ?DELETE_TIMEOUT),
reset_peer_config(
NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA2), ?DELETE_TIMEOUT),

GetStatsFun =
fun() ->
Expand All @@ -190,36 +204,50 @@ no_insomnia_test(ClusterA, ClusterB, ClusterC) ->
get_stats(NodeB1),
get_stats(NodeB2)
end,
LogFun =
fun(NodeA, NodeB) ->
{ok, A1C} =
rpc:call(NodeA,
riak_client,
aae_fold,
[{reap_tombs, ?TEST_BUCKET, all, all, all, count}]),
lager:info("Cluster A ~w tombs", [A1C]),

{ok, B1C} =
rpc:call(NodeB,
riak_client,
aae_fold,
[{reap_tombs, ?TEST_BUCKET, all, all, all, count}]),
lager:info("Cluster B ~w tombs", [B1C]),

{A1C, B1C}
end,

lager:info("Cluster B has tombstones, but Cluster A has objects"),
{_InitACount, _InitBCount} = LogFun(NodeA1, NodeB1),
lager:info("Enabling AAE to avoid inconsistent results on coverage"),
rpc:multicall(ClusterA, riak_client, tictacaae_resume_node, []),
rpc:multicall(ClusterB, riak_client, tictacaae_resume_node, []),
{_InitACount, _InitBCount} = log_fun(NodeA1, NodeB1),

GetStatsFun(),

rotating_full_sync(NodeA1, NodeB1, GetStatsFun, LogFun, ?LOOP_COUNT),
rotating_full_sync(
NodeA1, NodeB1, GetStatsFun, fun log_fun/2, ?LOOP_COUNT),

?assertMatch({0, 0}, LogFun(NodeA1, NodeB1)),
?assertMatch({0, 0}, log_fun(NodeA1, NodeB1)),

pass.

log_fun(NodeA, NodeB) ->
{ok, A1C} =
rpc:call(NodeA,
riak_client,
aae_fold,
[{find_tombs, ?TEST_BUCKET, all, all, all}]),
lager:info("Cluster A ~w tombs", [length(A1C)]),

{ok, B1C} =
rpc:call(NodeB,
riak_client,
aae_fold,
[{find_tombs, ?TEST_BUCKET, all, all, all}]),
lager:info("Cluster B ~w tombs", [length(B1C)]),

case {length(A1C), length(B1C)} of
{0, 0} ->
ok;
{N, M} when N < 10, M < 10 ->
lager:info("Cluster A tombs ~p", [A1C]),
lager:info("Cluster B tombs ~p", [B1C]);
_ ->
ok
end,

{length(A1C), length(B1C)}.



with_insomnia_test(ClusterA, ClusterB) ->

Expand All @@ -240,19 +268,32 @@ with_insomnia_test(ClusterA, ClusterB) ->
{IP, Port}
end,

reset_peer_config(NodeA1, cluster_a, ?A_RING, ?A_NVAL, cluster_b, PeerConfigFun(NodeB1), 1000),
reset_peer_config(NodeA2, cluster_a, ?A_RING, ?A_NVAL, cluster_b, PeerConfigFun(NodeB2), 1000),
reset_peer_config(NodeA3, cluster_a, ?A_RING, ?A_NVAL, cluster_b, PeerConfigFun(NodeB3), 1000),
reset_peer_config(NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA1), keep),
reset_peer_config(NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA2), keep),
reset_peer_config(NodeB3, cluster_b, ?B_RING, ?B_NVAL, cluster_a, PeerConfigFun(NodeA3), keep),
reset_peer_config(
NodeA1, cluster_a, ?A_RING, ?A_NVAL, cluster_b,
PeerConfigFun(NodeB1), ?DELETE_TIMEOUT),
reset_peer_config(
NodeA2, cluster_a, ?A_RING, ?A_NVAL, cluster_b,
PeerConfigFun(NodeB2), ?DELETE_TIMEOUT),
reset_peer_config(
NodeA3, cluster_a, ?A_RING, ?A_NVAL, cluster_b,
PeerConfigFun(NodeB3), ?DELETE_TIMEOUT),
reset_peer_config(
NodeB1, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA1), keep),
reset_peer_config(
NodeB2, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA2), keep),
reset_peer_config(
NodeB3, cluster_b, ?B_RING, ?B_NVAL, cluster_a,
PeerConfigFun(NodeA3), keep),

lager:info("Waiting for convergence."),
rt:wait_until_ring_converged(ClusterA),
rt:wait_until_ring_converged(ClusterB),
lager:info("Confirm riak_kv is up on all nodes."),
lists:foreach(fun(N) -> rt:wait_for_service(N, riak_kv) end,
ClusterA ++ ClusterB),
lists:foreach(
fun(N) -> rt:wait_for_service(N, riak_kv) end,
ClusterA ++ ClusterB),

lager:info("Wait for peer discovery"),
timer:sleep((?INIT_MAX_DELAY + 1) * 1000),
Expand Down Expand Up @@ -283,33 +324,19 @@ with_insomnia_test(ClusterA, ClusterB) ->
get_stats(NodeB2),
get_stats(NodeB3)
end,
LogFun =
fun(NodeA, NodeB) ->
{ok, A1C} =
rpc:call(NodeA,
riak_client,
aae_fold,
[{reap_tombs, ?TEST_BUCKET, all, all, all, count}]),
lager:info("Cluster A ~w tombs", [A1C]),

{ok, B1C} =
rpc:call(NodeB,
riak_client,
aae_fold,
[{reap_tombs, ?TEST_BUCKET, all, all, all, count}]),
lager:info("Cluster B ~w tombs", [B1C]),

{A1C, B1C}
end,

lager:info("Cluster B has tombstones, but Cluster A should have reaped"),
{_InitACount, _InitBCount} = LogFun(NodeA1, NodeB1),
lager:info("Enabling AAE to avoid inconsistent results on coverage"),
rpc:multicall(ClusterA, riak_client, tictacaae_resume_node, []),
rpc:multicall(ClusterB, riak_client, tictacaae_resume_node, []),
{_InitACount, _InitBCount} = log_fun(NodeA1, NodeB1),

GetStatsFun(),

rotating_full_sync(NodeA1, NodeB1, GetStatsFun, LogFun, ?LOOP_COUNT),
rotating_full_sync(
NodeA1, NodeB1, GetStatsFun, fun log_fun/2, ?LOOP_COUNT),

?assertMatch({0, 0}, LogFun(NodeA1, NodeB1)),
?assertMatch({0, 0}, log_fun(NodeA1, NodeB1)),

pass.

Expand Down

0 comments on commit feaddca

Please sign in to comment.