Skip to content

Commit

Permalink
validate raft server is operating on expected config version for adju…
Browse files Browse the repository at this point in the history
…st membership operations

Summary:
This prepares `wa_raft_server` for receiving adjust_membership commands that include the client perceived config index.
If the server has a different config index, it rejects the membership operation.

Differential Revision: D50945640

fbshipit-source-id: b4f45b3eac79b7d4c83c859b3dbd631fc37e5fc4
  • Loading branch information
Sarah Hassan authored and facebook-github-bot committed Nov 7, 2023
1 parent d2966e3 commit 70c4199
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 33 deletions.
24 changes: 12 additions & 12 deletions include/wa_raft_rpc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@

-define(RAFT_COMMAND(Type, Payload), {command, Type, Payload}).

-define(COMMIT_COMMAND(Op), ?RAFT_COMMAND(commit, Op)).
-define(READ_COMMAND(Op), ?RAFT_COMMAND(read, Op)).
-define(COMMIT_COMMAND(Op), ?RAFT_COMMAND(commit, Op)).
-define(READ_COMMAND(Op), ?RAFT_COMMAND(read, Op)).

-define(STATUS_COMMAND, ?RAFT_COMMAND(status, undefined)).
-define(PROMOTE_COMMAND(Term, Force, Config), ?RAFT_COMMAND(promote, {Term, Force, Config})).
-define(RESIGN_COMMAND, ?RAFT_COMMAND(resign, undefined)).
-define(STATUS_COMMAND, ?RAFT_COMMAND(status, undefined)).
-define(PROMOTE_COMMAND(Term, Force, Config), ?RAFT_COMMAND(promote, {Term, Force, Config})).
-define(RESIGN_COMMAND, ?RAFT_COMMAND(resign, undefined)).

-define(ADJUST_MEMBERSHIP_COMMAND(Action, Peer), ?RAFT_COMMAND(adjust_membership, {Action, Peer})).
-define(ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_COMMAND(adjust_membership, {Action, Peer, ConfigIndex})).

-define(SNAPSHOT_AVAILABLE_COMMAND(Root, Position), ?RAFT_COMMAND(snapshot_available, {Root, Position})).
-define(SNAPSHOT_AVAILABLE_COMMAND(Root, Position), ?RAFT_COMMAND(snapshot_available, {Root, Position})).

-define(HANDOVER_CANDIDATES_COMMAND, ?RAFT_COMMAND(handover_candidates, undefined)).
-define(HANDOVER_COMMAND(Peer), ?RAFT_COMMAND(handover, Peer)).
-define(HANDOVER_CANDIDATES_COMMAND, ?RAFT_COMMAND(handover_candidates, undefined)).
-define(HANDOVER_COMMAND(Peer), ?RAFT_COMMAND(handover, Peer)).

-define(ENABLE_COMMAND, ?RAFT_COMMAND(enable, undefined)).
-define(DISABLE_COMMAND(Reason), ?RAFT_COMMAND(disable, Reason)).
-define(WITNESS_COMMAND(), ?RAFT_COMMAND(witness, undefined)).
-define(ENABLE_COMMAND, ?RAFT_COMMAND(enable, undefined)).
-define(DISABLE_COMMAND(Reason), ?RAFT_COMMAND(disable, Reason)).
-define(WITNESS_COMMAND(), ?RAFT_COMMAND(witness, undefined)).
65 changes: 44 additions & 21 deletions src/wa_raft_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
resign/1,
refresh_config/1,
adjust_membership/3,
adjust_membership/4,
handover/1,
handover/2,
handover_candidates/1,
Expand Down Expand Up @@ -163,7 +164,7 @@
-type status_command() :: ?STATUS_COMMAND.
-type promote_command() :: ?PROMOTE_COMMAND(wa_raft_log:log_term(), boolean(), config() | undefined).
-type resign_command() :: ?RESIGN_COMMAND.
-type adjust_membership_command() :: ?ADJUST_MEMBERSHIP_COMMAND(membership_action(), peer() | undefined).
-type adjust_membership_command() :: ?ADJUST_MEMBERSHIP_COMMAND(membership_action(), peer() | undefined, wa_raft_log:log_index() | undefined).
-type snapshot_available_command() :: ?SNAPSHOT_AVAILABLE_COMMAND(string(), wa_raft_log:log_pos()).
-type handover_candidates_command() :: ?HANDOVER_CANDIDATES_COMMAND.
-type handover_command() :: ?HANDOVER_COMMAND(node()).
Expand Down Expand Up @@ -303,11 +304,25 @@ resign(Pid) ->

