Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nhse d34 nhskv.i33 token #35

Merged
merged 38 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
79a0e55
Initial support for OTP26
martinsumner May 2, 2024
246774a
Remove tracers
martinsumner May 2, 2024
6c46b9d
Update rebar.config
martinsumner May 11, 2024
1d04e59
Update rebar.config
martinsumner May 14, 2024
38907f2
Token Manager
martinsumner May 29, 2024
4e3489d
Add strong conditional check to PB API
martinsumner May 29, 2024
b615d2e
Reset state on PB connection after completion
martinsumner May 29, 2024
cb53d98
Add downstream recording of state
martinsumner May 30, 2024
deb3ce3
Fix uniquenes of node list
martinsumner May 31, 2024
6854977
Refactor riak_kv_pb_object
martinsumner May 31, 2024
47bd106
Move strong conditional check to FSM (#34)
martinsumner May 31, 2024
39a3e87
Add support to HTTP API
martinsumner May 31, 2024
09fcda2
Conditional PUT to require GET not HEAD
martinsumner Jun 5, 2024
56b6833
Add configuration
martinsumner Jun 5, 2024
a0b027b
Add profile function
martinsumner Jun 5, 2024
ef61d8b
Add profiler
martinsumner Jun 6, 2024
a11d4d2
Merge branch 'nhse-d34-otp26' into nhse-d34-nhskv.i30-profiler
martinsumner Jun 6, 2024
664b6e5
Type fix
martinsumner Jun 6, 2024
5b8cb9f
Add to extending list of defaults
martinsumner Jun 7, 2024
346f276
Merge branch 'nhse-d34-nhskv.i30-profiler' into nhse-d34-nhskv.i33-token
martinsumner Jun 12, 2024
87cf5b1
Initial write-up of riak_kv_token_manager
martinsumner Jun 12, 2024
5c18813
Refactor to use monitoring
martinsumner Jun 13, 2024
665d467
Change to config parameters
martinsumner Jun 13, 2024
2f95b40
Remove duplicate
martinsumner Jun 13, 2024
55615f7
Rename (again), make messages async
martinsumner Jun 16, 2024
30a871b
Clarify erpc errors
martinsumner Jun 17, 2024
7758686
Use token_manager to control downstream messages
martinsumner Jun 18, 2024
ce10992
Add GC process
martinsumner Jun 19, 2024
140f883
Avoid re-fetching objects (#36)
martinsumner Jun 19, 2024
c4a033f
Merge branch 'nhse-d34-nhskv.i33-token' of https://github.com/nhs-ria…
martinsumner Jun 19, 2024
09ad183
Extend eunit test of token_manager
martinsumner Jun 19, 2024
10de288
Try and keep github formatter happy
martinsumner Jun 19, 2024
11caa89
Use monitor rather than spoof 'DOWN'
martinsumner Jun 19, 2024
6ba1736
Use uniq code in OTP 24
martinsumner Jun 19, 2024
8f9b5e0
Update comments for perceived clarity
martinsumner Jun 21, 2024
904fd69
Merge branch 'nhse-develop-3.4' into nhse-d34-nhskv.i33-token
martinsumner Sep 23, 2024
3a102b3
Remove double-definition on merge
martinsumner Sep 23, 2024
e09bfe2
Merge branch 'openriak-3.4' into nhse-d34-nhskv.i33-token
martinsumner Dec 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@
%% required in this mode).
%%
%% With basic_consensus, tokens can still be granted in a wide range of failure
%% scenarios, but with a risk of duplicate grants, in aprticular should a
%% scenarios, but with a risk of duplicate grants, in particular should a
%% cluster be partitioned.
%%
%% No mode provides strict guarantees, including primary_consensus, especially
Expand Down
9 changes: 5 additions & 4 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ process(
GetOpts =
make_options(
[
{n_val, N_val},
{sloppy_quorum, SloppyQuorum},
{basic_quorum, true},
{return_body, false},
{deleted_vclock, true}
Expand All @@ -333,7 +331,9 @@ process(
{ok, [{condition_check, Condition}], Token};
_ ->
?LOG_WARNING(
"Fallback to weak check as no token available"
"Fallback to weak check as no token available "
"for ~p ~p",
[B, K]
),
{CheckR, PutOpts} =
riak_kv_put_fsm:conditional_check(
Expand All @@ -346,7 +346,8 @@ process(
{NotMod, NoneMatch, false, false} ->
GetOpts =
make_option(n_val, N_val) ++
make_option(sloppy_quorum, SloppyQuorum),
make_option(sloppy_quorum, SloppyQuorum) ++
make_option(timeout, Timeout),
{CheckR, PutOpts} =
riak_kv_put_fsm:conditional_check(
riak_client:get(B, K, GetOpts, C),
Expand Down
267 changes: 201 additions & 66 deletions src/riak_kv_token_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
%%
%% riak_kv_token_session process may request a grant. The session process
%% should monitor the token_manager to which it makes a request, and fail with
%% the fialure of that token_manager. A request may be granted immediately, or
%% the failure of that token_manager. A request may be granted immediately, or
%% potentially queued to be granted when the token is next released.
%%
%% The grant may be refused if a previous token has been granted and now
Expand Down Expand Up @@ -63,7 +63,7 @@
%% monitor every riak_kv_token_session process to which it has made a grant,
%% and release that grant on the process terminating for any reason.
%%
%% The riak_kv_token_manager has no awareness of other ndoes in the cluster.
%% The riak_kv_token_manager has no awareness of other nodes in the cluster.
%% The riak_kv_token_session request logic should be aware of which
%% riak_kv_token_manager is responsible for granting a given token. In Riak
%% this is done using the preflist based on the hash of the token key. The
Expand Down Expand Up @@ -102,12 +102,21 @@

-define(ASSOCIATION_CHECK_TIMEOUT, 5000).

%% Base interval, in milliseconds, for the periodic sweep of grants. It is used
%% as the delay before the first `sweep` message and as the basis for the
%% randomised delay between subsequent sweeps. A shorter value is defined when
%% compiled with TEST (presumably so that unit tests which wait on sweeps
%% complete quickly).
-ifdef(TEST).
-define(SWEEP_DELAY, 1000).
-else.
-define(SWEEP_DELAY, 10000).
-endif.


-record(state,
{
grants = maps:new() :: grant_map(),
queues = maps:new() :: request_queues(),
associations = maps:new() :: #{session_pid() => token_id()},
monitored_managers = [] :: list(manager_mon())
monitored_managers = [] :: list(manager_mon()),
last_sweep_grants = sets:new([{version, 2}]) ::
sets:set({token_id(), granted_session()})
}
).

Expand Down Expand Up @@ -240,6 +249,7 @@ grants() ->
%%%============================================================================

%% gen_server callback: schedule the first `sweep` message to this process
%% after ?SWEEP_DELAY milliseconds and start with a default (empty) state.
init(_Args) ->
    _SweepTimer = erlang:send_after(?SWEEP_DELAY, self(), sweep),
    InitialState = #state{},
    {ok, InitialState}.

handle_call(stats, _From, State) ->
Expand Down Expand Up @@ -303,11 +313,6 @@ handle_cast({request, TokenID, VerifyList, Session}, State) ->
associations = UpdAssocs
}
};
{upstream, {Manager, UpstreamSession}} ->
Session ! refused,
F = fun() -> check_upstream(TokenID, Manager, UpstreamSession) end,
spawn(F),
{noreply, State};
_ ->
Session ! refused,
{noreply, State}
Expand Down Expand Up @@ -384,27 +389,23 @@ handle_cast({downstream_check, TokenID, {Manager, Session}}, State) ->
)
}
};
Block ->
{upstream, {BlockingMgr, _PrevSession}} when BlockingMgr == Manager ->
%% Trust the upstream manager knows what they're doing. This is
%% likely to be as a result of message misordering
gen_server:cast(
Manager, {downstream_reply, TokenID, Session, false}
Manager, {downstream_reply, TokenID, Session, true}
),
case Block of
{upstream, {BlockingManager, BlockingSession}} ->
F =
fun() ->
check_upstream(
TokenID, BlockingManager, BlockingSession
)
end,
spawn(F),
?LOG_INFO(
"Prompting upstream check due to block of "
"downstream_check and block is matching ~w ~w",
[BlockingManager == Manager, BlockingSession == Session]
);
_ ->
ok
end,
UpdGrants =
maps:put(
TokenID,
{upstream, {Manager, Session}},
State#state.grants
),
{noreply, State#state{grants = UpdGrants}};
_Block ->
gen_server:cast(
Manager, {downstream_reply, TokenID, Session, false}
),
{noreply, State}
end;
handle_cast({downstream_reply, TokenID, Session, true}, State) ->
Expand Down Expand Up @@ -522,43 +523,60 @@ handle_info({'DOWN', Ref, process, Pid, Reason}, State) ->
}
end;
_ ->
%% No association is assumed to be a token manager
%% Need to remove all upstream grants associated with this
%% manager
?LOG_WARNING(
"Remote Token Manager ~w reported down due to ~p monitored ~p",
[
Pid,
Reason,
lists:member({Pid, Ref}, State#state.monitored_managers)
]
),
FilterFun =
fun({_Tid, G}) ->
case G of
{upstream, {UpstreamPid, _Session}}
when UpstreamPid == Pid ->
false;
_ ->
true
end
end,
UpdGrants =
maps:from_list(
lists:filter(
FilterFun,
maps:to_list(State#state.grants)
)
),
UpdMonitors =
lists:delete({Pid, Ref}, State#state.monitored_managers),
{noreply,
State#state{
grants = UpdGrants,
monitored_managers = UpdMonitors
}
}
case lists:member({Pid, Ref}, State#state.monitored_managers) of
true ->
?LOG_WARNING(
"Remote Token Manager ~w reported down due to ~p",
[Pid, Reason]
),
FilterFun =
fun({_Tid, G}) ->
case G of
{upstream, {UpstreamPid, _Session}}
when UpstreamPid == Pid ->
false;
_ ->
true
end
end,
UpdGrants =
maps:from_list(
lists:filter(
FilterFun,
maps:to_list(State#state.grants)
)
),
UpdMonitors =
lists:delete(
{Pid, Ref},
State#state.monitored_managers
),
{noreply,
State#state{
grants = UpdGrants,
monitored_managers = UpdMonitors
}
};
false ->
?LOG_INFO(
"Session ~w cleared for ~w but not present",
[Pid, Reason]
),
{noreply, State}
end
end;
handle_info(sweep, State) ->
    %% Re-arm the timer first, with a randomised delay of between
    %% (?SWEEP_DELAY div 2) + 1 and (?SWEEP_DELAY div 2) + ?SWEEP_DELAY ms.
    NextDelay = rand:uniform(?SWEEP_DELAY) + ?SWEEP_DELAY div 2,
    erlang:send_after(NextDelay, self(), sweep),
    %% Only grants that were present, unchanged, at both the previous sweep
    %% and this one are passed on to be checked for activity.
    CurrentGrants =
        sets:from_list(maps:to_list(State#state.grants), [{version, 2}]),
    Survivors =
        sets:intersection(State#state.last_sweep_grants, CurrentGrants),
    check_active(sets:to_list(Survivors), self()),
    {noreply, State#state{last_sweep_grants = CurrentGrants}};
handle_info(_Msg, State) ->
{noreply, State}.

Expand Down Expand Up @@ -613,8 +631,31 @@ monitor_manager(Manager, MonitoredManagers) ->
[{Manager, MgrRef}|MonitoredManagers]
end.

-spec check_upstream(token_id(), manager_pid(), session_pid()) -> ok.
check_upstream(TokenID, RemoteManager, Session) ->
%% Check whether each of the given grants still appears to be in use, and
%% prompt the manager `Mgr` to clean up those which are not.
%%
%% For a `local` grant the session process is tested with is_process_alive/1;
%% if it is no longer alive, `Mgr` is sent a 'DOWN'-shaped message for that
%% session with the reason `inactive`. Note that the second element of that
%% message is the pid of the calling process, not a monitor reference.
%% is_process_alive/1 requires a pid on the local node, so this must be run on
%% the node which owns the local sessions.
%%
%% For an `upstream` grant a separate process is spawned (see
%% check_upstream_async/4) to perform the check against the upstream manager.
%%
%% Always returns `ok`; an entry which matches neither grant shape will crash
%% with function_clause.
-spec check_active(list({token_id(), granted_session()}), pid()) -> ok.
check_active([], _Mgr) ->
    ok;
check_active([{_TokenID, {local, Session, _VL, _VC}}|Rest], Mgr) ->
    case is_process_alive(Session) of
        true ->
            ok;
        false ->
            Mgr ! {'DOWN', self(), process, Session, inactive}
    end,
    check_active(Rest, Mgr);
check_active([{TokenID, {upstream, {UpstreamMgr, UpstreamSess}}}|Rest], Mgr) ->
    _P = check_upstream_async(TokenID, UpstreamMgr, UpstreamSess, Mgr),
    check_active(Rest, Mgr).


%% Spawn a separate (unlinked, unmonitored) process to run check_upstream/4
%% with the given arguments, returning the pid of the spawned process.
-spec check_upstream_async(
    token_id(), manager_pid(), session_pid(), manager_pid()) -> pid().
check_upstream_async(TokenID, RemoteManager, Session, Mgr) ->
    spawn(fun() -> check_upstream(TokenID, RemoteManager, Session, Mgr) end).

-spec check_upstream(
token_id(), manager_pid(), session_pid(), manager_pid()) -> ok.
check_upstream(TokenID, RemoteManager, Session, Mgr) ->
RemoteNode = node(RemoteManager),
{UpstreamAssociated, Reason} =
case RemoteNode of
Expand All @@ -640,7 +681,7 @@ check_upstream(TokenID, RemoteManager, Session) ->
[RemoteNode, TokenID, Reason]
),
gen_server:cast(
?MODULE,
Mgr,
{downstream_release, TokenID, {RemoteManager, Session}}
)
end.
Expand All @@ -655,6 +696,100 @@ check_upstream(TokenID, RemoteManager, Session) ->

-include_lib("eunit/include/eunit.hrl").

%% EUnit generator: run gc_tester/0 with an extended timeout (in seconds), as
%% the test sleeps for several sweep intervals.
gc_test_() ->
    TimeoutSeconds = 60,
    {timeout, TimeoutSeconds, fun gc_tester/0}.

%% Exercises check_active/2 and the periodic sweep across three managers.
%%
%% NOTE(review): requestor_fun/4 is defined outside this view. From its use
%% here it presumably returns a fun which requests the given token from the
%% given manager (with the listed managers to be verified), and which reports
%% {granted, Pid} to the test process when granted - confirm against its
%% definition.
%%
%% NOTE(review): the results of the wait_until/3 calls, and of the bare F1A(),
%% F2A(), F3A(), F2B() and F3B() calls, are discarded rather than asserted, so
%% those checks cannot fail the test. Only the ?assert/?assertMatch lines are
%% enforced.
gc_tester() ->
{ok, Mgr1} = gen_server:start(?MODULE, [], []),
{ok, Mgr2} = gen_server:start(?MODULE, [], []),
{ok, Mgr3} = gen_server:start(?MODULE, [], []),
Req1 = requestor_fun(self(), <<"T1">>, Mgr1, [Mgr2, Mgr3]),
Req2 = requestor_fun(self(), <<"T2">>, Mgr2, [Mgr3]),
%% Three sessions: one for T1, and two competing for T2 (S2 is started first).
S1 = spawn(Req1),
timer:sleep(1),
S2 = spawn(Req2),
timer:sleep(1),
S3 = spawn(Req2),
F1 = fun() -> {1, 0, 1} == gen_server:call(Mgr1, stats) end,
F2 = fun() -> {2, 1, 2} == gen_server:call(Mgr2, stats) end,
F3 = fun() -> {2, 0, 0} == gen_server:call(Mgr3, stats) end,
wait_until(F1, 10, 1),
wait_until(F2, 10, 1),
wait_until(F3, 10, 1),
%% Run the activity check directly against each manager's current grants; the
%% stats are asserted to be unchanged afterwards.
G1A = maps:to_list(gen_server:call(Mgr1, grants)),
G2A = maps:to_list(gen_server:call(Mgr2, grants)),
G3A = maps:to_list(gen_server:call(Mgr3, grants)),
ok = check_active(G1A, Mgr1),
ok = check_active(G2A, Mgr2),
ok = check_active(G3A, Mgr3),
?assert(F1()),
?assert(F2()),
?assert(F3()),

%% Wait for several sweep intervals so each manager's own sweep timer has
%% fired, then confirm the grants were recorded by the sweep and that the
%% stats are still unchanged.
timer:sleep(3 * ?SWEEP_DELAY),
State1 = sys:get_state(Mgr1),
?assertMatch(1, sets:size(State1#state.last_sweep_grants)),
State2 = sys:get_state(Mgr2),
?assertMatch(2, sets:size(State2#state.last_sweep_grants)),
State3 = sys:get_state(Mgr3),
?assertMatch(2, sets:size(State3#state.last_sweep_grants)),

?assert(F1()),
?assert(F2()),
?assert(F3()),

receive Gr1 -> ?assertMatch({granted, S1}, Gr1) end,
receive Gr2 -> ?assertMatch({granted, S2}, Gr2) end,
%% Send Mgr1 the same shape of message that check_active/2 sends for a local
%% session which is no longer alive, while S1 is in fact still running.
Mgr1 ! {'DOWN', self(), process, S1, inactive},
F1A = fun() -> {0, 0, 0} == gen_server:call(Mgr1, stats) end,
F2A = fun() -> {1, 1, 2} == gen_server:call(Mgr2, stats) end,
F3A = fun() -> {1, 0, 0} == gen_server:call(Mgr3, stats) end,
wait_until(F1A, 10, 1),
wait_until(F2A, 10, 1),
wait_until(F3A, 10, 1),
S1 ! terminate,
F1A(),
F2A(),
F3A(),
%% Repeat for S2 on Mgr2; the test then expects S3 to be granted.
Mgr2 ! {'DOWN', self(), process, S2, inactive},
receive Gr3 -> ?assertMatch({granted, S3}, Gr3) end,
F2B = fun() -> {1, 0, 1} == gen_server:call(Mgr2, stats) end,
F3B = fun() -> {1, 0, 0} == gen_server:call(Mgr3, stats) end,
wait_until(F2B, 10, 1),
wait_until(F3B, 10, 1),
S2 ! terminate,
%% No further message should reach the test process within 10ms.
?assert(receive _ -> false after 10 -> true end),
F2B(),
F3B(),
S3 ! terminate,
?assert(receive _ -> false after 10 -> true end),

gen_server:stop(Mgr1),
gen_server:stop(Mgr2),
gen_server:stop(Mgr3).

%% Two requests for the same token are made on behalf of short-lived processes
%% which exit immediately. The manager's stats are expected to reach {0, 0, 0}
%% within half of a sweep interval.
already_dead_test() ->
    {ok, Manager} = gen_server:start(?MODULE, [], []),
    ShortLived = [spawn(fun() -> ok end), spawn(fun() -> ok end)],
    Token = <<"T1">>,
    NoVerify = [],
    lists:foreach(
        fun(Session) ->
            ok = gen_server:cast(Manager, {request, Token, NoVerify, Session})
        end,
        ShortLived
    ),
    AllClear = fun() -> {0, 0, 0} == gen_server:call(Manager, stats) end,
    ?assert(wait_until(AllClear, ?SWEEP_DELAY div 2, 1)),
    gen_server:stop(Manager).


%% Poll the boolean fun F until it returns true or the remaining Wait budget is
%% used up. While budget remains, sleep RetrySleep milliseconds before each
%% call to F, and deduct RetrySleep from the budget after each false result.
%% Once the budget is exhausted, F is called one final time (without sleeping)
%% and its result is returned.
wait_until(F, Wait, RetrySleep) when Wait > 0 ->
    timer:sleep(RetrySleep),
    case F() of
        false ->
            Remaining = Wait - RetrySleep,
            wait_until(F, Remaining, RetrySleep);
        true ->
            true
    end;
wait_until(F, _Wait, _RetrySleep) ->
    F().

manager_simple_test() ->
{ok, Mgr1} = gen_server:start(?MODULE, [], []),
{ok, Mgr2} = gen_server:start(?MODULE, [], []),
Expand Down
1 change: 0 additions & 1 deletion src/riak_kv_wm_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,6 @@ accept_doc_body(
GetOpts =
[
{basic_quorum, true},
{sloppy_quorum, true},
{return_body, false},
{deleted_vclock, true}
],
Expand Down