diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index c4a4db6c4a30..4c2379c134bd 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -898,6 +898,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define INET_OPT_RECVTTL 47 /* IP_RECVTTL ancillary data */ #define TCP_OPT_NOPUSH 48 /* super-Nagle, aka TCP_CORK */ #define INET_LOPT_TCP_READ_AHEAD 49 /* Read ahead of packet data */ +#define INET_LOPT_NON_BLOCK_SEND 50 /* Non-blocking send, only SCTP */ #define INET_LOPT_DEBUG 99 /* Enable/disable DEBUG for a socket */ /* SCTP options: a separate range, from 100: */ @@ -988,6 +989,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define INET_FLG_IS_IGNORED_RD (1 << 2) #define INET_FLG_IS_IGNORED_WR (1 << 3) #define INET_FLG_IS_IGNORED_PASS (1 << 4) +#define INET_FLG_NON_BLOCK_SEND (1 << 5) /* Currently only SCTP */ /* ** End of interface constants. @@ -1039,6 +1041,9 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define INET_IGNORE_READ (INET_FLG_IS_IGNORED|INET_FLG_IS_IGNORED_RD) #define INET_IGNORE_WRITE (INET_FLG_IS_IGNORED|INET_FLG_IS_IGNORED_WR) #define INET_IGNORE_PASSIVE (INET_FLG_IS_IGNORED|INET_FLG_IS_IGNORED_PASS) +#define IS_NON_BLOCK_SEND(desc) ((desc)->flags & INET_FLG_NON_BLOCK_SEND) +#define SET_NON_BLOCK_SEND(desc) ((desc)->flags & INET_FLG_NON_BLOCK_SEND) +#define CLEAR_NON_BLOCK_SEND(desc) ((desc)->flags & ~(INET_FLG_NON_BLOCK_SEND)) /* Max length of Erlang Term Buffer (for outputting structured terms): */ #ifdef HAVE_SCTP @@ -1256,6 +1261,7 @@ typedef struct { #endif int recv_cmsgflags; /* Which ancillary data to expect */ int debug; /* debug enabled or not */ + } inet_descriptor; @@ -7681,6 +7687,37 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) break; #endif + case INET_LOPT_NON_BLOCK_SEND: +#ifdef HAVE_SCTP + if (IS_SCTP(desc)) { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "inet_set_opts(non-block-send) -> %s\r\n", + __LINE__, desc->s, + driver_caller(desc->port), B2S(ival)) ); + if (ival) { + desc->flags |= INET_FLG_NON_BLOCK_SEND; + } else { + desc->flags = (desc->flags) & ~(INET_FLG_NON_BLOCK_SEND); + } + } else { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "inet_set_opts(non-block-send) -> IGNORE\r\n", + __LINE__, desc->s, + driver_caller(desc->port)) ); + } +#else + { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "inet_set_opts(non-block-send) -> IGNORE\r\n", + __LINE__, desc->s, + driver_caller(desc->port)) ); + } +#endif + continue; /* take care of next option */ + case INET_LOPT_DEBUG: DDBG(desc, ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " @@ -8719,6 +8756,33 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) } #endif + case INET_LOPT_NON_BLOCK_SEND: + if (IS_SCTP(desc)) + { + int ival = get_int32(curr); curr += 4; + + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "sctp_set_opts(non-block-send) -> %s\r\n", + __LINE__, desc->s, driver_caller(desc->port), + B2S(ival)) ); + if (ival) { + desc->flags |= INET_FLG_NON_BLOCK_SEND; + } else { + desc->flags = (desc->flags) & ~(INET_FLG_NON_BLOCK_SEND); + } + + res = 0; + } + else { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "sctp_set_opts(non-block-send) -> IGNORE\r\n", + __LINE__, desc->s, + driver_caller(desc->port)) ); + } + continue; /* take care of next option */ + case INET_LOPT_DEBUG: { int ival = get_int32(curr); curr += 4; @@ -9423,6 +9487,15 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, } #endif /* #ifdef __WIN32__ */ + case INET_LOPT_NON_BLOCK_SEND: + *ptr++ = opt; + if (IS_NON_BLOCK_SEND(desc)) + ival = TRUE; + else + ival = FALSE; + put_int32(ival, ptr); + continue; + case INET_LOPT_DEBUG: *ptr++ = opt; ival = desc->debug; @@ -9558,6 +9631,11 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, int eopt = *buf; /* "eopt" is 1-byte encoded */ buf ++; buflen --; + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "sctp_fill_opts -> opt: %d\r\n", + __LINE__, desc->s, driver_caller(desc->port), eopt) ); + switch(eopt) { /* Local options allowed for SCTP. For TCP and UDP, the values of @@ -9565,6 +9643,36 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, but here, we encode them as proper terms the same way as we do it for all other SCTP options: */ + case INET_LOPT_NON_BLOCK_SEND: + { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "sctp_fill_opts -> NON_BLOCK_SEND: %s\r\n", + __LINE__, desc->s, driver_caller(desc->port), + B2S(IS_NON_BLOCK_SEND(desc))) ); + + if (IS_NON_BLOCK_SEND(desc)) + i = LOAD_ATOM (spec, i, am_true); + else + i = LOAD_ATOM (spec, i, am_false); + break; + } + + case INET_LOPT_DEBUG: + { + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "sctp_fill_opts -> DEBUG: %s\r\n", + __LINE__, desc->s, driver_caller(desc->port), + B2S(desc->debug)) ); + + if (desc->debug) + i = LOAD_ATOM (spec, i, am_true); + else + i = LOAD_ATOM (spec, i, am_false); + break; + } + case INET_LOPT_BUFFER: { PLACE_FOR(spec, i, LOAD_ATOM_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT); @@ -14014,15 +14122,17 @@ static udp_descriptor* sctp_inet_copy(udp_descriptor* desc, SOCKET s, } /* Some flags must be inherited at this point */ - copy_desc->inet.mode = desc->inet.mode; - copy_desc->inet.exitf = desc->inet.exitf; - copy_desc->inet.deliver = desc->inet.deliver; - copy_desc->inet.htype = desc->inet.htype; - copy_desc->inet.psize = desc->inet.psize; - copy_desc->inet.stype = desc->inet.stype; - copy_desc->inet.sfamily = desc->inet.sfamily; - copy_desc->inet.hsz = desc->inet.hsz; - copy_desc->inet.bufsz = desc->inet.bufsz; + copy_desc->inet.mode = desc->inet.mode; + copy_desc->inet.exitf = desc->inet.exitf; + copy_desc->inet.deliver = desc->inet.deliver; + copy_desc->inet.htype = desc->inet.htype; + copy_desc->inet.psize = desc->inet.psize; + copy_desc->inet.stype = desc->inet.stype; + copy_desc->inet.sfamily = desc->inet.sfamily; + copy_desc->inet.hsz = desc->inet.hsz; + copy_desc->inet.bufsz = desc->inet.bufsz; + // Only inherit the NON-BLOCK-SEND flag + copy_desc->inet.flags = desc->inet.flags & INET_FLG_NON_BLOCK_SEND; /* The new port will be linked and connected to the owner */ port = driver_create_port(port, owner, "sctp_inet", @@ -14744,11 +14854,42 @@ static void packet_inet_command(ErlDrvData e, char* buf, ErlDrvSizeT len) #endif if (IS_SOCKET_ERROR(code)) { int err = sock_errno(); + + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "packet_inet_command -> send failed" + "\r\n error: %d (%T)" + "\r\n", + __LINE__, + desc->s, driver_caller(desc->port), + err, error_atom(err)) ); + if ((err != ERRNO_BLOCK) && (err != EINTR)) { inet_reply_error(desc, err); return; } + // else if (desc->nonBlockSend) { + else if (IS_NON_BLOCK_SEND(desc)) { + + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "packet_inet_command -> block|intr when non-block send" + "\r\n", + __LINE__, + desc->s, driver_caller(desc->port)) ); + + inet_reply_error(desc, err); + return; + } else { + + DDBG(desc, + ("INET-DRV-DBG[%d][" SOCKET_FSTR ",%T] " + "packet_inet_command -> block|intr send" + "\r\n", + __LINE__, + desc->s, driver_caller(desc->port)) ); + /* XXX if(! INET_IGNORED(INETP(desc))) */ sock_select(desc, (FD_WRITE|FD_CLOSE), 1); set_busy_port(desc->port, 1); diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam index 7f0879794845..1d4e2e5ae0f7 100644 Binary files a/erts/preloaded/ebin/prim_inet.beam and b/erts/preloaded/ebin/prim_inet.beam differ diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index e520f18c8475..f3d1f5928e95 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -1574,6 +1574,7 @@ enc_opt(line_delimiter) -> ?INET_LOPT_LINE_DELIM; enc_opt(raw) -> ?INET_OPT_RAW; enc_opt(bind_to_device) -> ?INET_OPT_BIND_TO_DEVICE; enc_opt(read_ahead) -> ?INET_LOPT_TCP_READ_AHEAD; +enc_opt(non_block_send) -> ?INET_OPT_NON_BLOCK_SEND; enc_opt(debug) -> ?INET_OPT_DEBUG; % Names of SCTP opts: enc_opt(sctp_rtoinfo) -> ?SCTP_OPT_RTOINFO; @@ -1646,6 +1647,7 @@ dec_opt(?INET_LOPT_LINE_DELIM) -> line_delimiter; dec_opt(?INET_OPT_RAW) -> raw; dec_opt(?INET_OPT_BIND_TO_DEVICE) -> bind_to_device; dec_opt(?INET_LOPT_TCP_READ_AHEAD) -> read_ahead; +dec_opt(?INET_OPT_NON_BLOCK_SEND) -> non_block_send; dec_opt(?INET_OPT_DEBUG) -> debug; dec_opt(I) when is_integer(I) -> undefined. @@ -1759,6 +1761,7 @@ type_opt_1(netns) -> binary; type_opt_1(show_econnreset) -> bool; type_opt_1(bind_to_device) -> binary; type_opt_1(read_ahead) -> bool; +type_opt_1(non_block_send) -> bool; type_opt_1(debug) -> bool; %% %% SCTP options (to be set). If the type is a record type, the corresponding @@ -2070,7 +2073,7 @@ type_value_2(binary_or_uint,Int) %% Type-checking of SCTP options type_value_2(sctp_assoc_id, X) when X band 16#ffffffff =:= X -> true; -type_value_2(_, _) -> false. +type_value_2(_T, _V) -> false. diff --git a/lib/kernel/src/gen_sctp.erl b/lib/kernel/src/gen_sctp.erl index fd2c642d92cf..c32562d52d0b 100644 --- a/lib/kernel/src/gen_sctp.erl +++ b/lib/kernel/src/gen_sctp.erl @@ -133,8 +133,15 @@ by calling `inet:setopts/2`. They can be retrieved using `inet:getopts/2`. larger than `val(recbuf)`. Setting this option also adjusts the size of the driver buffer (see `buffer` above). +### [](){: #option_non_block_send } + +- **`{non_block_send, boolean()}`** - A send call that would otherwise block (hang), + will instead immediately return with e.g. `{error, eagain}` + *if* this option has been set to `true`. + Defaults to `false`. + - **`{sctp_module, module()}`** - Overrides which callback module is used. - Defaults to `inet_sctp` for IPv4 and `inet6_sctp` for IPv6. +Defaults to `inet_sctp` for IPv4 and `inet6_sctp` for IPv6. - **`{sctp_rtoinfo, #sctp_rtoinfo{}}`** @@ -683,6 +690,7 @@ future associations". -type elementary_option() :: {active, true | false | once | -32768..32767} | {buffer, non_neg_integer()} | + {non_block_send, boolean()} | {debug, boolean()} | {dontroute, boolean()} | {exclusiveaddruse, boolean()} | @@ -713,6 +721,7 @@ future associations". -type elementary_option_name() :: active | buffer | + non_block_send | debug | dontroute | exclusiveaddruse | @@ -1502,6 +1511,14 @@ and context (passed to the local SCTP layer), which can be used, for example, for error identification. However, such a fine grained user control is rarely required. The function [`send/4`](`send/4`) is sufficient for most applications. + +> #### Note {: .info } +> +> Send is normally blocking, but if the socket option +> [`non_block_send`](#option_non_block_send) is set to true, +> the function will return with e.g. {error, eagain} +> in the case when the function would otherwise block. +> It is then up to the user to try again later. """. -spec send(Socket, SndRcvInfo, Data) -> ok | {error, Reason} when Socket :: sctp_socket(), @@ -1528,6 +1545,14 @@ Sends a `Data` message on the association `Assoc` and `Stream`. [`#sctp_assoc_change{}`](#record-sctp_assoc_change) record from an association establishment, or as the `t:assoc_id/0` `t:integer/0` field value. + +> #### Note {: .info } +> +> Send is normally blocking, but if the socket option +> [`non_block_send`](#option_non_block_send) is set to true, +> the function will return with e.g. {error, eagain} +> in the case when the function would otherwise block. +> It is then up to the user to try again later. """. -spec send(Socket, Assoc, Stream, Data) -> ok | {error, Reason} when Socket :: sctp_socket(), diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl index 155bf4759bd4..7f0ba07a9601 100644 --- a/lib/kernel/src/inet.erl +++ b/lib/kernel/src/inet.erl @@ -3359,6 +3359,7 @@ sctp_options() -> % Other options are SCTP-specific (though they may be similar to their % TCP and UDP counter-parts): + non_block_send, sctp_rtoinfo, sctp_associnfo, sctp_initmsg, sctp_autoclose, sctp_nodelay, sctp_disable_fragments, sctp_i_want_mapped_v4_addr, sctp_maxseg, sctp_primary_addr, @@ -3369,13 +3370,17 @@ sctp_options() -> -doc false. sctp_options(Opts, Mod) -> + %% ?DBG([{opts, Opts}, {mod, Mod}]), case sctp_opt(Opts, Mod, #sctp_opts{}, sctp_options()) of - {ok,SO} -> + {ok, SO} -> {ok,SO#sctp_opts{opts=lists:reverse(SO#sctp_opts.opts)}}; - Error -> Error + Error -> + %% ?DBG([{error, Error}]), + Error end. sctp_opt([Opt|Opts], Mod, #sctp_opts{ifaddr = IfAddr} = R, As) -> + %% ?DBG([{opt, Opt}]), case Opt of %% what if IfAddr is already a map (=sockaddr)? %% Shall we allow ifaddr as a list of sockaddr? @@ -3425,6 +3430,7 @@ sctp_opt([Opt|Opts], Mod, #sctp_opts{ifaddr = IfAddr} = R, As) -> sctp_opt(Opts, Mod, R#sctp_opts { opts = [{active,N}|NOpts] }, As); {Name,Val} -> + %% ?DBG([{name, Name}, {val, Val}]), sctp_opt(Opts, Mod, R, As, Name, Val); _ -> @@ -3438,6 +3444,7 @@ sctp_opt([], _Mod, #sctp_opts{ifaddr=IfAddr}=R, _SockOpts) -> end. sctp_opt(Opts, Mod, #sctp_opts{} = R, As, Name, Val) -> + %% ?DBG([{opts, Opts}, {mod, Mod}, {name, Name}, {val, Val}]), case add_opt(Name, Val, R#sctp_opts.opts, As) of {ok,SocketOpts} -> sctp_opt(Opts, Mod, R#sctp_opts{opts=SocketOpts}, As); @@ -3466,17 +3473,22 @@ sctp_module(Opts) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% add_opt(Name, Val, Opts, As) -> + %% ?DBG([{name, Name}, {val, Val}, {opts, Opts}, {as, As}]), case lists:member(Name, As) of true -> + %% ?DBG(['is sockopt_val']), case prim_inet:is_sockopt_val(Name, Val) of true when Name =:= raw -> {ok, [{Name,Val} | Opts]}; true -> Opts1 = lists:keydelete(Name, 1, Opts), {ok, [{Name,Val} | Opts1]}; - false -> {error,badarg} + false -> + %% ?DBG(['false']), + {error,badarg} end; - false -> {error,badarg} + false -> + {error,badarg} end. diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl index 76b3f3ce023c..beab45fd19c5 100644 --- a/lib/kernel/src/inet_int.hrl +++ b/lib/kernel/src/inet_int.hrl @@ -168,6 +168,7 @@ -define(INET_OPT_RECVTTL, 47). -define(TCP_OPT_NOPUSH, 48). -define(INET_LOPT_TCP_READ_AHEAD, 49). +-define(INET_OPT_NON_BLOCK_SEND, 50). -define(INET_OPT_DEBUG, 99). % Specific SCTP options: separate range: -define(SCTP_OPT_RTOINFO, 100). diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl index 51f71d859d98..4831f3da9aaf 100644 --- a/lib/kernel/test/gen_sctp_SUITE.erl +++ b/lib/kernel/test/gen_sctp_SUITE.erl @@ -32,7 +32,13 @@ -export([ skip_old_solaris/1, basic/1, - api_open_close/1,api_listen/1,api_connect_init/1,api_connectx_init/1,api_opts/1, + + api_open_close/1, + api_listen/1, + api_connect_init/1, + api_connectx_init/1, + api_opts/1, + xfer_min/1,xfer_active/1,def_sndrcvinfo/1,implicit_inet6/1, open_multihoming_ipv4_socket/1, open_unihoming_ipv6_socket/1, @@ -49,7 +55,9 @@ t_simple_link_local_sockaddr_in_send_recv/1, t_simple_local_sockaddr_in6_send_recv/1, t_simple_link_local_sockaddr_in6_send_recv/1, - t_simple_local_sockaddr_in_connectx_init/1 + t_simple_local_sockaddr_in_connectx_init/1, + + non_block_send/1 ]). suite() -> @@ -64,7 +72,9 @@ all() -> [ {group, smoke}, {group, G}, - {group, sockaddr}]. + {group, sockaddr}, + {group, misc} + ]. groups() -> [ @@ -72,7 +82,9 @@ groups() -> {old_solaris, [], old_solaris_cases()}, {extensive, [], extensive_cases()}, - {sockaddr, [], sockaddr_cases()} + {sockaddr, [], sockaddr_cases()}, + + {misc, [], misc_cases()} ]. smoke_cases() -> @@ -110,6 +122,12 @@ sockaddr_cases() -> t_simple_local_sockaddr_in_connectx_init ]. +misc_cases() -> + [ + non_block_send + ]. + + %% This (Config) was ignored before, why? init_per_suite(Config0) -> @@ -2446,7 +2464,255 @@ t_simple_local_sockaddr_in_connectx_init(Config) when is_list(Config) -> ok. -%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +non_block_send(Config) when is_list(Config) -> + Cond = fun() -> ok end, + Pre = fun() -> case ?WHICH_LOCAL_ADDR(inet) of + {ok, Addr} -> + Addr; + {error, Reason} -> + throw({skip, Reason}) + end + end, + TC = fun(Addr) -> do_non_block_send(Config, Addr) end, + Post = fun(_) -> ok end, + ?TC_TRY(?FUNCTION_NAME, Cond, Pre, TC, Post). + +do_non_block_send(_Config, Addr) -> + {Server, Port} = nbs_server_start(Addr), + Client = nbs_client_start(Addr, Port), + ok = nbs_await_client_blocked(Client), + nbs_command_continue("server", Server), + ok = nbs_await_server_recv(Server), + nbs_command_continue("client", Client), + nbs_command_continue("server", Server), + case nbs_await_client_done(Client) of + ok -> + nbs_server_stop(Server), + ok; + {error, Reason} -> + nbs_server_stop(Server), + exit(Reason) + end. + +nbs_command_continue(Who, Pid) -> + ?P("[ctrl] command ~s continue", [Who]), + Pid ! {?MODULE, self(), continue}. + +nbs_await_server_recv(Server) -> + ?P("[ctrl] await server recv"), + receive + {?MODULE, Server, recv} -> + ?P("[ctrl] server recv"), + ok + end. + +nbs_await_client_blocked(Client) -> + ?P("[ctrl] await client blocked"), + receive + {?MODULE, Client, blocked} -> + ?P("[ctrl] client blocked"), + ok + end. + +nbs_await_client_done(Client) -> + ?P("[ctrl] await client done"), + receive + {'EXIT', Client, normal} -> + ?P("[ctrl] client done"), + ok; + {'EXIT', Client, Reason} -> + ?P("[ctrl] client failed: " + "~n ~p", [Reason]), + {error, Reason} + end. + + +nbs_server_start(Addr) -> + Self = self(), + Pid = spawn_link(fun() -> nbs_server_init(Self, Addr) end), + receive + {?MODULE, Pid, {ok, Port}} when is_integer(Port) -> + {Pid, Port} + end. + +nbs_server_stop(Server) -> + ?P("[ctrl] command server stop"), + Server ! {?MODULE, self(), stop}, + ?P("[ctrl] await server stopped"), + receive + {'EXIT', Server, normal} -> + ?P("[ctrl] server stopped"), + ok; + {'EXIT', Server, Reason} -> + ?P("[ctrl] server failed: " + "~n ~p", [Reason]), + {error, Reason} + end. + +nbs_server_init(Parent, Addr) -> + ?P("[server] Try create socket"), + case gen_sctp:open(0, [{recbuf, 16384}, {ip, Addr}, {debug, true}] ) of + {ok, S} -> + {ok, [false]} = inet:getopts(S, [non_block_send]), + {ok, Port} = inet:port(S), + ?P("[server] Try make listen"), + ok = gen_sctp:listen(S, true), + Parent ! {?MODULE, self(), {ok, Port}}, + nbs_server_main(Parent, S); + {error, Reason} -> + exit({server, Reason}) + end. + +nbs_server_main(Parent, Sock) -> + ?P("[server] await controller continue command"), + receive + {?MODULE, Parent, continue} -> + ?P("[server] received controller continue command"), + ok + end, + nbs_server_loop(Parent, Sock, true), + ?P("[server] await controller stop command"), + receive + {?MODULE, Parent, stop} -> + ?P("[server] received controller stop command"), + gen_sctp:close(Sock) + end. + +nbs_server_loop(Parent, Sock, Retry) -> + case gen_sctp:recv(Sock, 5000) of + {ok, {FromIP, FromPort, AncData, Data}} when is_binary(Data) -> + ?P("[server] received data: " + "~n From IP: ~p" + "~n From Port: ~p" + "~n Anc Data: ~p" + "~n sz(Data): ~p", + [FromIP, FromPort, AncData, byte_size(Data)]), + nbs_server_loop(Parent, Sock, Retry); + {ok, {FromIP, FromPort, AncData, Data}} -> + ?P("[server] received data: " + "~n From IP: ~p" + "~n From Port: ~p" + "~n Anc Data: ~p" + "~n Data: ~p", + [FromIP, FromPort, AncData, Data]), + nbs_server_loop(Parent, Sock, Retry); + {error, timeout} when (Retry =:= true) -> + ?P("[server] receive timeout"), + Parent ! {?MODULE, self(), recv}, + ?P("[server] await controller contue command"), + receive + {?MODULE, Parent, continue} -> + ?P("[server] received continue command from controller"), + ok + end, + nbs_server_loop(Parent, Sock, false); + {error, timeout} -> + ?P("[server] receive timeout => done"), + ok; + {error, Reason} -> + exit({server_recv, Reason}) + end. + + +nbs_client_start(Addr, Port) -> + Self = self(), + Pid = spawn_link(fun() -> nbs_client_init(Self, Addr, Port) end), + receive + {?MODULE, Pid, ok} -> + Pid + end. + +nbs_client_init(Parent, Addr, Port) -> + OOpts = [{ip, Addr}, {non_block_send, true}], + ?P("[client] Try create socket"), + case gen_sctp:open(0, OOpts) of + {ok, S} -> + {ok, [true]} = inet:getopts(S, [non_block_send]), + COpts = [{sctp_initmsg, #sctp_initmsg{num_ostreams = 5}}], + ?P("[client] Try connect to server"), + case gen_sctp:connect(S, Addr, Port, COpts) of + {ok, Assoc} -> + ?P("[client] assoc created"), + inet:setopts(S, [{debug, true}]), + Parent ! {?MODULE, self(), ok}, + Data = + list_to_binary( + lists:flatten( + lists:duplicate(1024, "HejHoppDuGlade"))), + nbs_client_loop(Parent, S, Assoc, 0, 0, Data); + {error, CReason} -> + ?P("[client] failed connect: " + "~n Reason: ~p", [CReason]), + gen_sctp:close(S), + exit({client_connect, CReason}) + end; + {error, OReason} -> + ?P("[client] failed create socket: " + "~n Reason: ~p", [OReason]), + exit({client_open, OReason}) + end. + +nbs_client_loop(Parent, S, Assoc, NumWrites, NumBytes, Data) -> + nbs_client_loop(Parent, S, Assoc, NumWrites, NumBytes, Data, true). + +nbs_client_loop(Parent, S, Assoc, NumWrites, NumBytes, Data, true) -> + ?P("[client,true] try send when" + "~n Num Writes: ~p" + "~n Num Bytes: ~p", [NumWrites, NumBytes]), + %% Should we use all the streams? + case gen_sctp:send(S, Assoc, 0, Data) of + ok -> + nbs_client_loop(Parent, S, Assoc, + NumWrites+1, + NumBytes + byte_size(Data), + Data, true); + {error, eagain} -> + ?P("[client] send blocked when" + "~n Num Writes: ~p" + "~n Num Bytes: ~p", [NumWrites, NumBytes]), + Parent ! {?MODULE, self(), blocked}, + ?P("[client] await continue command from controller"), + receive + {?MODULE, Parent, continue} -> + ?P("[client] received continue command from controller"), + ok + end, + nbs_client_loop(Parent, S, Assoc, + NumWrites, + NumBytes, + Data, false); + {error, Reason} -> + ?P("[client] failed sending message: " + "~n Reason: ~p", [Reason]), + gen_sctp:close(S), + exit({send_failed, Reason}) + end; +nbs_client_loop(_Parent, S, Assoc, NumWrites, NumBytes, Data, false) -> + ?P("[client,false] try send when" + "~n Num Writes: ~p" + "~n Num Bytes: ~p", [NumWrites, NumBytes]), + case gen_sctp:send(S, Assoc, 0, Data) of + ok -> + ?P("[client] sent final message - final option verification"), + {ok, [true]} = inet:getopts(S, [non_block_send]), + ok = inet:setopts(S, [{non_block_send, false}]), + {ok, [false]} = inet:getopts(S, [non_block_send]), + ?P("[client] close socket"), + gen_sctp:close(S), + ?P("[client] done"), + exit(normal); + {error, Reason} -> + ?P("[client] failed sending last message: " + "~n Reason: ~p", [Reason]), + gen_sctp:close(S), + exit({send_failed, Reason}) + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% socket_pair_open(Addr, StartTime, Timeout) -> S1 = socket_open([{ip,Addr}], Timeout), @@ -2735,6 +3001,7 @@ match_unless_solaris(A, B) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + is_net_supported() -> try net:info() of #{} ->