Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Tiot/openriak 3.4/tictacaae and nextgenrepl cli commands #55

Open
wants to merge 18 commits into
base: openriak-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
]}.

{profiles, [
{test, [{deps, [meck]}]},
{test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloning your own version of meck seems to me the wrong way to go

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is consistent with our approach everywhere, which is to clone all repos we depend on - only we didn't do it previously for test.

If we just get meck latest, as was - we're subject to time-expensive surprises when all our deps can no longer update. If we tag to a version, then we still need to check upstream for critical updates.

There's no perfect solution, so I'm happy that in this case we should resolve with the same solution used for non-test dependencies.

{gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]}
]}.

Expand All @@ -46,6 +46,6 @@
{riak_dt, {git, "https://github.com/OpenRiak/riak_dt.git", {branch, "openriak-3.2"}}},
{riak_api, {git, "https://github.com/OpenRiak/riak_api.git", {branch, "openriak-3.4"}}},
{hyper, {git, "https://github.com/OpenRiak/hyper", {branch, "openriak-3.2"}}},
{kv_index_tictactree, {git, "https://github.com/OpenRiak/kv_index_tictactree.git", {branch, "openriak-3.4"}}},
{kv_index_tictactree, {git, "https://github.com/TI-Tokyo/kv_index_tictactree.git", {tag, "tiot-openriak-3.4-tictacaae-and-nextgenrepl-cli-commands"}}},
{rhc, {git, "https://github.com/OpenRiak/riak-erlang-http-client", {branch, "openriak-3.4"}}}
]}.
301 changes: 300 additions & 1 deletion src/riak_kv_console.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
bucket_type_create/1,
bucket_type_update/1,
bucket_type_reset/1,
bucket_type_list/1]).
bucket_type_list/1,
tictacaae_cmd/1
]).

-export([ensemble_status/1]).

Expand Down Expand Up @@ -747,6 +749,303 @@ ensemble_status([Str]) ->
riak_kv_ensemble_console:ensemble_detail(N)
end.


