Skip to content

Commit

Permalink
Simplify RAFT storage fulfill and apply
Browse files Browse the repository at this point in the history
Summary:
Simplify the implementations of the RAFT storage commit `fulfill` and
`apply` operations by directly using `wa_raft_queue` APIs.

Differential Revision: D50330445

fbshipit-source-id: f82c93c0ed05862510efce07078d0de1c371f938
  • Loading branch information
hsun324 authored and facebook-github-bot committed Oct 16, 2023
1 parent 1548566 commit d031066
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions src/wa_raft_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,9 @@ handle_cast(cancel, #state{name = Name, table = Table, partition = Partition} =
wa_raft_queue:fulfill_all_reads(Table, Partition, {error, not_leader}),
{noreply, State};

handle_cast({fulfill, Ref, Return}, State0) ->
State1 = reply(Ref, Return, State0),
{noreply, State1};
handle_cast({fulfill, Ref, Reply}, #state{table = Table, partition = Partition} = State) ->
wa_raft_queue:fulfill_commit(Table, Partition, Ref, Reply),
{noreply, State};

handle_cast({read, From, Command}, #state{module = Module, handle = Handle, last_applied = Position} = State) ->
gen_server:reply(From, Module:storage_read(Command, Position, Handle)),
Expand Down Expand Up @@ -515,34 +515,28 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------

-spec apply_impl(Record :: wa_raft_log:log_record(), EffectiveTerm :: wa_raft_log:log_term() | undefined, State :: #state{}) -> NewState :: #state{}.
apply_impl({LogIndex, {LogTerm, {Ref, _} = Op}}, EffectiveTerm,
apply_impl({LogIndex, {LogTerm, {Reference, Command} = Op}}, EffectiveTerm,
#state{name = Name, table = Table, partition = Partition, last_applied = #raft_log_pos{index = LastAppliedIndex}} = State0) ->
wa_raft_queue:fulfill_apply(Table, Partition),
StartT = os:timestamp(),
case LogIndex of
LastAppliedIndex ->
apply_delayed_reads(State0);
_ when LogIndex =:= LastAppliedIndex + 1 ->
{Reply, State1} = storage_apply(#raft_log_pos{index = LogIndex, term = LogTerm}, Op, State0),
State2 = case LogTerm =:= EffectiveTerm of
true -> reply(Ref, Reply, State1);
false -> State1
end,
State3 = State2#state{last_applied = #raft_log_pos{index = LogIndex, term = LogTerm}},
State4 = apply_delayed_reads(State3),
?RAFT_COUNT('raft.storage.apply'),
{Reply, State1} = execute(Command, #raft_log_pos{index = LogIndex, term = LogTerm}, State0),
LogTerm =:= EffectiveTerm andalso
wa_raft_queue:fulfill_commit(Table, Partition, Reference, Reply),
State2 = State1#state{last_applied = #raft_log_pos{index = LogIndex, term = LogTerm}},
State3 = apply_delayed_reads(State2),
?LOG_DEBUG("applied ~p:~p", [LogIndex, LogTerm], #{domain => [whatsapp, wa_raft]}),
?RAFT_GATHER('raft.storage.apply.func', timer:now_diff(os:timestamp(), StartT)),
State4;
State3;
_ ->
?LOG_ERROR("[~p] received out-of-order apply with index ~p. (expected index ~p, op ~0P)", [Name, LogIndex, LastAppliedIndex, Op, 30], #{domain => [whatsapp, wa_raft]}),
error(out_of_order_apply)
end.

-spec storage_apply(wa_raft_log:log_pos(), wa_raft_acceptor:op(), #state{}) -> {term(), #state{}}.
storage_apply(LogPos, {_Ref, Command}, State) ->
?RAFT_COUNT('raft.storage.apply'),
execute(Command, LogPos, State).

-spec execute(Command :: wa_raft_acceptor:command(), LogPos :: wa_raft_log:log_pos(), State :: #state{}) -> {term() | error(), #state{}}.
execute(noop, LogPos, #state{module = Module, handle = Handle} = State) ->
{Reply, NewHandle} = Module:storage_apply(noop, LogPos, Handle),
Expand All @@ -565,11 +559,6 @@ execute(Command, LogPos, #state{module = Module, handle = Handle} = State) ->
{Reply, NewHandle} = Module:storage_apply(Command, LogPos, Handle),
{Reply, State#state{handle = NewHandle}}.

-spec reply(term(), term(), #state{}) -> #state{}.
reply(Ref, Reply, #state{table = Table, partition = Partition} = State) ->
wa_raft_queue:fulfill_commit(Table, Partition, Ref, Reply),
State.

-spec apply_delayed_reads(State :: #state{}) -> NewState :: #state{}.
apply_delayed_reads(#state{table = Table, partition = Partition, module = Module, handle = Handle, last_applied = #raft_log_pos{index = LastAppliedIndex} = LastAppliedLogPos} = State) ->
lists:foreach(
Expand Down

0 comments on commit d031066

Please sign in to comment.