diff --git a/cmd/tools/vtest-self.v b/cmd/tools/vtest-self.v index 83d2d53bc31f75..0575416e5d94c7 100644 --- a/cmd/tools/vtest-self.v +++ b/cmd/tools/vtest-self.v @@ -268,8 +268,6 @@ const ( 'vlib/v/tests/orm_joined_tables_select_test.v', 'vlib/v/tests/orm_handle_error_for_select_from_not_created_table_test.v', 'vlib/net/websocket/ws_test.v', - 'vlib/net/unix/unix_test.v', - 'vlib/net/unix/use_net_and_net_unix_together_test.v', 'vlib/net/websocket/websocket_test.v', 'vlib/net/openssl/openssl_compiles_test.c.v', 'vlib/net/http/request_test.v', diff --git a/vlib/net/net_nix.c.v b/vlib/net/net_nix.c.v index 26f824f0ffaa93..c8604d616d45bf 100644 --- a/vlib/net/net_nix.c.v +++ b/vlib/net/net_nix.c.v @@ -13,7 +13,7 @@ module net const is_windows = false -fn error_code() int { +pub fn error_code() int { return C.errno } @@ -24,7 +24,9 @@ pub const ( msg_nosignal = 0x4000 ) -const ( +pub const ( error_ewouldblock = C.EWOULDBLOCK error_einprogress = C.EINPROGRESS ) + +fn C.unlink(&char) int diff --git a/vlib/net/net_windows.c.v b/vlib/net/net_windows.c.v index 0e595c5efa39cf..db895c7330d68b 100644 --- a/vlib/net/net_windows.c.v +++ b/vlib/net/net_windows.c.v @@ -739,7 +739,7 @@ pub fn wsa_error(code int) WsaError { return unsafe { WsaError(code) } } -const ( +pub const ( error_ewouldblock = WsaError.wsaewouldblock error_einprogress = WsaError.wsaeinprogress ) @@ -757,7 +757,7 @@ const ( ) // Error code returns the last socket error -fn error_code() int { +pub fn error_code() int { return C.WSAGetLastError() } diff --git a/vlib/net/socket_options.c.v b/vlib/net/socket_options.c.v index 3ee78acd23c85f..6fe00394a3ff16 100644 --- a/vlib/net/socket_options.c.v +++ b/vlib/net/socket_options.c.v @@ -21,7 +21,7 @@ pub enum SocketOption { ipv6_only = C.IPV6_V6ONLY } -const ( +pub const ( opts_bool = [SocketOption.broadcast, .debug, .dont_route, .error, .keep_alive, .oob_inline] opts_int = [ .receive_buf_size, diff --git a/vlib/net/unix/aasocket.c.v b/vlib/net/unix/aasocket.c.v index 506830b046ca95..78e2302bf4a9c4 100644 --- a/vlib/net/unix/aasocket.c.v +++ b/vlib/net/unix/aasocket.c.v @@ -4,8 +4,12 @@ import net const use_net = net.no_timeout -// 104 for macos, 108 for linux => use the minimum -const max_sun_path = 104 +const max_sun_path = $if windows { + 256 +} $else { + // 104 for macos, 108 for linux => use the minimum + 104 +} // Select represents a select operation enum Select { @@ -13,10 +17,3 @@ enum Select { write except } - -// SocketType are the available sockets -// enum SocketType { -// dgram = C.SOCK_DGRAM -// stream = C.SOCK_STREAM -// seqpacket = C.SOCK_SEQPACKET -// } diff --git a/vlib/net/unix/common.c.v b/vlib/net/unix/common.c.v index 0909cac59dc0eb..5711ed42a54d2b 100644 --- a/vlib/net/unix/common.c.v +++ b/vlib/net/unix/common.c.v @@ -3,12 +3,27 @@ module unix import time import net -const error_ewouldblock = C.EWOULDBLOCK - -fn C.SUN_LEN(ptr &C.sockaddr_un) int +const ( + // no_deadline should be given to functions when no deadline is wanted (i.e. all functions + // return instantly) + no_deadline = time.Time{ + unix: 0 + } + // no_timeout should be given to functions when no timeout is wanted (i.e. all functions + // return instantly) + no_timeout = time.Duration(0) + // infinite_timeout should be given to functions when an infinite_timeout is wanted (i.e. functions + // only ever return with data) + infinite_timeout = time.infinite +) fn C.strncpy(&char, &char, int) +// close a socket, given its file descriptor `handle`. +pub fn close(handle int) ! { + net.close(handle)! +} + // shutdown shutsdown a socket, given its file descriptor `handle`. // By default it shuts it down in both directions, both for reading // and for writing. You can change that using `net.shutdown(handle, how: .read)` @@ -17,11 +32,6 @@ pub fn shutdown(handle int, config net.ShutdownConfig) int { return net.shutdown(handle, config) } -// close a socket, given its file descriptor `handle`. -pub fn close(handle int) ! { - net.close(handle)! -} - // Select waits for an io operation (specified by parameter `test`) to be available fn @select(handle int, test Select, timeout time.Duration) !bool { set := C.fd_set{} @@ -60,32 +70,48 @@ fn @select(handle int, test Select, timeout time.Duration) !bool { return C.FD_ISSET(handle, &set) != 0 } -// wait_for_common wraps the common wait code -fn wait_for_common(handle int, deadline time.Time, timeout time.Duration, test Select) ! { - if deadline.unix == 0 { - // do not accept negative timeout - if timeout < 0 { - return net.err_timed_out - } - ready := @select(handle, test, timeout)! - if ready { - return +[inline] +fn select_deadline(handle int, test Select, deadline time.Time) !bool { + // if we have a 0 deadline here then the timeout that was passed was infinite... + infinite := deadline.unix_time() == 0 + for infinite || time.now() <= deadline { + timeout := if infinite { net.infinite_timeout } else { deadline - time.now() } + ready := @select(handle, test, timeout) or { + if err.code() == 4 { + // Spurious wakeup from signal, keep waiting + continue + } + + // NOT a spurious wakeup + return err } - return net.err_timed_out + + return ready } - // Convert the deadline into a timeout - // and use that - d_timeout := deadline.unix - time.now().unix - if d_timeout < 0 { - // deadline is in the past so this has already - // timed out - return net.err_timed_out + + // Deadline elapsed + return net.err_timed_out +} + +// wait_for_common wraps the common wait code +fn wait_for_common(handle int, deadline time.Time, timeout time.Duration, test Select) ! { + // Convert timeouts to deadlines + real_deadline := if timeout == net.infinite_timeout { + time.unix(0) + } else if timeout <= 0 { + // No timeout set, so assume deadline + deadline + } else { + // timeout + time.now().add(timeout) } - ready := @select(handle, test, d_timeout)! + ready := select_deadline(handle, test, real_deadline)! + if ready { return } + return net.err_timed_out } @@ -99,26 +125,6 @@ fn wait_for_read(handle int, deadline time.Time, timeout time.Duration) ! { return wait_for_common(handle, deadline, timeout, .read) } -// no_deadline should be given to functions when no deadline is wanted (i.e. all functions -// return instantly) -const ( - no_deadline = time.Time{ - unix: 0 - } -) - -// no_timeout should be given to functions when no timeout is wanted (i.e. all functions -// return instantly) -const ( - no_timeout = time.Duration(0) -) - -// infinite_timeout should be given to functions when an infinite_timeout is wanted (i.e. functions -// only ever return with data) -const ( - infinite_timeout = time.infinite -) - [inline] fn wrap_read_result(result int) !int { if result != 0 { diff --git a/vlib/net/unix/stream.c.v b/vlib/net/unix/stream.c.v new file mode 100644 index 00000000000000..be853433b5f52c --- /dev/null +++ b/vlib/net/unix/stream.c.v @@ -0,0 +1,501 @@ +module unix + +import time +import os +import net + +const ( + unix_default_read_timeout = 30 * time.second + unix_default_write_timeout = 30 * time.second + connect_timeout = 5 * time.second + msg_nosignal = 0x4000 +) + +[heap] +pub struct StreamConn { +pub mut: + sock StreamSocket +mut: + handle int + write_deadline time.Time + read_deadline time.Time + read_timeout time.Duration + write_timeout time.Duration + is_blocking bool +} + +// connect_stream returns a SOCK_STREAM connection for an unix domain socket on `socket_path` +pub fn connect_stream(socket_path string) !&StreamConn { + if socket_path.len >= max_sun_path { + return error('Socket path too long! Max length: ${max_sun_path - 1} chars.') + } + mut s := new_stream_socket(socket_path) or { + return error('${err.msg()}; could not create new unix socket') + } + + s.connect(socket_path)! + + return &StreamConn{ + sock: s + read_timeout: unix.unix_default_read_timeout + write_timeout: unix.unix_default_write_timeout + } +} + +// close closes the connection +pub fn (mut c StreamConn) close() ! { + $if trace_unix ? { + eprintln(' StreamConn.close | c.sock.handle: ${c.sock.handle:6}') + } + c.sock.close()! +} + +// write_ptr blocks and attempts to write all data +pub fn (mut c StreamConn) write_ptr(b &u8, len int) !int { + $if trace_unix_sock_handle ? { + eprintln('>>> StreamConn.write_ptr | c: ${ptr_str(c)} | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} | len: ${len}') + } + $if trace_unix ? { + eprintln( + '>>> StreamConn.write_ptr | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} len: ${len} |\n' + + unsafe { b.vstring_with_len(len) }) + } + $if trace_unix_data_write ? { + eprintln( + '>>> StreamConn.write_ptr | data.len: ${len:6} | hex: ${unsafe { b.vbytes(len) }.hex()} | data: ' + + unsafe { b.vstring_with_len(len) }) + } + unsafe { + mut ptr_base := &u8(b) + mut total_sent := 0 + for total_sent < len { + ptr := ptr_base + total_sent + remaining := len - total_sent + mut sent := $if is_coroutine ? { + C.photon_send(c.sock.handle, ptr, remaining, net.msg_nosignal, c.write_timeout) + } $else { + C.send(c.sock.handle, ptr, remaining, net.msg_nosignal) + } + $if trace_unix_data_write ? { + eprintln('>>> UnixConn.write_ptr | data chunk, total_sent: ${total_sent:6}, remaining: ${remaining:6}, ptr: ${voidptr(ptr):x} => sent: ${sent:6}') + } + if sent < 0 { + code := net.error_code() + if code == int(net.error_ewouldblock) { + c.wait_for_write()! + continue + } else { + net.wrap_error(code)! + } + } + total_sent += sent + } + return total_sent + } +} + +// write blocks and attempts to write all data +pub fn (mut c StreamConn) write(bytes []u8) !int { + return c.write_ptr(bytes.data, bytes.len) +} + +// write_string blocks and attempts to write all data +pub fn (mut c StreamConn) write_string(s string) !int { + return c.write_ptr(s.str, s.len) +} + +// read_ptr attempts to write all data +pub fn (mut c StreamConn) read_ptr(buf_ptr &u8, len int) !int { + mut res := $if is_coroutine ? { + wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + } $else { + wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + } + $if trace_unix ? { + eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') + } + if res > 0 { + $if trace_unix_data_read ? { + eprintln( + '<<< StreamConn.read_ptr | 1 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' + + unsafe { buf_ptr.vstring_with_len(res) }) + } + return res + } + code := net.error_code() + if code == int(net.error_ewouldblock) { + c.wait_for_read()! + res = $if is_coroutine ? { + wrap_read_result(C.photon_recv(c.sock.handle, voidptr(buf_ptr), len, 0, c.read_timeout))! + } $else { + wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! + } + $if trace_unix ? { + eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') + } + $if trace_unix_data_read ? { + if res > 0 { + eprintln( + '<<< StreamConn.read_ptr | 2 data.len: ${res:6} | hex: ${unsafe { buf_ptr.vbytes(res) }.hex()} | data: ' + + unsafe { buf_ptr.vstring_with_len(res) }) + } + } + return net.socket_error(res) + } else { + net.wrap_error(code)! + } + return error('none') +} + +// read data into `buf` +pub fn (mut c StreamConn) read(mut buf []u8) !int { + return c.read_ptr(buf.data, buf.len) +} + +// read_deadline returns the read deadline +pub fn (mut c StreamConn) read_deadline() !time.Time { + if c.read_deadline.unix == 0 { + return c.read_deadline + } + return error('none') +} + +// set_read_deadlien sets the read deadline +pub fn (mut c StreamConn) set_read_deadline(deadline time.Time) { + c.read_deadline = deadline +} + +// write_deadline returns the write deadline +pub fn (mut c StreamConn) write_deadline() !time.Time { + if c.write_deadline.unix == 0 { + return c.write_deadline + } + return error('none') +} + +// set_write_deadline sets the write deadline +pub fn (mut c StreamConn) set_write_deadline(deadline time.Time) { + c.write_deadline = deadline +} + +// read_timeout returns the read timeout +pub fn (c &StreamConn) read_timeout() time.Duration { + return c.read_timeout +} + +// set_read_timeout sets the read timeout +pub fn (mut c StreamConn) set_read_timeout(t time.Duration) { + c.read_timeout = t +} + +// write_timeout returns the write timeout +pub fn (c &StreamConn) write_timeout() time.Duration { + return c.write_timeout +} + +// set_write_timout sets the write timeout +pub fn (mut c StreamConn) set_write_timeout(t time.Duration) { + c.write_timeout = t +} + +// wait_for_read blocks until the socket is ready to read +[inline] +pub fn (mut c StreamConn) wait_for_read() ! { + return wait_for_read(c.sock.handle, c.read_deadline, c.read_timeout) +} + +// wait_for_read blocks until the socket is ready to write +[inline] +pub fn (mut c StreamConn) wait_for_write() ! { + return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout) +} + +// str returns a string representation of connection `c` +pub fn (c StreamConn) str() string { + s := c.sock.str().replace('\n', ' ').replace(' ', ' ') + return 'StreamConn{ write_deadline: ${c.write_deadline}, read_deadline: ${c.read_deadline}, read_timeout: ${c.read_timeout}, write_timeout: ${c.write_timeout}, sock: ${s} }' +} + +pub struct StreamListener { +pub mut: + sock StreamSocket +mut: + accept_timeout time.Duration + accept_deadline time.Time +} + +[params] +pub struct ListenOptions { + backlog int = 128 +} + +// listen_stream creates an unix domain socket at `socket_path` +pub fn listen_stream(socket_path string, options ListenOptions) !&StreamListener { + if socket_path.len >= max_sun_path { + return error('Socket path too long! Max length: ${max_sun_path - 1} chars.') + } + mut s := new_stream_socket(socket_path) or { + return error('${err.msg()}; could not create new unix stream socket') + } + + addrs := net.resolve_addrs(socket_path, .unix, .tcp) or { + return error('${err.msg()}; could not resolve path ${socket_path}') + } + addr := addrs[0] + + // cast to the correct type + alen := addr.len() + + // try to unlink/remove an existing filesystem object at `socket_path`. Ignore errors, + // because it's ok if the path doesn't exists and if it exists, but can't be unlinked + // then `bind` will generate an error + $if windows { + os.rm(socket_path) or {} + } $else { + C.unlink(&char(socket_path.str)) + } + + net.socket_error_message(C.bind(s.handle, voidptr(&addr), alen), 'binding to ${socket_path} failed')! + net.socket_error_message(C.listen(s.handle, options.backlog), 'listening on ${socket_path} with maximum backlog pending queue of ${options.backlog}, failed')! + return &StreamListener{ + sock: s + accept_deadline: no_deadline + accept_timeout: infinite_timeout + } +} + +// accept accepts blocks until a new connection occurs +pub fn (mut l StreamListener) accept() !&StreamConn { + $if trace_unix ? { + eprintln(' StreamListener.accept | l.sock.handle: ${l.sock.handle:6}') + } + + mut new_handle := $if is_coroutine ? { + C.photon_accept(l.sock.handle, 0, 0, unix.unix_default_read_timeout) + } $else { + C.accept(l.sock.handle, 0, 0) + } + if new_handle <= 0 { + l.wait_for_accept()! + new_handle = $if is_coroutine ? { + C.photon_accept(l.sock.handle, 0, 0, unix.unix_default_read_timeout) + } $else { + C.accept(l.sock.handle, 0, 0) + } + if new_handle == -1 || new_handle == 0 { + return error('accept failed') + } + } + + mut c := &StreamConn{ + handle: new_handle + read_timeout: unix.unix_default_read_timeout + write_timeout: unix.unix_default_write_timeout + } + c.sock = stream_socket_from_handle(c.handle)! + return c +} + +// accept_deadline returns the deadline until a new client is accepted +pub fn (l &StreamListener) accept_deadline() !time.Time { + if l.accept_deadline.unix != 0 { + return l.accept_deadline + } + return error('no deadline') +} + +// set_accept_deadline sets the deadlinme until a new client is accepted +pub fn (mut l StreamListener) set_accept_deadline(deadline time.Time) { + l.accept_deadline = deadline +} + +// accept_timeout returns the timeout until a new client is accepted +pub fn (l &StreamListener) accept_timeout() time.Duration { + return l.accept_timeout +} + +// set_accept_timeout sets the timeout until a new client is accepted +pub fn (mut l StreamListener) set_accept_timeout(t time.Duration) { + l.accept_timeout = t +} + +// wait_for_accept blocks until a client can be accepted +pub fn (mut l StreamListener) wait_for_accept() ! { + return wait_for_read(l.sock.handle, l.accept_deadline, l.accept_timeout) +} + +// close closes the listening socket and unlinks/removes the socket file +pub fn (mut l StreamListener) close() ! { + l.sock.close()! + l.unlink()! +} + +// unlink removes the unix socket from the file system +pub fn (mut l StreamListener) unlink() ! { + $if windows { + os.rm(l.sock.socket_path)! + } $else { + net.socket_error_message(C.unlink(&char(l.sock.socket_path.str)), 'could not unlink ${l.sock.socket_path}')! + } +} + +// unlink_on_signal removes the socket from the filesystem when signal `signum` occurs +pub fn (mut l StreamListener) unlink_on_signal(signum os.Signal) ! { + os.signal_opt(.int, fn [mut l] (sign os.Signal) { + $if trace_unix ? { + eprintln(' StreamListener.unlink_on_signal received signal ${sign}; unlinking unix socket ${l.sock.socket_path}') + } + l.unlink() or {} + exit(1) + })! +} + +// addr returns the `net.Addr` version of the listening socket's path +pub fn (mut l StreamListener) addr() !net.Addr { + return l.sock.address()! +} + +pub struct StreamSocket { + net.Socket +mut: + socket_path string +} + +fn new_stream_socket(socket_path string) !StreamSocket { + handle := $if is_coroutine ? { + net.socket_error(C.photon_socket(.unix, .tcp, 0))! + } $else { + net.socket_error(C.socket(.unix, .tcp, 0))! + } + mut s := StreamSocket{ + handle: handle + socket_path: socket_path + } + + $if trace_unix ? { + eprintln(' new_unix_socket | s.handle: ${s.handle:6}') + } + + $if !net_blocking_sockets ? { + $if windows { + t := u32(1) // true + net.socket_error(C.ioctlsocket(handle, net.fionbio, &t))! + } $else { + net.socket_error(C.fcntl(handle, C.F_SETFL, C.fcntl(handle, C.F_GETFL) | C.O_NONBLOCK))! + } + } + return s +} + +fn (mut s StreamSocket) close() ! { + // shutdown might be redundant for unix domain sockets, but it doesn't hurt to call it + shutdown(s.handle) + return close(s.handle) +} + +fn (mut s StreamSocket) @select(test Select, timeout time.Duration) !bool { + return @select(s.handle, test, timeout) +} + +// set_option sets an option on the socket +fn (mut s StreamSocket) set_option(level int, opt int, value int) ! { + net.socket_error(C.setsockopt(s.handle, level, opt, &value, sizeof(int)))! +} + +// set_option_bool sets a boolean option on the socket +pub fn (mut s StreamSocket) set_option_bool(opt net.SocketOption, value bool) ! { + if opt !in net.opts_can_set { + return net.err_option_not_settable + } + if opt !in net.opts_bool { + return net.err_option_wrong_type + } + x := int(value) + s.set_option(C.SOL_SOCKET, int(opt), &x)! +} + +// set_option_bool sets an int option on the socket +pub fn (mut s StreamSocket) set_option_int(opt net.SocketOption, value int) ! { + s.set_option(C.SOL_SOCKET, int(opt), value)! +} + +fn (mut s StreamSocket) connect(socket_path string) ! { + if socket_path.len >= max_sun_path { + return error('Socket path too long! Max length: ${max_sun_path - 1} chars.') + } + + addrs := net.resolve_addrs(socket_path, .unix, .tcp) or { + return error('${err.msg()}; could not resolve path ${socket_path}') + } + addr := addrs[0] + // cast to the correct type + alen := addr.len() + eprintln(addr) + + $if !net_blocking_sockets ? { + res := $if is_coroutine ? { + C.photon_connect(s.handle, voidptr(&addr), alen, unix.unix_default_read_timeout) + } $else { + C.connect(s.handle, voidptr(&addr), alen) + } + if res == 0 { + return + } + ecode := net.error_code() + + // no need to check for einprogress on nix + // On windows we expect res == -1 && net.error_code() == ewouldblock + $if windows { + if ecode == int(net.error_ewouldblock) { + // The socket is nonblocking and the connection cannot be completed + // immediately. Wait till the socket is ready to write + write_result := s.@select(.write, unix.connect_timeout)! + err := 0 + len := sizeof(err) + // determine whether connect() completed successfully (SO_ERROR is zero) + xyz := C.getsockopt(s.handle, C.SOL_SOCKET, C.SO_ERROR, &err, &len) + if xyz == 0 && err == 0 { + return + } + if write_result { + if xyz == 0 { + net.wrap_error(err)! + return + } + return + } + return net.err_timed_out + } + } + net.wrap_error(ecode)! + return + } $else { + x := $if is_coroutine ? { + C.photon_connect(s.handle, voidptr(&addr), alen, unix.unix_default_read_timeout) + } $else { + C.connect(s.handle, voidptr(&addr), alen) + } + net.socket_error(x)! + } +} + +// stream_socket_from_handle returns a `StreamSocket` instance from the raw file descriptor `sockfd` +pub fn stream_socket_from_handle(sockfd int) !&StreamSocket { + mut s := &StreamSocket{ + handle: sockfd + } + + $if trace_unix ? { + eprintln(' stream_socket_from_handle | s.handle: ${s.handle:6}') + } + + $if !net_blocking_sockets ? { + $if windows { + t := u32(1) // true + net.socket_error(C.ioctlsocket(sockfd, net.fionbio, &t))! + } $else { + net.socket_error(C.fcntl(sockfd, C.F_SETFL, C.fcntl(sockfd, C.F_GETFL) | C.O_NONBLOCK))! + } + } + return s +} diff --git a/vlib/net/unix/stream_nix.c.v b/vlib/net/unix/stream_nix.c.v deleted file mode 100644 index caf29821ac7080..00000000000000 --- a/vlib/net/unix/stream_nix.c.v +++ /dev/null @@ -1,289 +0,0 @@ -module unix - -import time -import os -import net - -const ( - unix_default_read_timeout = 30 * time.second - unix_default_write_timeout = 30 * time.second - connect_timeout = 5 * time.second - msg_nosignal = 0x4000 -) - -pub struct StreamSocket { -pub: - handle int -mut: - path string -} - -pub struct StreamConn { -pub mut: - sock StreamSocket -mut: - write_deadline time.Time - read_deadline time.Time - read_timeout time.Duration - write_timeout time.Duration -} - -pub struct StreamListener { -pub mut: - sock StreamSocket -mut: - accept_timeout time.Duration - accept_deadline time.Time -} - -fn error_code() int { - return C.errno -} - -fn new_stream_socket() !StreamSocket { - sockfd := net.socket_error(C.socket(net.AddrFamily.unix, net.SocketType.tcp, 0))! - mut s := StreamSocket{ - handle: sockfd - } - return s -} - -fn (mut s StreamSocket) close() ! { - shutdown(s.handle) - return close(s.handle) -} - -fn (mut s StreamSocket) @select(test Select, timeout time.Duration) !bool { - return @select(s.handle, test, timeout) -} - -fn (mut s StreamSocket) connect(a string) ! { - if a.len >= max_sun_path { - return error('Socket path too long! Max length: ${max_sun_path - 1} chars.') - } - mut addr := C.sockaddr_un{} - unsafe { C.memset(&addr, 0, sizeof(C.sockaddr_un)) } - addr.sun_family = u8(C.AF_UNIX) - unsafe { C.strncpy(&addr.sun_path[0], &char(a.str), max_sun_path) } - size := C.SUN_LEN(&addr) - res := C.connect(s.handle, voidptr(&addr), size) - // if res != 1 { - // return none - //} - if res == 0 { - return - } - _ := error_code() - write_result := s.@select(.write, unix.connect_timeout)! - if write_result { - // succeeded - return - } - except_result := s.@select(.except, unix.connect_timeout)! - if except_result { - return net.err_connect_failed - } - // otherwise we timed out - return net.err_connect_timed_out -} - -pub fn listen_stream(sock string) !&StreamListener { - if sock.len >= max_sun_path { - return error('Socket path too long! Max length: ${max_sun_path - 1} chars.') - } - mut s := new_stream_socket()! - s.path = sock - mut addr := C.sockaddr_un{} - unsafe { C.memset(&addr, 0, sizeof(C.sockaddr_un)) } - addr.sun_family = u8(C.AF_UNIX) - unsafe { C.strncpy(&addr.sun_path[0], &char(sock.str), max_sun_path) } - size := C.SUN_LEN(&addr) - if os.exists(sock) { - os.rm(sock)! - } - net.socket_error(C.bind(s.handle, voidptr(&addr), size))! - os.chmod(sock, 0o777)! - net.socket_error(C.listen(s.handle, 128))! - return &StreamListener{ - sock: s - } -} - -pub fn connect_stream(path string) !&StreamConn { - mut s := new_stream_socket()! - s.connect(path)! - return &StreamConn{ - sock: s - read_timeout: unix.unix_default_read_timeout - write_timeout: unix.unix_default_write_timeout - } -} - -pub fn (mut l StreamListener) accept() !&StreamConn { - mut new_handle := C.accept(l.sock.handle, 0, 0) - if new_handle <= 0 { - l.wait_for_accept()! - new_handle = C.accept(l.sock.handle, 0, 0) - if new_handle == -1 || new_handle == 0 { - return error('accept failed') - } - } - new_sock := StreamSocket{ - handle: new_handle - } - return &StreamConn{ - sock: new_sock - read_timeout: unix.unix_default_read_timeout - write_timeout: unix.unix_default_write_timeout - } -} - -pub fn (c &StreamListener) accept_deadline() !time.Time { - if c.accept_deadline.unix != 0 { - return c.accept_deadline - } - return error('no deadline') -} - -pub fn (mut c StreamListener) set_accept_deadline(deadline time.Time) { - c.accept_deadline = deadline -} - -pub fn (c &StreamListener) accept_timeout() time.Duration { - return c.accept_timeout -} - -pub fn (mut c StreamListener) set_accept_timeout(t time.Duration) { - c.accept_timeout = t -} - -pub fn (mut c StreamListener) wait_for_accept() ! { - return wait_for_read(c.sock.handle, c.accept_deadline, c.accept_timeout) -} - -pub fn (mut c StreamListener) close() ! { - os.rm(c.sock.path)! - c.sock.close()! -} - -pub fn (mut c StreamConn) close() ! { - c.sock.close()! -} - -// write_ptr blocks and attempts to write all data -pub fn (mut c StreamConn) write_ptr(b &u8, len int) !int { - $if trace_unix ? { - eprintln( - '>>> StreamConn.write_ptr | c.sock.handle: ${c.sock.handle} | b: ${ptr_str(b)} len: ${len} |\n' + - unsafe { b.vstring_with_len(len) }) - } - unsafe { - mut ptr_base := &u8(b) - mut total_sent := 0 - for total_sent < len { - ptr := ptr_base + total_sent - remaining := len - total_sent - mut sent := C.send(c.sock.handle, ptr, remaining, unix.msg_nosignal) - if sent < 0 { - code := error_code() - if code == int(error_ewouldblock) { - c.wait_for_write()! - continue - } else { - net.wrap_error(code)! - } - } - total_sent += sent - } - return total_sent - } -} - -// write blocks and attempts to write all data -pub fn (mut c StreamConn) write(bytes []u8) !int { - return c.write_ptr(bytes.data, bytes.len) -} - -// write_string blocks and attempts to write all data -pub fn (mut c StreamConn) write_string(s string) !int { - return c.write_ptr(s.str, s.len) -} - -pub fn (mut c StreamConn) read_ptr(buf_ptr &u8, len int) !int { - mut res := wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! - $if trace_unix ? { - eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') - } - if res > 0 { - return res - } - code := error_code() - if code == int(error_ewouldblock) { - c.wait_for_read()! - res = wrap_read_result(C.recv(c.sock.handle, voidptr(buf_ptr), len, 0))! - $if trace_unix ? { - eprintln('<<< StreamConn.read_ptr | c.sock.handle: ${c.sock.handle} | buf_ptr: ${ptr_str(buf_ptr)} len: ${len} | res: ${res}') - } - return net.socket_error(res) - } else { - net.wrap_error(code)! - } - return net.socket_error(code) -} - -pub fn (mut c StreamConn) read(mut buf []u8) !int { - return c.read_ptr(buf.data, buf.len) -} - -pub fn (mut c StreamConn) read_deadline() !time.Time { - if c.read_deadline.unix == 0 { - return c.read_deadline - } - return error('none') -} - -pub fn (mut c StreamConn) set_read_deadline(deadline time.Time) { - c.read_deadline = deadline -} - -pub fn (mut c StreamConn) write_deadline() !time.Time { - if c.write_deadline.unix == 0 { - return c.write_deadline - } - return error('none') -} - -pub fn (mut c StreamConn) set_write_deadline(deadline time.Time) { - c.write_deadline = deadline -} - -pub fn (c &StreamConn) read_timeout() time.Duration { - return c.read_timeout -} - -pub fn (mut c StreamConn) set_read_timeout(t time.Duration) { - c.read_timeout = t -} - -pub fn (c &StreamConn) write_timeout() time.Duration { - return c.write_timeout -} - -pub fn (mut c StreamConn) set_write_timeout(t time.Duration) { - c.write_timeout = t -} - -[inline] -pub fn (mut c StreamConn) wait_for_read() ! { - return wait_for_read(c.sock.handle, c.read_deadline, c.read_timeout) -} - -[inline] -pub fn (mut c StreamConn) wait_for_write() ! { - return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout) -} - -pub fn (c StreamConn) str() string { - s := c.sock.str().replace('\n', ' ').replace(' ', ' ') - return 'StreamConn{ write_deadline: ${c.write_deadline}, read_deadline: ${c.read_deadline}, read_timeout: ${c.read_timeout}, write_timeout: ${c.write_timeout}, sock: ${s} }' -} diff --git a/vlib/net/unix/unix_test.v b/vlib/net/unix/unix_test.v index ba23fbe1ebf17a..7f7b6227ab44ff 100644 --- a/vlib/net/unix/unix_test.v +++ b/vlib/net/unix/unix_test.v @@ -2,8 +2,8 @@ import os import net.unix const ( - tfolder = os.join_path(os.vtmp_dir(), 'unix_test') - test_port = os.join_path(tfolder, 'unix_domain_socket') + tfolder = os.join_path(os.vtmp_dir(), 'unix_test') + socket_path = os.join_path(tfolder, 'v_unix.sock') ) fn testsuite_begin() { @@ -36,7 +36,7 @@ fn echo_server(mut l unix.StreamListener) ! { } fn echo() ! { - mut c := unix.connect_stream(test_port)! + mut c := unix.connect_stream(socket_path)! defer { c.close() or {} } @@ -53,9 +53,13 @@ fn echo() ! { } fn test_tcp() { - assert os.exists(test_port) == false - mut l := unix.listen_stream(test_port) or { panic(err) } + assert os.exists(socket_path) == false + + mut l := unix.listen_stream(socket_path) or { panic(err) } spawn echo_server(mut l) echo() or { panic(err) } l.close() or {} + + // test if socket file is removed/unlinked + assert os.exists(socket_path) == false }