Skip to content

Commit

Permalink
refactor: change default rpc module from 'gen_rpc' to 'rpc'
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Oct 17, 2023
1 parent 0e63666 commit cd6c890
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 20 deletions.
21 changes: 11 additions & 10 deletions src/mria_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
, rocksdb_backend_available/0
]).

-include("mria_rlog.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -119,9 +120,9 @@ whoami() ->
role()
end.

-spec rpc_module() -> gen_rpc | rpc.
-spec rpc_module() -> ?GEN_RPC | ?ERL_RPC.
rpc_module() ->
persistent_term:get(?mria(rlog_rpc_module), gen_rpc).
persistent_term:get(?mria(rlog_rpc_module), ?DEFAULT_RPC_MODULE).

-spec core_rpc_retries() -> integer().
core_rpc_retries() ->
Expand Down Expand Up @@ -179,15 +180,15 @@ dirty_shard(Shard) ->
persistent_term:get(?is_dirty(Shard), false).

-spec set_shard_transport(mria_rlog:shard(), mria_rlog:transport()) -> ok.
set_shard_transport(Shard, Transport) when Transport =:= gen_rpc;
Transport =:= distr ->
set_shard_transport(Shard, Transport) when Transport =:= ?TRANSPORT_GEN_RPC;
Transport =:= ?TRANSPORT_ERL_DISTR->
ok = persistent_term:put(?shard_transport(Shard), Transport);
set_shard_transport(Shard, Transport) ->
error({badarg, Shard, Transport}).

-spec shard_transport(mria_rlog:shard()) -> mria_rlog:transport().
shard_transport(Shard) ->
Default = persistent_term:get(?mria(shard_transport), gen_rpc),
Default = persistent_term:get(?mria(shard_transport), ?TRANSPORT_ERL_DISTR),
persistent_term:get(?shard_transport(Shard), Default).

-spec set_shard_bootstrap_batch_size(mria_rlog:shard(), non_neg_integer()) -> ok.
Expand Down Expand Up @@ -253,12 +254,12 @@ get_extra_mnesia_diagnostic_checks() ->
-spec consistency_check() -> ok.
consistency_check() ->
case rpc_module() of
gen_rpc -> ok;
rpc -> ok
?GEN_RPC -> ok;
?ERL_RPC -> ok
end,
case persistent_term:get(?mria(shard_transport), gen_rpc) of
distr -> ok;
gen_rpc -> ok
case persistent_term:get(?mria(shard_transport), ?DEFAULT_SHARD_TRANSPORT) of
?TRANSPORT_ERL_DISTR -> ok;
?TRANSPORT_GEN_RPC -> ok
end,
case {backend(), role(), otp_is_compatible()} of
{mnesia, replicant, _} ->
Expand Down
14 changes: 7 additions & 7 deletions src/mria_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
, rpc_destination/0
]).

-include_lib("snabbkaffe/include/trace.hrl").
-include("mria_rlog.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("mnesia/src/mnesia.hrl").

-compile({inline, [node_from_destination/1]}).
Expand Down Expand Up @@ -99,10 +99,10 @@ make_key(undefined) ->
-spec rpc_call(rpc_destination(), module(), atom(), list()) -> term().
rpc_call(Destination, Module, Function, Args) ->
Result = case mria_config:rpc_module() of
rpc ->
?ERL_RPC ->
rpc:call(node_from_destination(Destination),
?MODULE, wrap_exception, [Module, Function, Args]);
gen_rpc ->
?GEN_RPC ->
gen_rpc:call(Destination,
?MODULE, wrap_exception, [Module, Function, Args])
end,
Expand All @@ -111,10 +111,10 @@ rpc_call(Destination, Module, Function, Args) ->
-spec rpc_call_nothrow(rpc_destination(), module(), atom(), list()) -> term().
rpc_call_nothrow(Destination, Module, Function, Args) ->
case mria_config:rpc_module() of
rpc ->
?ERL_RPC ->
rpc:call(node_from_destination(Destination),
Module, Function, Args);
gen_rpc ->
?GEN_RPC ->
gen_rpc:call(Destination,
Module, Function, Args)
end.
Expand Down Expand Up @@ -142,9 +142,9 @@ wrap_exception(Mod, Fun, Args) ->
-spec rpc_cast(rpc_destination(), module(), atom(), list()) -> term().
rpc_cast(Destination, Module, Function, Args) ->
case mria_config:rpc_module() of
rpc ->
?ERL_RPC ->
rpc:cast(node_from_destination(Destination), Module, Function, Args);
gen_rpc ->
?GEN_RPC ->
gen_rpc:cast(Destination, Module, Function, Args)
end.

Expand Down
2 changes: 1 addition & 1 deletion src/mria_rlog.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
%% cluster!
-type seqno() :: non_neg_integer().

-type transport() :: gen_rpc | distr.
-type transport() :: ?TRANSPORT_GEN_RPC | ?TRANSPORT_ERL_DISTR.

-type sync_reply_to() :: #?rlog_sync{reply_to :: reference(), shard :: shard()}.

Expand Down
8 changes: 8 additions & 0 deletions src/mria_rlog.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@
, checkpoint
}).

-define(ERL_RPC, rpc).
-define(GEN_RPC, gen_rpc).
-define(DEFAULT_RPC_MODULE, ?ERL_RPC).

-define(TRANSPORT_ERL_DISTR, distr).
-define(TRANSPORT_GEN_RPC, gen_rpc).
-define(DEFAULT_SHARD_TRANSPORT, ?TRANSPORT_ERL_DISTR).

-endif.
4 changes: 2 additions & 2 deletions src/mria_rlog_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ format_status(Status) ->

%% This function is called by the remote core node.
-spec push_tlog_entry(mria_rlog:transport(), mria_rlog:shard(), mria_lib:subscriber(), mria_rlog:entry()) -> ok.
push_tlog_entry(distr, _Shard, {_Node, Pid}, TLOGEntry) ->
push_tlog_entry(?TRANSPORT_ERL_DISTR, _Shard, {_Node, Pid}, TLOGEntry) ->
do_push_tlog_entry(Pid, TLOGEntry), %% Note: here Pid is remote
ok;
push_tlog_entry(gen_rpc, Shard, {Node, Pid}, TLOGEntry) ->
push_tlog_entry(?TRANSPORT_GEN_RPC, Shard, {Node, Pid}, TLOGEntry) ->
gen_rpc:ordered_cast({Node, Shard}, ?MODULE, do_push_tlog_entry, [Pid, TLOGEntry]),
ok.

Expand Down

0 comments on commit cd6c890

Please sign in to comment.