-spec refresh_config(Name :: atom() | pid()) -> {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error().
refresh_config(Name) ->
gen_statem:call(Name, ?ADJUST_MEMBERSHIP_COMMAND(refresh, undefined), ?RAFT_RPC_CALL_TIMEOUT()).

-spec adjust_membership(Name :: atom() | pid(), Action :: add | remove | add_witness | remove_witness, Peer :: peer()) -> {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error().
gen_statem:call(Name, ?ADJUST_MEMBERSHIP_COMMAND(refresh, undefined, undefined), ?RAFT_RPC_CALL_TIMEOUT()).

-spec adjust_membership(
Name :: atom() | pid(),
Action :: add | remove | add_witness | remove_witness,
Peer :: peer()
) ->
{ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error().
adjust_membership(Name, Action, Peer) ->
gen_statem:call(Name, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer), ?RAFT_RPC_CALL_TIMEOUT()).
adjust_membership(Name, Action, Peer, undefined).

-spec adjust_membership(
Name :: atom() | pid(),
Action :: add | remove | add_witness | remove_witness,
Peer :: peer(),
ConfigIndex :: wa_raft_log:log_index() | undefined
) -> {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error().
adjust_membership(Name, Action, Peer, ConfigIndex) ->
gen_statem:call(Name, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_RPC_CALL_TIMEOUT()).

-spec handover_candidates(Name :: atom() | pid()) -> {ok, Candidates :: [node()]} | wa_raft:error().
handover_candidates(Name) ->
Expand Down Expand Up @@ -923,7 +938,7 @@ leader({call, From}, ?RESIGN_COMMAND, #raft_state{name = Name, log_view = View,
{next_state, follower, clear_leader(?FUNCTION_NAME, State#raft_state{log_view = NewView}), {reply, From, ok}};

%% [Adjust Membership] Leader attempts to commit a single-node membership change.
leader(Type, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer),
leader(Type, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ExpectedConfigIndex),
#raft_state{name = Name, log_view = View0, storage = Storage,
current_term = CurrentTerm, last_applied = LastApplied} = State0) ->
% Try to adjust the configuration according to the current request.
Expand Down Expand Up @@ -951,18 +966,26 @@ leader(Type, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer),
reply(Type, {error, rejected}),
{keep_state, State0};
false ->
% Now that we have completed all the required checks, the leader is free to
% attempt to change the config. We heartbeat immediately to make the change as
% soon as possible.
Op = {make_ref(), {config, NewConfig}},
{ok, LogIndex, View1} = wa_raft_log:append(View0, [{CurrentTerm, Op}]),
?LOG_NOTICE("Server[~0p, term ~0p, leader] appended configuration change from ~0p to ~0p at log index ~p.",
[Name, CurrentTerm, Config, NewConfig, LogIndex], #{domain => [whatsapp, wa_raft]}),
State1 = State0#raft_state{log_view = View1},
State2 = apply_single_node_cluster(State1),
State3 = append_entries_to_followers(State2),
reply(Type, {ok, #raft_log_pos{index = LogIndex, term = CurrentTerm}}),
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
case ExpectedConfigIndex =/= undefined andalso ExpectedConfigIndex =/= LogConfigIndex of
true ->
?LOG_NOTICE("Server[~0p, term ~0p, leader] rejecting request to ~p peer ~p because it has a different config index than expected (log: ~p, expected: ~p).",
[Name, CurrentTerm, Action, Peer, LogConfigIndex, ExpectedConfigIndex], #{domain => [whatsapp, wa_raft]}),
reply(Type, {error, rejected}),
{keep_state, State0};
false ->
% Now that we have completed all the required checks, the leader is free to
% attempt to change the config. We heartbeat immediately to make the change as
% soon as possible.
Op = {make_ref(), {config, NewConfig}},
{ok, LogIndex, View1} = wa_raft_log:append(View0, [{CurrentTerm, Op}]),
?LOG_NOTICE("Server[~0p, term ~0p, leader] appended configuration change from ~0p to ~0p at log index ~p.",
[Name, CurrentTerm, Config, NewConfig, LogIndex], #{domain => [whatsapp, wa_raft]}),
State1 = State0#raft_state{log_view = View1},
State2 = apply_single_node_cluster(State1),
State3 = append_entries_to_followers(State2),
reply(Type, {ok, #raft_log_pos{index = LogIndex, term = CurrentTerm}}),
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)}
end
end;
{ok, _OtherTerm} ->
?LOG_NOTICE("Server[~0p, term ~0p, leader] rejecting request to ~p peer ~p because it has not established current term commit quorum.",
Expand Down Expand Up @@ -1615,9 +1638,9 @@ command(StateName, {call, From}, ?RESIGN_COMMAND, #raft_state{name = Name, curre
[Name, CurrentTerm, StateName], #{domain => [whatsapp, wa_raft]}),
{keep_state_and_data, {reply, From, {error, not_leader}}};
%% [AdjustMembership] Non-leader nodes cannot adjust their config.
command(StateName, Type, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer), #raft_state{name = Name, current_term = CurrentTerm} = State) when StateName =/= leader ->
?LOG_NOTICE("Server[~0p, term ~0p, ~0p] cannot ~p peer ~p because we are not leader.",
[Name, CurrentTerm, StateName, Action, Peer], #{domain => [whatsapp, wa_raft]}),
command(StateName, Type, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), #raft_state{name = Name, current_term = CurrentTerm} = State) when StateName =/= leader ->
?LOG_NOTICE("Server[~0p, term ~0p, ~0p] cannot ~p peer ~p config index ~p because we are not leader.",
[Name, CurrentTerm, StateName, Action, Peer, ConfigIndex], #{domain => [whatsapp, wa_raft]}),
reply(Type, {error, not_leader}),
{keep_state, State};
%% [Snapshot Available] Follower and candidate nodes might switch to stalled to install snapshot.
Expand Down

0 comments on commit 70c4199

Please sign in to comment.