%% @private
%% Entry point for the `riak admin tictacaae ...' console command.
%% Refuses to do anything unless tictac AAE is configured active in
%% the local riak_kv application environment.
tictacaae_cmd(Args) ->
    Activity = application:get_env(riak_kv, tictacaae_active),
    case Activity of
        {ok, active} ->
            tictacaae_cmd2(Args);
        _ ->
            io:format("tictacaae not active\n", [])
    end.
%% @private
%% Parse the remaining command line and dispatch the tictacaae
%% subcommand. Most items map 1:1 onto a riak_kv application
%% environment variable: with one argument the value is set on the
%% selected nodes, with none the current value is printed per node.
%% The compound commands (rebuild-soon, rebuild-now, treestatus) are
%% handled explicitly. Any parse or conversion error (bad integer,
%% unknown node atom, getopt failure) falls through to the usage text.
tictacaae_cmd2([Item | Cmdline]) ->
    try
        {ok, {Options, Args}} = getopt:parse(tictacaae_cmd_optspecs(), Cmdline),
        Nodes = extract_nodes(Options),
        Partitions = extract_partitions(Options),
        case {tictacaae_item_env(Item), Item, Args} of
            {{ok, EnvKey, ParseArg}, _, [Value]} ->
                %% one argument: set the option on all selected nodes
                set_tictacaae_option(EnvKey, Nodes, ParseArg(Value));
            {{ok, EnvKey, _ParseArg}, _, _} ->
                %% no argument (or extras): report the current value
                print_tictacaae_option(EnvKey, Nodes);

            {unknown, "rebuild-soon", [DelayArg]} ->
                Delay = list_to_integer(DelayArg),
                AffectedVNodes = schedule_nextrebuild(Nodes, Partitions, Delay),
                case Nodes of
                    [Node] ->
                        io:format("scheduled rebuild of aae trees on ~b partition~s on ~s\n",
                                  [length(AffectedVNodes), ending(AffectedVNodes), Node]);
                    _ ->
                        io:format("scheduled rebuild of aae trees on ~b nodes\n",
                                  [length(Nodes)])
                end;

            {unknown, "rebuild-now", []} ->
                AffectedVNodes = schedule_nextrebuild(Nodes, Partitions, 0),
                poke_for_rebuild(AffectedVNodes),
                case Nodes of
                    [Node] ->
                        io:format("rebuilding aae trees on ~b partition~s on ~s\n",
                                  [length(AffectedVNodes), ending(AffectedVNodes), Node]);
                    _ ->
                        io:format("rebuilding aae trees on ~b nodes\n",
                                  [length(Nodes)])
                end;

            {unknown, "treestatus", []} ->
                case {Nodes, Partitions} of
                    {[N], all} when N == node() ->
                        print_aae_progress_report(Options);
                    _ ->
                        io:format("treestatus option only supported on local node\n", [])
                end;

            _ ->
                io:format("Unknown item or wrong number of arguments\n", [])
        end
    catch
        error:_ ->
            io:format(
              "Usage: riak admin tictacaae treestatus [-n [<node>]] [-p [<partition>]]
                                       [--format [<format>]]
                                       [--show [<show>]] ITEM

  -n, --node       Node, or all [default: [email protected]]
  -p, --partition  Partition, or all [default: all]
  --format         table or json [default: table]
  --show           tree states to show [default:
                   unbuilt,rebuilding,building]

where ITEM is one of rebuildwait, rebuilddelay, rebuildtick,
exchangetick, maxresults, storeheads, tokenbucket,
rebuildtreeworkers, rebuildstoreworkers, aaefoldworkers,
rebuild-soon, rebuild-now, treestatus.
")
    end.

%% @private
%% Map an item name onto {ok, AppEnvKey, ArgParser} when the item is a
%% plain riak_kv application environment setting, or 'unknown' for the
%% compound commands handled explicitly in tictacaae_cmd2/1.
tictacaae_item_env("rebuildwait")         -> {ok, tictacaae_rebuildwait, fun erlang:list_to_integer/1};
tictacaae_item_env("rebuilddelay")        -> {ok, tictacaae_rebuilddelay, fun erlang:list_to_integer/1};
tictacaae_item_env("rebuildtick")         -> {ok, tictacaae_rebuildtick, fun erlang:list_to_integer/1};
tictacaae_item_env("exchangetick")        -> {ok, tictacaae_exchangetick, fun erlang:list_to_integer/1};
tictacaae_item_env("maxresults")          -> {ok, tictacaae_maxresults, fun erlang:list_to_integer/1};
tictacaae_item_env("storeheads")          -> {ok, tictacaae_storeheads, fun tictacaae_parse_flag/1};
tictacaae_item_env("tokenbucket")         -> {ok, aae_tokenbucket, fun tictacaae_parse_flag/1};
tictacaae_item_env("rebuildtreeworkers")  -> {ok, af1_worker_pool_size, fun erlang:list_to_integer/1};
tictacaae_item_env("rebuildstoreworkers") -> {ok, be_worker_pool_size, fun erlang:list_to_integer/1};
tictacaae_item_env("aaefoldworkers")      -> {ok, af4_worker_pool_size, fun erlang:list_to_integer/1};
tictacaae_item_env(_)                     -> unknown.

%% @private
%% Only the exact string "true" enables a boolean option; anything
%% else disables it (same semantics as the previous inline comparison).
tictacaae_parse_flag(Value) ->
    Value == "true".

%% @private
%% Print the current value of riak_kv application variable Option as
%% seen on each of the given nodes. Crashes (caught by the caller's
%% try) if the variable is unset on any node.
print_tictacaae_option(Option, Nodes) ->
    lists:foreach(
      fun(Node) ->
              {ok, Value} = rpc:call(Node, application, get_env, [riak_kv, Option]),
              io:format("~s on ~s is ~p\n", [Option, Node, Value])
      end,
      Nodes),
    ok.

%% @private
%% Set riak_kv application variable Option to Value on every node in
%% Nodes; asserts each remote set_env succeeded.
set_tictacaae_option(Option, Nodes, Value) ->
    lists:foreach(
      fun(Node) ->
              ok = rpc:call(Node, application, set_env, [riak_kv, Option, Value])
      end,
      Nodes),
    ok.


%% @private
%% Ask every selected node to schedule the next AAE tree rebuild of
%% its selected vnodes after Delay; returns the accumulated list of
%% all {Partition, Node} pairs that were scheduled.
schedule_nextrebuild(Nodes, Partitions, Delay) ->
    Schedule =
        fun(Node, Acc) ->
                VNodes = vnodes(Node, Partitions),
                ok = rpc:call(Node, riak_kv_vnode, aae_schedule_nextrebuild,
                              [VNodes, Delay]),
                Acc ++ VNodes
        end,
    lists:foldl(Schedule, [], Nodes).

%% @private
%% Resolve the {Partition, Node} pairs to act on. 'all' expands to
%% every partition the node owns in its current ring; an explicit
%% partition list is simply paired with the node as given.
vnodes(Node, all) ->
    {ok, Ring} = rpc:call(Node, riak_core_ring_manager, get_my_ring, []),
    Owners = rpc:call(Node, riak_core_ring, all_owners, [Ring]),
    lists:filter(fun({_Idx, Owner}) -> Owner =:= Node end, Owners);
vnodes(Node, Partitions) ->
    [{Partition, Node} || Partition <- Partitions].

%% @private
%% Send a tictacaae_rebuildpoke command, once per distinct node, to the
%% vnodes that just had a rebuild scheduled, so the rebuild starts now
%% rather than on the next tick.
poke_for_rebuild(VNodes) ->
    Targets = lists:usort([Node || {_Partition, Node} <- VNodes]),
    SendPoke =
        fun(Node) ->
                Local = vnodes_at(Node, VNodes),
                rpc:call(Node, riak_core_vnode_master, command,
                         [Local, tictacaae_rebuildpoke, riak_kv_vnode_master])
        end,
    lists:foreach(SendPoke, Targets).

%% @private
%% Keep only the {Partition, Node} pairs that live on the given node.
vnodes_at(Node, VNodes) ->
    lists:filter(fun({_Partition, N}) -> N == Node end, VNodes).


%% @private
%% Collect, per active riak_kv vnode on this node, a proplist
%% describing the state of its tictac AAE tree: keystore status,
%% last/next rebuild time, dirty segment count, controller pid and an
%% overall status atom (unbuilt | built | rebuilding | building).
%% NOTE(review): this matches riak_core_vnode's #state{} tuple by
%% position via sys:get_state/1, which is fragile across riak_core
%% versions - confirm the record layout when upgrading.
produce_aae_progress_report() ->
    VVSS =
        lists:append(
          [case sys:get_state(P) of
               {active, _CoreVnodeState = {state, Idx, riak_kv_vnode, VSx, _, _, _, _, _, _, _, _}} ->
                   [{Idx, VSx}];
               _ ->
                   []
           end || {_, P, _, _} <- supervisor:which_children(riak_core_vnode_sup)]),

    [begin
         AAECntrl = riak_kv_vnode:aae_controller(VNState),
         TictacRebuilding = riak_kv_vnode:aae_rebuilding(VNState),

         AAECntrlState = sys:get_state(AAECntrl),
         KeyStore = aae_controller:get_key_store(AAECntrlState),

         %% Only interrogate the keystore process when it is actually
         %% running: the previous version called sys:get_state/1 on a
         %% non-pid unconditionally and crashed whenever the store was
         %% down, taking the whole report with it.
         {KeyStoreCurrentStatus, LastRebuild} =
             case is_pid(KeyStore) of
                 true ->
                     CurrentStatus =
                         element(1, aae_keystore:store_currentstatus(KeyStore)),
                     {_, KeyStoreState} = sys:get_state(KeyStore),
                     LR =
                         case aae_keystore:get_last_rebuild(KeyStoreState) of
                             never ->
                                 never;
                             TS ->
                                 calendar:now_to_local_time(TS)
                         end,
                     {CurrentStatus, LR};
                 false ->
                     %% no keystore: no last-rebuild timestamp available
                     {not_running, never}
             end,
         NextRebuild = calendar:now_to_local_time(
                         aae_controller:get_next_rebuild(AAECntrlState)),

         TreeCaches = aae_controller:get_tree_caches(AAECntrlState),
         TCStates = [sys:get_state(P) || {_, P} <- TreeCaches],
         TotalDirtySegments = lists:sum(
                                [aae_treecache:dirty_segment_count(S) || S <- TCStates]),
         InProgress = TictacRebuilding /= false,
         Status =
             case {LastRebuild, InProgress, NextRebuild} of
                 {never, false, Scheduled} when Scheduled /= undefined ->
                     unbuilt;
                 {Built, false, _} when Built /= never ->
                     built;
                 {Built, true, _} when Built /= never ->
                     rebuilding;
                 {never, true, _} ->
                     building
             end,
         [{partition, Idx},
          {key_store_current_status, KeyStoreCurrentStatus},
          {last_rebuild, time2s(LastRebuild)},
          {next_rebuild, time2s(NextRebuild)},
          {total_dirty_segments, TotalDirtySegments},
          {controller_pid, list_to_binary(pid_to_list(AAECntrl))},
          {status, Status}
         ]
     end || {Idx, VNState} <- VVSS].


%% @private
%% Render the AAE tree status report in the output format selected by
%% the --format option (table or json).
print_aae_progress_report(Options) ->
    Format = proplists:get_value(format, Options),
    aae_progress_report(Format, produce_aae_progress_report(), Options).

%% @private
%% "json": emit the whole report as a single JSON document.
aae_progress_report("json", Report, _) ->
    io:format("~s\n", [mochijson2:encode(Report)]);

%% "table": print one row per partition, restricted to the tree
%% states requested via --show.
aae_progress_report("table", Report, Options) ->
    %% NOTE(review): list_to_atom/1 on operator-supplied strings can
    %% grow the atom table; input comes from the console only.
    Show = lists:map(fun erlang:list_to_atom/1, extract_show(Options)),
    io:format("~52s ~10s ~21s ~20s ~15s ~16s\n", ["Partition ID", "Status", "Last Rebuild Date", "Next Rebuild Date", "Controller PID", "Key Store Status"]),
    io:format("~52s ~10s ~21s ~20s ~15s ~16s\n", ["----------------------------------------------------", "----------", "---------------------", "---------------------", "----------------", "----------------"]),
    PrintRow =
        fun(Row) ->
                Status = proplists:get_value(status, Row),
                case lists:member(Status, Show) of
                    true ->
                        io:format("~52b ~10s ~21s ~20s ~15s ~16s\n",
                                  [proplists:get_value(partition, Row),
                                   Status,
                                   proplists:get_value(last_rebuild, Row),
                                   proplists:get_value(next_rebuild, Row),
                                   proplists:get_value(controller_pid, Row),
                                   proplists:get_value(key_store_current_status, Row)]);
                    false ->
                        skip
                end
        end,
    lists:foreach(PrintRow, Report),
    ok.

%% @private
%% Render a calendar datetime as an ISO-8601-style binary
%% (YYYY-MM-DDTHH:MM:SS); the atom 'never' passes through untouched.
time2s(never) ->
    never;
time2s({{Year, Month, Day}, {Hour, Minute, Second}}) ->
    Formatted =
        io_lib:format("~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B",
                      [Year, Month, Day, Hour, Minute, Second]),
    iolist_to_binary(Formatted).

%% @private
%% getopt option specs shared by all tictacaae subcommands:
%% -n/--node (defaults to this node, "all" for every node),
%% -p/--partition (defaults to "all"), --format (table or json) and
%% --show (comma-separated list of tree states to display).
tictacaae_cmd_optspecs() ->
    [
     {node, $n, "node", {string, atom_to_list(node())}, "Node, or all"},
     {partition, $p, "partition", {string, "all"}, "Partition, or all"},
     {format, undefined, "format", {string, "table"}, "table or json"},
     {show, undefined, "show", {string, "unbuilt,rebuilding,building"}, "tree states to show"}
    ].
%% @private
%% Turn the parsed -n/--node options into a list of node atoms. "all"
%% (on any occurrence) expands to this node plus every connected node.
%% Bug fix: the previous version passed the result through
%% lists:join/2, which interleaved "," separator strings into the node
%% list whenever more than one -n option was given, corrupting every
%% downstream rpc:call/4.
extract_nodes(Options) ->
    Names = [N || {node, N} <- Options],
    case lists:member("all", Names) of
        true ->
            [node() | nodes()];
        false ->
            %% list_to_existing_atom/1: never create new atoms from
            %% operator input; unknown node names crash into the
            %% caller's usage handler.
            [list_to_existing_atom(N) || N <- Names]
    end.
%% @private
%% Turn the parsed -p/--partition options into either the atom 'all'
%% or a list of integer partition indexes.
extract_partitions(Options) ->
    Requested = [P || {partition, P} <- Options],
    case lists:member("all", Requested) of
        true ->
            all;
        false ->
            lists:map(fun erlang:list_to_integer/1, Requested)
    end.
%% @private
%% Turn the parsed --show options (possibly several, each a
%% comma-separated list) into a flat list of tree-state name strings;
%% "all" expands to every known state.
extract_show(Options) ->
    Joined = lists:flatten(lists:join(",", [S || {show, S} <- Options])),
    States = string:split(Joined, ",", all),
    case lists:member("all", States) of
        true ->
            ["unbuilt", "rebuilding", "building", "built"];
        false ->
            States
    end.

%% @private
%% Pluralisation suffix: empty for a single-element list, "s" otherwise.
ending(List) ->
    case List of
        [_] -> "";
        _ -> "s"
    end.

%%%===================================================================
%%% Private
%%%===================================================================
Expand Down
32 changes: 30 additions & 2 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@
reformat_object/2,
stop_fold/1,
get_modstate/1,
aae_send/1]).
aae_send/1,
aae_schedule_nextrebuild/2,
aae_controller/1,
aae_rebuilding/1]).

%% riak_core_vnode API
-export([init/1,
Expand Down Expand Up @@ -539,6 +542,16 @@ when_loading_complete(AAECntrl, Preflists, PreflistFun, OnlyIfBroken) ->
skipped
end.

%% @doc Expose aae_controller, mainly for gathering items for `riak admin aae-progress-report`.
%% Returns the pid of this vnode's AAE controller process taken from
%% the vnode #state{} record; 'undefined' per the spec when no
%% controller is running — TODO confirm against state initialisation.
-spec aae_controller(#state{}) -> pid() | undefined.
aae_controller(#state{aae_controller = A}) ->
    A.

%% @doc Expose tictac_rebuilding field, for `riak admin aae-progress-report`.
%% Returns the tictac_rebuilding field of the vnode #state{} record:
%% per the spec either a rebuild-start timestamp or 'false' when no
%% rebuild is in progress.
-spec aae_rebuilding(#state{}) -> erlang:timestamp() | false.
aae_rebuilding(#state{tictac_rebuilding = A}) ->
    A.


%% @doc Reveal the underlying module state for testing
-spec get_modstate(state()) -> {module(), term()}.
Expand Down Expand Up @@ -609,14 +622,23 @@ tictacrebuild_complete(Vnode, StartTime, ProcessType) ->
erlang:timestamp(),
{atom(), non_neg_integer()}) -> ok.
%% @doc
%% Infor the vnode that an aae exchange is complete
%% Inform the vnode that an aae exchange is complete
tictacexchange_complete(Vnode, StartTime, ExchangeResult) ->
riak_core_vnode_master:command(Vnode,
{exchange_complete,
ExchangeResult,
StartTime},
riak_kv_vnode_master).

-spec aae_schedule_nextrebuild([{partition(), node()}], non_neg_integer()) -> ok.
%% @doc
%% Schedule the next rebuilding of tictac trees (emulate tick now, plus a delay).
%% The {schedule_nextrebuild, Delay} command is cast asynchronously to
%% each listed vnode via the riak_kv vnode master; the unit of Delay is
%% whatever aae_controller:aae_schedulenextrebuild/2 expects — confirm
%% against kv_index_tictactree before relying on it.
aae_schedule_nextrebuild(Vnodes, Delay) ->
    riak_core_vnode_master:command(Vnodes,
                                   {schedule_nextrebuild, Delay},
                                   riak_kv_vnode_master).


get(Preflist, BKey, ReqId) ->
%% Assuming this function is called from a FSM process
%% so self() == FSM pid
Expand Down Expand Up @@ -1203,6 +1225,12 @@ handle_command({exchange_complete, ExchangeResult, ST},
tictac_exchangetime = XT,
tictac_skiptick = 0}};

handle_command({schedule_nextrebuild, Delay},
_Sender, State) ->
AAECntrl = State#state.aae_controller,
ok = aae_controller:aae_schedulenextrebuild(AAECntrl, Delay),
{noreply, State};

handle_command({upgrade_hashtree, Node}, _, State=#state{hashtrees=HT}) ->
%% Make sure we dont kick off an upgrade during a possible handoff
case node() of
Expand Down
Loading