Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport erlang/otp#7110 to wa/otp/26.0/main, allow dist input_handler to use an internal port #1

Open
wants to merge 5 commits into
base: wa/otp/26.0/main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions erts/doc/src/erlang.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1927,16 +1927,22 @@ end</code>
channel.
</p>
<note><p>
Only the process registered as distribution
controller for the distribution channel identified by
<c><anno>DHandle</anno></c> is allowed to call this
function.
When the distribution controller for the distribution
channel identified by <c><anno>DHandle</anno></c> is a
process, it is the only process allowed to call this
function.
This function is also allowed to be called when the
distribution controller for the distribution channel
identified by <c><anno>DHandle</anno></c> is a port.
The data received by the port should in this case be delivered to
the process identified by <c><anno>InputHandler</anno></c>
which in turn should call
<seemfa marker="erlang#dist_ctrl_put_data/2"><c>erlang:dist_ctrl_put_data/2</c></seemfa>.
</p></note>
<p>
This function is used when implementing an alternative
distribution carrier using processes as distribution
controllers. <c><anno>DHandle</anno></c> is retrieved
via the callback
distribution carrier. <c><anno>DHandle</anno></c> is
retrieved via the callback
<seeguide marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seeguide>.
More information can be found in the documentation of
<seeguide marker="erts:alt_dist#distribution_module">ERTS
Expand Down Expand Up @@ -1967,9 +1973,8 @@ end</code>
</p></note>
<p>
This function is used when implementing an alternative
distribution carrier using processes as distribution
controllers. <c><anno>DHandle</anno></c> is retrieved
via the callback
distribution carrier. <c><anno>DHandle</anno></c> is
retrieved via the callback
<seeguide marker="erts:alt_dist#hs_data_f_handshake_complete"><c>f_handshake_complete</c></seeguide>.
More information can be found in the documentation of
<seeguide marker="erts:alt_dist#distribution_module">ERTS
Expand Down
4 changes: 2 additions & 2 deletions erts/emulator/beam/dist.c
Original file line number Diff line number Diff line change
Expand Up @@ -4386,13 +4386,13 @@ dist_get_stat_1(BIF_ALIST_1)
BIF_RETTYPE
dist_ctrl_input_handler_2(BIF_ALIST_2)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
Uint32 conn_id;
DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);

if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);

if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
if ((ERTS_PROC_GET_DIST_ENTRY(BIF_P) != dep) && !is_internal_port(dep->cid))
BIF_ERROR(BIF_P, BADARG);

if (is_not_internal_pid(BIF_ARG_2))
Expand Down
63 changes: 61 additions & 2 deletions lib/ssl/test/inet_epmd_cryptcookie_inet_ktls.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ accept_controller(_NetAddress, Controller, Socket) ->

%% ------------------------------------------------------------
accepted(NetAddress, _Timer, Socket) ->
inet_epmd_dist:hs_data(NetAddress, Socket).
input_handler_setup(
inet_epmd_dist:hs_data(NetAddress, Socket)).

%% ------------------------------------------------------------
connect(NetAddress, _Timer, Options) ->
Expand All @@ -123,7 +124,8 @@ connect(NetAddress, _Timer, Options) ->
inet_ktls_info(Socket, cryptcookie:ktls_info(CipherState)),
ok ?= inet_tls_dist:set_ktls(KtlsInfo),
ok ?= inet:setopts(Socket, [{packet, 2}, {mode, list}]),
inet_epmd_dist:hs_data(NetAddress, Socket)
input_handler_setup(
inet_epmd_dist:hs_data(NetAddress, Socket))
else
{error, _} = Error ->
Error
Expand Down Expand Up @@ -204,6 +206,63 @@ stream_controlling_process(Stream = {_, [_ | Socket], _}, Pid) ->
erlang:error({?MODULE, ?FUNCTION_NAME, Reason})
end.

%% ------------------------------------------------------------

input_handler_setup(#hs_data{} = HsData) ->
case init:get_argument(inet_ktls) of
{ok, [["port"]]} -> % No input_handler process
%% Just use the distribution port
HsData;
{ok, [["input_handler"]]} -> % Set up an input_handler process
%% Add an f_handshake_complete fun that spawns the input handler
%% and calls the f_setopts_post_nodeup fun
#hs_data{
socket = Socket,
f_setopts_post_nodeup = FSetoptsPostNodeup} = HsData,
HsData#hs_data{
f_setopts_post_nodeup =
fun (S) when S =:= Socket ->
ok
end,
f_handshake_complete =
fun (S, _Node, DHandle) when S =:= Socket ->
handshake_complete(S, FSetoptsPostNodeup, DHandle)
end}
end;
input_handler_setup({error, _} = Error) ->
Error.

handshake_complete(Socket, FSetoptsPostNodeup, DHandle) ->
InputHandler =
spawn_link(
fun () ->
input_handler(Socket, DHandle)
end),
ok = ?DRIVER:controlling_process(Socket, InputHandler),
ok = erlang:dist_ctrl_input_handler(DHandle, InputHandler),
ok = FSetoptsPostNodeup(Socket).

input_handler(Socket, DHandle) ->
receive
{tcp, Socket, Data} ->
erlang:dist_ctrl_put_data(DHandle, Data);
{tcp_error, Socket, _Error} = Reason ->
?DRIVER:close(Socket),
exit(Reason);
{tcp_closed, Socket} = Reason ->
?DRIVER:close(Socket),
exit(Reason);
Other ->
Reason = {unexpected_message, Other},
error_report([?FUNCTION_NAME, {reason, Reason}]),
input_handler(Socket, DHandle)
end.

%% ------------------------------------------------------------

error_report(Report) ->
error_logger:error_report(Report).

%% ------------------------------------------------------------
supported() ->
maybe
Expand Down
21 changes: 19 additions & 2 deletions lib/ssl/test/ssl_dist_bench_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ groups() ->
{cryptcookie_socket_ktls, categories()},
{dist_cryptcookie_inet, categories()},
{cryptcookie_inet_ktls, categories()},
{cryptcookie_inet_ktls_ih, categories()},
%%
%% categories()
{setup, [{repeat, 1}],
Expand Down Expand Up @@ -110,7 +111,8 @@ cryptcookie_backends() ->
[{group, dist_cryptcookie_socket},
{group, cryptcookie_socket_ktls},
{group, dist_cryptcookie_inet},
{group, cryptcookie_inet_ktls}].
{group, cryptcookie_inet_ktls},
{group, cryptcookie_inet_ktls_ih}].

categories() ->
[{group, setup},
Expand Down Expand Up @@ -290,7 +292,22 @@ init_per_group(cryptcookie_inet_ktls, Config) ->
ok ->
[{ssl_dist, false}, {ssl_dist_prefix, "Crypto-Inet-kTLS"},
{ssl_dist_args,
"-proto_dist inet_epmd -inet_epmd cryptcookie_inet_ktls"}
"-proto_dist inet_epmd -inet_epmd cryptcookie_inet_ktls "
"-inet_ktls port"}
| Config];
Problem ->
{skip, Problem}
catch
Class : Reason : Stacktrace ->
{fail, {Class, Reason, Stacktrace}}
end;
init_per_group(cryptcookie_inet_ktls_ih, Config) ->
try inet_epmd_cryptcookie_inet_ktls:supported() of
ok ->
[{ssl_dist, false}, {ssl_dist_prefix, "Crypto-Inet-kTLS-IH"},
{ssl_dist_args,
"-proto_dist inet_epmd -inet_epmd cryptcookie_inet_ktls "
"-inet_ktls input_handler"}
| Config];
Problem ->
{skip, Problem}
Expand Down