From 0f82d12fc387bde06b0f3252af143cafce62cbe3 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 11 Jul 2023 12:02:24 +0200 Subject: [PATCH 01/11] wasi: nonblocking I/O for sockets and pipes on Windows Further work to improve the support to nonblocking I/O on Windows. This drops the special-casing for stdin, and instead checks if a handle is a named pipe (which includes sockets and other kinds of pipes); moreover it further improve _select by introducing a proper wrapper for winsock's select, which is BSD socket's select, and a compatible implementation of FdSet. Signed-off-by: Edoardo Vacchi --- .../wasi_stdlib_test.go | 7 +- internal/platform/fdset.go | 2 + internal/platform/fdset_test.go | 7 +- internal/platform/fdset_unsupported.go | 2 +- internal/platform/fdset_windows.go | 209 ++++++++++++++++ internal/sysfs/file_test.go | 8 - internal/sysfs/file_windows.go | 44 +++- internal/sysfs/select_test.go | 6 - internal/sysfs/select_windows.go | 236 +++++++++++++----- internal/sysfs/select_windows_test.go | 93 ++++--- internal/sysfs/sock_windows.go | 171 +++++++++---- 11 files changed, 610 insertions(+), 175 deletions(-) create mode 100644 internal/platform/fdset_windows.go diff --git a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go index 78e8430001..93ac1a5670 100644 --- a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go +++ b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go @@ -10,7 +10,6 @@ import ( "os" "os/exec" "path" - "runtime" "sort" "strconv" "strings" @@ -478,13 +477,11 @@ func testSock(t *testing.T, bin []byte) { console := <-ch require.NotEqual(t, 0, n) require.NoError(t, err) - require.Equal(t, "wazero\n", console) + // Nonblocking connections may contain error logging, we ignore those. + require.Equal(t, "wazero\n", console[len(console)-7:]) } func Test_HTTP(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("fsapi.Nonblocking() is not supported on wasip1+windows.") - } toolchains := map[string][]byte{} if wasmGotip != nil { toolchains["gotip"] = wasmGotip diff --git a/internal/platform/fdset.go b/internal/platform/fdset.go index 0e8a13d5c6..1017c805ac 100644 --- a/internal/platform/fdset.go +++ b/internal/platform/fdset.go @@ -1,3 +1,5 @@ +//go:build !windows + package platform // Set adds the given fd to the set. diff --git a/internal/platform/fdset_test.go b/internal/platform/fdset_test.go index 63fd79a57d..8a65506f72 100644 --- a/internal/platform/fdset_test.go +++ b/internal/platform/fdset_test.go @@ -1,17 +1,14 @@ +//go:build !windows + package platform import ( - "runtime" "testing" "github.com/tetratelabs/wazero/internal/testing/require" ) func TestFdSet(t *testing.T) { - if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { - t.Skip("not supported") - } - allBitsSetAtIndex0 := FdSet{} allBitsSetAtIndex0.Bits[0] = -1 diff --git a/internal/platform/fdset_unsupported.go b/internal/platform/fdset_unsupported.go index b5aa3c1561..ad9cf09109 100644 --- a/internal/platform/fdset_unsupported.go +++ b/internal/platform/fdset_unsupported.go @@ -1,4 +1,4 @@ -//go:build !darwin && !linux +//go:build !darwin && !linux && !windows package platform diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go new file mode 100644 index 0000000000..39c5e69a5d --- /dev/null +++ b/internal/platform/fdset_windows.go @@ -0,0 +1,209 @@ +package platform + +import ( + "syscall" + "unsafe" +) + +var procGetNamedPipeInfo = kernel32.NewProc("GetNamedPipeInfo") + +// Maximum number of fds in a WinSockFdSet. +const _FD_SETSIZE = 64 + +// WinSockFdSet implements the FdSet representation that is used internally by WinSock. +// +// Note: this representation is quite different from the one used in most POSIX implementations +// where a bitfield is usually implemented; instead on Windows we have a simpler array+count pair. +// Notice that because it keeps a count of the inserted handles, the first argument of select +// in WinSock is actually ignored. +// +// The implementation of the Set, Clear, IsSet, Zero, methods follows exactly +// the real implementation found in WinSock2.h, e.g. see: +// https://github.com/microsoft/win32metadata/blob/ef7725c75c6b39adfdc13ba26fb1d89ac954449a/generation/WinSDK/RecompiledIdlHeaders/um/WinSock2.h#L124-L175 +type WinSockFdSet struct { + // count is the number of used slots used in the handles slice. + count uint64 + // handles is the array of handles. This is called "array" in the WinSock implementation + // and it has a fixed length of _FD_SETSIZE. + handles [_FD_SETSIZE]syscall.Handle +} + +// FdSet implements the same methods provided on other plaforms. +// +// Note: the implementation is very different from POSIX; Windows provides +// POSIX select only for sockets. We emulate a select for other APIs in the sysfs +// package, but we still want to use the "real" select in the case of sockets. +// So, we keep a separate FdSet of sockets, so that we can pass it directly +// to the winsock select implementation +type FdSet struct { + sockets WinSockFdSet + pipes WinSockFdSet + regular WinSockFdSet +} + +// Sockets returns a WinSockFdSet containing the handles in this FdSet that are sockets. +func (f *FdSet) Sockets() *WinSockFdSet { + if f == nil { + return nil + } + return &f.sockets +} + +func (f *FdSet) SetSockets(s WinSockFdSet) { + f.sockets = s +} + +// Regular returns a WinSockFdSet containing the handles in this FdSet that are regular files. +func (f *FdSet) Regular() *WinSockFdSet { + if f == nil { + return nil + } + return &f.regular +} + +func (f *FdSet) SetRegular(r WinSockFdSet) { + f.regular = r +} + +// Regular returns a WinSockFdSet containing the handles in this FdSet that are pipes. +func (f *FdSet) Pipes() *WinSockFdSet { + if f == nil { + return nil + } + return &f.pipes +} + +func (f *FdSet) SetPipes(p WinSockFdSet) { + f.pipes = p +} + +func (f *FdSet) getFdSetFor(fd int) *WinSockFdSet { + h := syscall.Handle(fd) + t, err := syscall.GetFileType(h) + if err != nil { + return nil + } + switch t { + case syscall.FILE_TYPE_CHAR, syscall.FILE_TYPE_DISK: + return &f.regular + case syscall.FILE_TYPE_PIPE: + if isSocket(h) { + return &f.sockets + } else { + return &f.pipes + } + default: + return nil + } +} + +// Set adds the given fd to the set. +func (f *FdSet) Set(fd int) { + if s := f.getFdSetFor(fd); s != nil { + s.Set(fd) + } +} + +// Clear removes the given fd from the set. +func (f *FdSet) Clear(fd int) { + if s := f.getFdSetFor(fd); s != nil { + s.Clear(fd) + } +} + +// IsSet returns true when fd is in the set. +func (f *FdSet) IsSet(fd int) bool { + if s := f.getFdSetFor(fd); s != nil { + return s.IsSet(fd) + } + return false +} + +// Zero clears the set. +func (f *FdSet) Zero() { + f.sockets.Zero() + f.regular.Zero() + f.pipes.Zero() +} + +// Set adds the given fd to the set. +func (f *WinSockFdSet) Set(fd int) { + if f.count < _FD_SETSIZE { + f.handles[f.count] = syscall.Handle(fd) + f.count++ + } +} + +// Clear removes the given fd from the set. +func (f *WinSockFdSet) Clear(fd int) { + h := syscall.Handle(fd) + if !isSocket(h) { + return + } + + for i := uint64(0); i < f.count; i++ { + if f.handles[i] == h { + for ; i < f.count-1; i++ { + f.handles[i] = f.handles[i+1] + } + f.count-- + break + } + } +} + +// IsSet returns true when fd is in the set. +func (f *WinSockFdSet) IsSet(fd int) bool { + h := syscall.Handle(fd) + for i := uint64(0); i < f.count; i++ { + if f.handles[i] == h { + return true + } + } + return false +} + +// Zero clears the set. +func (f *WinSockFdSet) Zero() { + f.count = 0 +} + +func (f *WinSockFdSet) Count() int { + if f == nil { + return 0 + } + return int(f.count) +} + +func (f *WinSockFdSet) Copy() *WinSockFdSet { + if f == nil { + return nil + } + copy := *f + return © +} + +func (f *WinSockFdSet) Get(index int) syscall.Handle { + return f.handles[index] +} + +// isSocket returns true if the given file handle +// is a pipe. +func isSocket(fd syscall.Handle) bool { + // n, err := syscall.GetFileType(fd) + // if err != nil { + // return false + // } + // if n != syscall.FILE_TYPE_PIPE { + // return false + // } + // If the call to GetNamedPipeInfo succeeds then + // the handle is a pipe handle, otherwise it is a socket. + r, _, errno := syscall.SyscallN( + procGetNamedPipeInfo.Addr(), + uintptr(unsafe.Pointer(nil)), + uintptr(unsafe.Pointer(nil)), + uintptr(unsafe.Pointer(nil)), + uintptr(unsafe.Pointer(nil))) + return r != 0 && errno == 0 +} diff --git a/internal/sysfs/file_test.go b/internal/sysfs/file_test.go index 35de16725e..16c5f41581 100644 --- a/internal/sysfs/file_test.go +++ b/internal/sysfs/file_test.go @@ -49,10 +49,6 @@ func TestStdioFileSetNonblock(t *testing.T) { } func TestRegularFileSetNonblock(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Nonblock on regular files is not supported on Windows") - } - // Test using os.Pipe as it is known to support non-blocking reads. r, w, err := os.Pipe() require.NoError(t, err) @@ -339,10 +335,6 @@ func TestFilePollRead(t *testing.T) { // When there's nothing in the pipe, it isn't ready. ready, errno := rF.PollRead(&timeout) - if runtime.GOOS == "windows" { - require.EqualErrno(t, experimentalsys.ENOSYS, errno) - t.Skip("TODO: windows File.PollRead") - } require.EqualErrno(t, 0, errno) require.False(t, ready) diff --git a/internal/sysfs/file_windows.go b/internal/sysfs/file_windows.go index e89db54d48..c07d2b92a1 100644 --- a/internal/sysfs/file_windows.go +++ b/internal/sysfs/file_windows.go @@ -24,15 +24,15 @@ var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe") // https://learn.microsoft.com/en-us/windows/console/console-handles func readFd(fd uintptr, buf []byte) (int, sys.Errno) { handle := syscall.Handle(fd) - fileType, err := syscall.GetFileType(syscall.Stdin) + fileType, err := syscall.GetFileType(handle) if err != nil { return 0, sys.UnwrapOSError(err) } if fileType&syscall.FILE_TYPE_CHAR == 0 { return -1, sys.ENOSYS } - n, err := peekNamedPipe(handle) - if err == syscall.ERROR_BROKEN_PIPE { + n, errno := peekNamedPipe(handle) + if errno == syscall.ERROR_BROKEN_PIPE { return 0, 0 } if n == 0 { @@ -42,24 +42,42 @@ func readFd(fd uintptr, buf []byte) (int, sys.Errno) { return un, sys.UnwrapOSError(err) } +func writeFd(fd uintptr, buf []byte) (int, sys.Errno) { + return -1, sys.ENOSYS +} + +func readSocket(h syscall.Handle, buf []byte) (int, sys.Errno) { + var overlapped syscall.Overlapped + var done uint32 + errno := syscall.ReadFile(h, buf, &done, &overlapped) + if errno == syscall.ERROR_IO_PENDING { + errno = sys.EAGAIN + } + return int(done), sys.UnwrapOSError(errno) +} + +func writeSocket(fd uintptr, buf []byte) (int, sys.Errno) { + var done uint32 + var overlapped syscall.Overlapped + errno := syscall.WriteFile(syscall.Handle(fd), buf, &done, &overlapped) + if errno == syscall.ERROR_IO_PENDING { + errno = syscall.EAGAIN + } + return int(done), sys.UnwrapOSError(errno) +} + // peekNamedPipe partially exposes PeekNamedPipe from the Win32 API // see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe -func peekNamedPipe(handle syscall.Handle) (uint32, error) { +func peekNamedPipe(handle syscall.Handle) (uint32, syscall.Errno) { var totalBytesAvail uint32 totalBytesPtr := unsafe.Pointer(&totalBytesAvail) - _, _, err := procPeekNamedPipe.Call( + _, _, errno := syscall.SyscallN( + procPeekNamedPipe.Addr(), uintptr(handle), // [in] HANDLE hNamedPipe, 0, // [out, optional] LPVOID lpBuffer, 0, // [in] DWORD nBufferSize, 0, // [out, optional] LPDWORD lpBytesRead uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail, 0) // [out, optional] LPDWORD lpBytesLeftThisMessage - if err == syscall.Errno(0) { - return totalBytesAvail, nil - } - return totalBytesAvail, err -} - -func writeFd(fd uintptr, buf []byte) (int, sys.Errno) { - return -1, sys.ENOSYS + return totalBytesAvail, errno } diff --git a/internal/sysfs/select_test.go b/internal/sysfs/select_test.go index 8cee83e4e4..4cbf84cbb5 100644 --- a/internal/sysfs/select_test.go +++ b/internal/sysfs/select_test.go @@ -75,12 +75,6 @@ func TestSelect(t *testing.T) { for { n, err := _select(fd+1, rFdSet, nil, nil, nil) - if runtime.GOOS == "windows" { - // Not implemented for fds != wasiFdStdin - require.ErrorIs(t, err, sys.ENOSYS) - require.Equal(t, -1, n) - break - } if err == sys.EINTR { t.Log("Select interrupted") continue diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index 0791bfd25d..c1221b8dd0 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -4,30 +4,26 @@ import ( "context" "syscall" "time" + "unsafe" "github.com/tetratelabs/wazero/experimental/sys" "github.com/tetratelabs/wazero/internal/platform" ) -// wasiFdStdin is the constant value for stdin on Wasi. -// We need this constant because on Windows os.Stdin.Fd() != 0. -const wasiFdStdin = 0 - -// pollInterval is the interval between each calls to peekNamedPipe in pollNamedPipe +// pollInterval is the interval between each calls to peekNamedPipe in selectAllHandles const pollInterval = 100 * time.Millisecond -// syscall_select emulates the select syscall on Windows for two, well-known cases, returns sys.ENOSYS for all others. -// If r contains fd 0, and it is a regular file, then it immediately returns 1 (data ready on stdin) -// and r will have the fd 0 bit set. -// If r contains fd 0, and it is a FILE_TYPE_CHAR, then it invokes PeekNamedPipe to check the buffer for input; -// if there is data ready, then it returns 1 and r will have fd 0 bit set. +// syscall_select emulates the select syscall on Windows, for a subset of cases. +// +// r, w, e may contain any number of file handles, but regular files and pipes are only processed for r (Read). +// Stdin is a pipe, thus it is checked for readiness when present. Pipes are checked using PeekNamedPipe. +// Regular files always immediately report as ready, regardless their actual state and timeouts. +// // If n==0 it will wait for the given timeout duration, but it will return sys.ENOSYS if timeout is nil, // i.e. it won't block indefinitely. // -// Note: idea taken from https://stackoverflow.com/questions/6839508/test-if-stdin-has-input-for-c-windows-and-or-linux +// Note: ideas taken from https://stackoverflow.com/questions/6839508/test-if-stdin-has-input-for-c-windows-and-or-linux // PeekNamedPipe: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe -// "GetFileType can assist in determining what device type the handle refers to. A console handle presents as FILE_TYPE_CHAR." -// https://learn.microsoft.com/en-us/windows/console/console-handles func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int, error) { if n == 0 { // Don't block indefinitely. @@ -37,69 +33,181 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int time.Sleep(*timeout) return 0, nil } - if r.IsSet(wasiFdStdin) { - fileType, err := syscall.GetFileType(syscall.Stdin) - if err != nil { - return 0, err - } - if fileType&syscall.FILE_TYPE_CHAR != 0 { - res, err := pollNamedPipe(context.TODO(), syscall.Stdin, timeout) - if err != nil { - return -1, err - } - if !res { - r.Zero() - return 0, nil - } - } - r.Zero() - r.Set(wasiFdStdin) - return 1, nil + + n, errno := selectAllHandles(context.TODO(), r, w, e, timeout) + if errno == 0 { + return n, nil } - return -1, sys.ENOSYS + return n, errno } -// pollNamedPipe polls the given named pipe handle for the given duration. +// selectAllHandles emulates a general-purpose POSIX select on Windows. // // The implementation actually polls every 100 milliseconds until it reaches the given duration. // The duration may be nil, in which case it will wait undefinely. The given ctx is -// used to allow for cancellation. Currently used only in tests. -func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *time.Duration) (bool, error) { +// used to allow for cancellation, and it is currently used only in tests. +// +// As indicated in the man page for select [1], r, w, e are modified upon completion: +// +// "Upon successful completion, the pselect() or select() function shall modify the objects pointed to by the readfds, +// writefds, and errorfds arguments to indicate which file descriptors are ready for reading, ready for writing, +// or have an error condition pending, respectively, and shall return the total number of ready descriptors in all the output sets" +// +// However, for our purposes, this may be pedantic because currently we do not check the values of r, w, e +// after the invocation of select; thus, this behavior may be subject to change in the future for the sake of simplicity. +// +// [1]: https://linux.die.net/man/3/select +func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *time.Duration) (int, sys.Errno) { + nregular := r.Regular().Count() + w.Regular().Count() + e.Regular().Count() + + nsocks := 0 + + rp, errno := peekAllPipes(r.Pipes()) + npipes := rp.Count() + + if errno != 0 { + r.Zero() + w.Zero() + e.Zero() + r.SetPipes(*rp) + return nregular + npipes, errno + } + + // winsock_select mutates the given references, so we create copies. + rs, ws, es := r.Sockets().Copy(), w.Sockets().Copy(), e.Sockets().Copy() + // Short circuit when the duration is zero. if duration != nil && *duration == time.Duration(0) { - bytes, err := peekNamedPipe(pipeHandle) - return bytes > 0, err + nsocks, errno = winsock_select(rs, ws, es, duration) + } else { + // Ticker that emits at every pollInterval. + tick := time.NewTicker(pollInterval) + tickCh := tick.C + defer tick.Stop() + + // Timer that expires after the given duration. + // Initialize afterCh as nil: the select below will wait forever. + var afterCh <-chan time.Time + if duration != nil { + // If duration is not nil, instantiate the timer. + after := time.NewTimer(*duration) + defer after.Stop() + afterCh = after.C + } + + // winsock_select is a blocking call. We spin a goroutine + // and write back to a channel the result. We consume + // this result in the for loop together with the polling + // routines. + type selectResult struct { + n int + errno sys.Errno + } + + winsockSelectCh := make(chan selectResult, 1) + defer close(winsockSelectCh) + + go func() { + res := selectResult{} + res.n, res.errno = winsock_select(rs, ws, es, duration) + winsockSelectCh <- res + }() + + nsocks := 0 + + outer: + for { + select { + case <-ctx.Done(): + break outer + case <-afterCh: + break outer + case <-tickCh: + rp, errno = peekAllPipes(r.Pipes()) + npipes = rp.Count() + if errno != 0 || npipes > 0 { + break outer + } + case res := <-winsockSelectCh: + nsocks = res.n + if res.errno != 0 { + break outer + } + // winsock_select has returned with no result, ignore + // and wait for the other pipes. + if nsocks == 0 { + continue + } + // If select has return successfully we peek for the last time at the other pipes + // to see if data is available and return the sum. + rp, errno = peekAllPipes(r.Pipes()) + npipes = rp.Count() + break outer + } + } } - // Ticker that emits at every pollInterval. - tick := time.NewTicker(pollInterval) - tichCh := tick.C - defer tick.Stop() - - // Timer that expires after the given duration. - // Initialize afterCh as nil: the select below will wait forever. - var afterCh <-chan time.Time - if duration != nil { - // If duration is not nil, instantiate the timer. - after := time.NewTimer(*duration) - defer after.Stop() - afterCh = after.C + rr, wr, er := r.Regular().Copy(), w.Regular().Copy(), e.Regular().Copy() + + // Clear all FdSets and set them in accordance to the returned values. + + if r != nil { + // Pipes are handled only for r + r.SetPipes(*rp) + r.SetRegular(*rr) + r.SetSockets(*rs) } - for { - select { - case <-ctx.Done(): - return false, nil - case <-afterCh: - return false, nil - case <-tichCh: - res, err := peekNamedPipe(pipeHandle) - if err != nil { - return false, err - } - if res > 0 { - return true, nil - } + if w != nil { + w.SetRegular(*wr) + w.SetSockets(*ws) + } + + if e != nil { + e.SetRegular(*er) + e.SetSockets(*es) + } + + return nregular + npipes + nsocks, errno +} + +func peekAllPipes(pipeHandles *platform.WinSockFdSet) (*platform.WinSockFdSet, sys.Errno) { + ready := &platform.WinSockFdSet{} + for i := 0; i < pipeHandles.Count(); i++ { + h := pipeHandles.Get(i) + bytes, errno := peekNamedPipe(h) + if bytes > 0 { + ready.Set(int(h)) + } + if errno != 0 { + return ready, sys.UnwrapOSError(errno) } } + return ready, 0 +} + +func winsock_select(r, w, e *platform.WinSockFdSet, timeout *time.Duration) (int, sys.Errno) { + if r.Count() == 0 && w.Count() == 0 && e.Count() == 0 { + return 0, 0 + } + + var t *syscall.Timeval + if timeout != nil { + tv := syscall.NsecToTimeval(timeout.Nanoseconds()) + t = &tv + } + + rp := unsafe.Pointer(r) + wp := unsafe.Pointer(w) + ep := unsafe.Pointer(e) + tp := unsafe.Pointer(t) + + r0, _, err := syscall.SyscallN( + procselect.Addr(), + uintptr(0), // the first argument is ignored and exists only for compat with BSD sockets. + uintptr(rp), + uintptr(wp), + uintptr(ep), + uintptr(tp)) + return int(r0), sys.UnwrapOSError(err) } diff --git a/internal/sysfs/select_windows_test.go b/internal/sysfs/select_windows_test.go index fc82261d19..638eb8d295 100644 --- a/internal/sysfs/select_windows_test.go +++ b/internal/sysfs/select_windows_test.go @@ -7,21 +7,32 @@ import ( "testing" "time" + "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/internal/platform" "github.com/tetratelabs/wazero/internal/testing/require" ) func TestSelect_Windows(t *testing.T) { type result struct { - hasData bool - err error + n int + fdSet platform.FdSet + err sys.Errno } testCtx, cancel := context.WithCancel(context.Background()) defer cancel() + handleAsFdSet := func(readHandle syscall.Handle) *platform.FdSet { + var fdSet platform.FdSet + fdSet.Set(int(readHandle)) + return &fdSet + } + pollToChannel := func(readHandle syscall.Handle, duration *time.Duration, ch chan result) { r := result{} - r.hasData, r.err = pollNamedPipe(testCtx, readHandle, duration) + fdSet := handleAsFdSet(readHandle) + r.n, r.err = selectAllHandles(testCtx, fdSet, nil, nil, duration) + r.fdSet = *fdSet ch <- r close(ch) } @@ -32,10 +43,10 @@ func TestSelect_Windows(t *testing.T) { rh := syscall.Handle(r.Fd()) wh := syscall.Handle(w.Fd()) - // Ensure the pipe has data. + // Ensure the pipe has no data. n, err := peekNamedPipe(rh) - require.NoError(t, err) - require.NotEqual(t, 0, n) + require.Zero(t, err) + require.Zero(t, n) // Write to the channel. msg, err := syscall.ByteSliceFromString("test\n") @@ -45,24 +56,26 @@ func TestSelect_Windows(t *testing.T) { // Ensure the pipe has data. n, err = peekNamedPipe(rh) - require.NoError(t, err) - require.NotEqual(t, 0, n) + require.Zero(t, err) + require.Equal(t, 6, int(n)) }) - t.Run("pollNamedPipe should return immediately when duration is nil (no data)", func(t *testing.T) { + t.Run("selectAllHandles should return immediately when duration is nil (no data)", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) d := time.Duration(0) - hasData, err := pollNamedPipe(testCtx, rh, &d) - require.NoError(t, err) - require.False(t, hasData) + fdSet := handleAsFdSet(rh) + n, err := selectAllHandles(testCtx, fdSet, nil, nil, &d) + require.Zero(t, err) + require.Zero(t, n) + require.Zero(t, fdSet.Pipes().Count()) }) - t.Run("pollNamedPipe should return immediately when duration is nil (data)", func(t *testing.T) { + t.Run("selectAllHandles should return immediately when duration is nil (data)", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) - rh := syscall.Handle(r.Fd()) + rh := handleAsFdSet(syscall.Handle(r.Fd())) wh := syscall.Handle(w.Fd()) // Write to the channel immediately. @@ -73,12 +86,13 @@ func TestSelect_Windows(t *testing.T) { // Verify that the write is reported. d := time.Duration(0) - hasData, err := pollNamedPipe(testCtx, rh, &d) - require.NoError(t, err) - require.True(t, hasData) + n, err := selectAllHandles(testCtx, rh, nil, nil, &d) + require.Zero(t, err) + require.NotEqual(t, 0, n) + require.Equal(t, syscall.Handle(r.Fd()), rh.Pipes().Get(0)) }) - t.Run("pollNamedPipe should wait forever when duration is nil", func(t *testing.T) { + t.Run("selectAllHandles should wait forever when duration is nil", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -91,7 +105,7 @@ func TestSelect_Windows(t *testing.T) { require.Equal(t, 0, len(ch)) }) - t.Run("pollNamedPipe should wait forever when duration is nil", func(t *testing.T) { + t.Run("selectAllHandles should wait forever when duration is nil", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -113,14 +127,15 @@ func TestSelect_Windows(t *testing.T) { // Ensure that the write occurs (panic after an arbitrary timeout). select { case <-time.After(500 * time.Millisecond): - panic("unreachable!") + t.Fatal("unreachable!") case r := <-ch: - require.NoError(t, r.err) - require.True(t, r.hasData) + require.Zero(t, r.err) + require.NotEqual(t, 0, r.n) + require.Equal(t, rh, r.fdSet.Pipes().Get(0)) } }) - t.Run("pollNamedPipe should wait for the given duration", func(t *testing.T) { + t.Run("selectAllHandles should wait for the given duration", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -145,12 +160,13 @@ func TestSelect_Windows(t *testing.T) { case <-time.After(500 * time.Millisecond): panic("no data!") case r := <-ch: - require.NoError(t, r.err) - require.True(t, r.hasData) + require.Zero(t, r.err) + require.Equal(t, 1, r.n) + require.Equal(t, rh, r.fdSet.Pipes().Get(0)) } }) - t.Run("pollNamedPipe should timeout after the given duration", func(t *testing.T) { + t.Run("selectAllHandles should timeout after the given duration", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -165,11 +181,12 @@ func TestSelect_Windows(t *testing.T) { // Ensure that the timer has expired. res := <-ch - require.NoError(t, res.err) - require.False(t, res.hasData) + require.Zero(t, res.err) + require.Zero(t, res.n) + require.Zero(t, res.fdSet.Pipes().Count()) }) - t.Run("pollNamedPipe should return when a write occurs before the given duration", func(t *testing.T) { + t.Run("selectAllHandles should return when a write occurs before the given duration", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -188,7 +205,21 @@ func TestSelect_Windows(t *testing.T) { require.NoError(t, err) res := <-ch - require.NoError(t, res.err) - require.True(t, res.hasData) + require.Zero(t, res.err) + require.Equal(t, 1, res.n) + require.Equal(t, rh, res.fdSet.Pipes().Get(0)) + }) + + t.Run("selectAllHandles should return when a regular file is given", func(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "ex") + defer f.Close() + require.NoError(t, err) + fh := syscall.Handle(f.Fd()) + fdSet := handleAsFdSet(fh) + d := time.Duration(0) + n, errno := selectAllHandles(testCtx, fdSet, nil, nil, &d) + require.Zero(t, errno) + require.Equal(t, 1, n) + require.Equal(t, fh, fdSet.Regular().Get(0)) }) } diff --git a/internal/sysfs/sock_windows.go b/internal/sysfs/sock_windows.go index a150a7403c..ef912947e0 100644 --- a/internal/sysfs/sock_windows.go +++ b/internal/sysfs/sock_windows.go @@ -5,21 +5,35 @@ package sysfs import ( "net" "syscall" + "time" "unsafe" "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/internal/platform" socketapi "github.com/tetratelabs/wazero/internal/sock" ) -// MSG_PEEK is the flag PEEK for syscall.Recvfrom on Windows. -// This constant is not exported on this platform. -const MSG_PEEK = 0x2 +const ( + // MSG_PEEK is the flag PEEK for syscall.Recvfrom on Windows. + // This constant is not exported on this platform. + MSG_PEEK = 0x2 + // _FIONBIO is the flag to set the O_NONBLOCK flag on socket handles using ioctlsocket. + _FIONBIO = 0x8004667e + // _WASWOULDBLOCK corresponds to syscall.EWOULDBLOCK in WinSock. + _WASWOULDBLOCK = 10035 +) var ( // modws2_32 is WinSock. modws2_32 = syscall.NewLazyDLL("ws2_32.dll") // procrecvfrom exposes recvfrom from WinSock. procrecvfrom = modws2_32.NewProc("recvfrom") + // procaccept exposes accept from WinSock. + procaccept = modws2_32.NewProc("accept") + // procioctlsocket exposes ioctlsocket from WinSock. + procioctlsocket = modws2_32.NewProc("ioctlsocket") + // procselect exposes select from WinSock. + procselect = modws2_32.NewProc("select") ) // recvfrom exposes the underlying syscall in Windows. @@ -28,7 +42,7 @@ var ( // we do not need really need all the parameters that are actually // allowed in WinSock. // We ignore `from *sockaddr` and `fromlen *int`. -func recvfrom(s syscall.Handle, buf []byte, flags int32) (n int, errno syscall.Errno) { +func recvfrom(s syscall.Handle, buf []byte, flags int32) (n int, errno sys.Errno) { var _p0 *byte if len(buf) > 0 { _p0 = &buf[0] @@ -41,7 +55,41 @@ func recvfrom(s syscall.Handle, buf []byte, flags int32) (n int, errno syscall.E uintptr(flags), 0, // from *sockaddr (optional) 0) // fromlen *int (optional) - return int(r0), e1 + return int(r0), sys.UnwrapOSError(e1) +} + +func setNonblockSocket(fd syscall.Handle, enabled bool) sys.Errno { + opt := uint64(0) + if enabled { + opt = 1 + } + // ioctlsocket(fd, FIONBIO, &opt) + _, _, errno := syscall.SyscallN( + procioctlsocket.Addr(), + uintptr(fd), + uintptr(_FIONBIO), + uintptr(unsafe.Pointer(&opt))) + return sys.UnwrapOSError(errno) +} + +// syscallConnControl extracts a syscall.RawConn from the given syscall.Conn and applies +// the given fn to a file descriptor, returning an integer or a nonzero syscall.Errno on failure. +// +// syscallConnControl streamlines the pattern of extracting the syscall.Rawconn, +// invoking its syscall.RawConn.Control method, then handling properly the errors that may occur +// within fn or returned by syscall.RawConn.Control itself. +func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno)) (n int, errno sys.Errno) { + syscallConn, err := conn.SyscallConn() + if err != nil { + return 0, sys.UnwrapOSError(err) + } + // Prioritize the inner errno over Control + if controlErr := syscallConn.Control(func(fd uintptr) { + n, errno = fn(fd) + }); errno == 0 { + errno = sys.UnwrapOSError(controlErr) + } + return } // newTCPListenerFile is a constructor for a socketapi.TCPSock. @@ -53,7 +101,9 @@ func recvfrom(s syscall.Handle, buf []byte, flags int32) (n int, errno syscall.E // standard library, instead of invoke syscalls/Win32 APIs // because they are sensibly different from Unix's. func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock { - return &winTcpListenerFile{tl: tl} + w := &winTcpListenerFile{tl: tl} + _ = w.SetNonblock(true) + return w } var _ socketapi.TCPSock = (*winTcpListenerFile)(nil) @@ -61,26 +111,56 @@ var _ socketapi.TCPSock = (*winTcpListenerFile)(nil) type winTcpListenerFile struct { baseSockFile - tl *net.TCPListener + tl *net.TCPListener + closed bool + nonblock bool } // Accept implements the same method as documented on socketapi.TCPSock func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) { - conn, err := f.tl.Accept() - if err != nil { + // Ensure we have an incoming connection using winsock_select. + n, errno := syscallConnControl(f.tl, func(fd uintptr) (int, sys.Errno) { + fdSet := platform.WinSockFdSet{} + fdSet.Set(int(fd)) + t := time.Duration(0) + return winsock_select(&fdSet, nil, nil, &t) + }) + + // Otherwise return immediately. + if n == 0 || errno != 0 { + return nil, sys.EAGAIN + } + + // Accept normally blocks goroutines, but we + // made sure that we have an incoming connection, + // so we should be safe. + if conn, err := f.tl.Accept(); err != nil { return nil, sys.UnwrapOSError(err) + } else { + return newTcpConn(conn.(*net.TCPConn)), 0 } - return &winTcpConnFile{tc: conn.(*net.TCPConn)}, 0 +} + +// IsNonblock implements File.IsNonblock +func (f *winTcpListenerFile) IsNonblock() bool { + return f.nonblock } // SetNonblock implements the same method as documented on fsapi.File func (f *winTcpListenerFile) SetNonblock(enabled bool) sys.Errno { - return 0 // setNonblock() is a no-op on Windows + f.nonblock = enabled + _, errno := syscallConnControl(f.tl, func(fd uintptr) (int, sys.Errno) { + return 0, setNonblockSocket(syscall.Handle(fd), enabled) + }) + return errno } // Close implements the same method as documented on fsapi.File func (f *winTcpListenerFile) Close() sys.Errno { - return sys.UnwrapOSError(f.tl.Close()) + if !f.closed { + return sys.UnwrapOSError(f.tl.Close()) + } + return 0 } // Addr is exposed for testing. @@ -90,12 +170,18 @@ func (f *winTcpListenerFile) Addr() *net.TCPAddr { var _ socketapi.TCPConn = (*winTcpConnFile)(nil) +// winTcpConnFile is a blocking connection. +// +// It is a wrapper for an underlying net.TCPConn. type winTcpConnFile struct { baseSockFile tc *net.TCPConn - // closed is true when closed was called. This ensures proper sys.EBADF + // nonblock is true when the underlying connection is flagged as non-blocking. + // This ensures that reads and writes return EAGAIN without blocking the caller. + nonblock bool + // closed is true when closed was called. This ensures proper syscall.EBADF closed bool } @@ -105,23 +191,30 @@ func newTcpConn(tc *net.TCPConn) socketapi.TCPConn { // SetNonblock implements the same method as documented on fsapi.File func (f *winTcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) { - syscallConn, err := f.tc.SyscallConn() - if err != nil { - return sys.UnwrapOSError(err) - } - - // Prioritize the error from setNonblock over Control - if controlErr := syscallConn.Control(func(fd uintptr) { - errno = sys.UnwrapOSError(setNonblock(fd, enabled)) - }); errno == 0 { - errno = sys.UnwrapOSError(controlErr) - } + _, errno = syscallConnControl(f.tc, func(fd uintptr) (int, sys.Errno) { + return 0, sys.UnwrapOSError(setNonblockSocket(syscall.Handle(fd), enabled)) + }) return } +// IsNonblock implements File.IsNonblock +func (f *winTcpConnFile) IsNonblock() bool { + return f.nonblock +} + // Read implements the same method as documented on fsapi.File func (f *winTcpConnFile) Read(buf []byte) (n int, errno sys.Errno) { - if n, errno = read(f.tc, buf); errno != 0 { + if len(buf) == 0 { + return 0, 0 // Short-circuit 0-len reads. + } + if nonBlockingFileReadSupported && f.IsNonblock() { + n, errno = syscallConnControl(f.tc, func(fd uintptr) (int, sys.Errno) { + return readSocket(syscall.Handle(fd), buf) + }) + } else { + n, errno = read(f.tc, buf) + } + if errno != 0 { // Defer validation overhead until we've already had an error. errno = fileError(f, f.closed, errno) } @@ -130,7 +223,14 @@ func (f *winTcpConnFile) Read(buf []byte) (n int, errno sys.Errno) { // Write implements the same method as documented on fsapi.File func (f *winTcpConnFile) Write(buf []byte) (n int, errno sys.Errno) { - if n, errno = write(f.tc, buf); errno != 0 { + if nonBlockingFileWriteSupported && f.IsNonblock() { + return syscallConnControl(f.tc, func(fd uintptr) (int, sys.Errno) { + return writeSocket(fd, buf) + }) + } else { + n, errno = write(f.tc, buf) + } + if errno != 0 { // Defer validation overhead until we've already had an error. errno = fileError(f, f.closed, errno) } @@ -143,22 +243,9 @@ func (f *winTcpConnFile) Recvfrom(p []byte, flags int) (n int, errno sys.Errno) errno = sys.EINVAL return } - conn := f.tc - syscallConn, err := conn.SyscallConn() - if err != nil { - errno = sys.UnwrapOSError(err) - return - } - - // Prioritize the error from recvfrom over Control - if controlErr := syscallConn.Control(func(fd uintptr) { - var recvfromErr error - n, recvfromErr = recvfrom(syscall.Handle(fd), p, MSG_PEEK) - errno = sys.UnwrapOSError(recvfromErr) - }); errno == 0 { - errno = sys.UnwrapOSError(controlErr) - } - return + return syscallConnControl(f.tc, func(fd uintptr) (int, sys.Errno) { + return recvfrom(syscall.Handle(fd), p, MSG_PEEK) + }) } // Shutdown implements the same method as documented on fsapi.Conn From ffa0af63c195741f916b4c9892239bd5463683b4 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 11:33:18 +0200 Subject: [PATCH 02/11] formatting Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows.go | 10 +--------- internal/sysfs/sock_windows.go | 4 ++-- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index 39c5e69a5d..cbd5d30770 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -187,16 +187,8 @@ func (f *WinSockFdSet) Get(index int) syscall.Handle { return f.handles[index] } -// isSocket returns true if the given file handle -// is a pipe. +// isSocket returns true if the given file handle is a pipe. func isSocket(fd syscall.Handle) bool { - // n, err := syscall.GetFileType(fd) - // if err != nil { - // return false - // } - // if n != syscall.FILE_TYPE_PIPE { - // return false - // } // If the call to GetNamedPipeInfo succeeds then // the handle is a pipe handle, otherwise it is a socket. r, _, errno := syscall.SyscallN( diff --git a/internal/sysfs/sock_windows.go b/internal/sysfs/sock_windows.go index ef912947e0..8e0ab92e54 100644 --- a/internal/sysfs/sock_windows.go +++ b/internal/sysfs/sock_windows.go @@ -179,9 +179,9 @@ type winTcpConnFile struct { tc *net.TCPConn // nonblock is true when the underlying connection is flagged as non-blocking. - // This ensures that reads and writes return EAGAIN without blocking the caller. + // This ensures that reads and writes return sys.EAGAIN without blocking the caller. nonblock bool - // closed is true when closed was called. This ensures proper syscall.EBADF + // closed is true when closed was called. This ensures proper sys.EBADF closed bool } From e6bbb486ab4f40f16b17b67cca5dd20cb7041455 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 11:48:17 +0200 Subject: [PATCH 03/11] peek sockets instead of selecting Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows.go | 14 ++++------ internal/sysfs/select_windows.go | 41 ++++++------------------------ 2 files changed, 13 insertions(+), 42 deletions(-) diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index cbd5d30770..1fa43881ed 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -65,7 +65,7 @@ func (f *FdSet) SetRegular(r WinSockFdSet) { f.regular = r } -// Regular returns a WinSockFdSet containing the handles in this FdSet that are pipes. +// Pipes returns a WinSockFdSet containing the handles in this FdSet that are pipes. func (f *FdSet) Pipes() *WinSockFdSet { if f == nil { return nil @@ -137,10 +137,6 @@ func (f *WinSockFdSet) Set(fd int) { // Clear removes the given fd from the set. func (f *WinSockFdSet) Clear(fd int) { h := syscall.Handle(fd) - if !isSocket(h) { - return - } - for i := uint64(0); i < f.count; i++ { if f.handles[i] == h { for ; i < f.count-1; i++ { @@ -187,15 +183,15 @@ func (f *WinSockFdSet) Get(index int) syscall.Handle { return f.handles[index] } -// isSocket returns true if the given file handle is a pipe. +// isSocket returns true if the given file handle +// is a pipe. func isSocket(fd syscall.Handle) bool { - // If the call to GetNamedPipeInfo succeeds then - // the handle is a pipe handle, otherwise it is a socket. r, _, errno := syscall.SyscallN( procGetNamedPipeInfo.Addr(), + uintptr(fd), uintptr(unsafe.Pointer(nil)), uintptr(unsafe.Pointer(nil)), uintptr(unsafe.Pointer(nil)), uintptr(unsafe.Pointer(nil))) - return r != 0 && errno == 0 + return r == 0 || errno != 0 } diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index c1221b8dd0..690f865572 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -95,26 +95,6 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti afterCh = after.C } - // winsock_select is a blocking call. We spin a goroutine - // and write back to a channel the result. We consume - // this result in the for loop together with the polling - // routines. - type selectResult struct { - n int - errno sys.Errno - } - - winsockSelectCh := make(chan selectResult, 1) - defer close(winsockSelectCh) - - go func() { - res := selectResult{} - res.n, res.errno = winsock_select(rs, ws, es, duration) - winsockSelectCh <- res - }() - - nsocks := 0 - outer: for { select { @@ -125,24 +105,19 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti case <-tickCh: rp, errno = peekAllPipes(r.Pipes()) npipes = rp.Count() - if errno != 0 || npipes > 0 { + if errno != 0 { break outer } - case res := <-winsockSelectCh: - nsocks = res.n - if res.errno != 0 { + + zero := time.Duration(0) + nsocks, errno = winsock_select(rs, ws, es, &zero) + if errno != 0 { break outer } - // winsock_select has returned with no result, ignore - // and wait for the other pipes. - if nsocks == 0 { - continue + + if npipes > 0 || nsocks > 0 { + break outer } - // If select has return successfully we peek for the last time at the other pipes - // to see if data is available and return the sum. - rp, errno = peekAllPipes(r.Pipes()) - npipes = rp.Count() - break outer } } } From d6795b46a5522c7b256200b955e59621ddbdd85c Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 18:02:19 +0200 Subject: [PATCH 04/11] wip Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows.go | 25 +++++ internal/sysfs/select_windows.go | 153 +++++++++++++------------- internal/sysfs/select_windows_test.go | 6 +- 3 files changed, 106 insertions(+), 78 deletions(-) diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index 1fa43881ed..55f23d9b95 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -119,8 +119,30 @@ func (f *FdSet) IsSet(fd int) bool { return false } +func (f *FdSet) Copy() *FdSet { + if f == nil { + return nil + } + return &FdSet{ + sockets: f.sockets, + pipes: f.pipes, + regular: f.regular, + } +} + +// Zero clears the set. +func (f *FdSet) Count() int { + if f == nil { + return 0 + } + return f.sockets.Count() + f.regular.Count() + f.pipes.Count() +} + // Zero clears the set. func (f *FdSet) Zero() { + if f == nil { + return + } f.sockets.Zero() f.regular.Zero() f.pipes.Zero() @@ -161,6 +183,9 @@ func (f *WinSockFdSet) IsSet(fd int) bool { // Zero clears the set. func (f *WinSockFdSet) Zero() { + if f == nil { + return + } f.count = 0 } diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index 690f865572..72da7c0cbc 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -13,6 +13,9 @@ import ( // pollInterval is the interval between each calls to peekNamedPipe in selectAllHandles const pollInterval = 100 * time.Millisecond +// zeroDuration is the zero value for time.Duration. It is used in selectAllHandles. +var zeroDuration = time.Duration(0) + // syscall_select emulates the select syscall on Windows, for a subset of cases. // // r, w, e may contain any number of file handles, but regular files and pipes are only processed for r (Read). @@ -57,96 +60,95 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int // after the invocation of select; thus, this behavior may be subject to change in the future for the sake of simplicity. // // [1]: https://linux.die.net/man/3/select -func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *time.Duration) (int, sys.Errno) { - nregular := r.Regular().Count() + w.Regular().Count() + e.Regular().Count() - - nsocks := 0 - - rp, errno := peekAllPipes(r.Pipes()) - npipes := rp.Count() - - if errno != 0 { - r.Zero() - w.Zero() - e.Zero() - r.SetPipes(*rp) - return nregular + npipes, errno +func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *time.Duration) (n int, errno sys.Errno) { + r2, w2, e2 := r.Copy(), w.Copy(), e.Copy() + + n, errno = peekAllHandles(r2, w2, e2) + // Short circuit when there is an error, there is data or the duration is zero. + if errno != 0 || n > 0 || (duration != nil && *duration == time.Duration(0)) { + update(r, r2) + update(w, w2) + update(e, e2) + return } - // winsock_select mutates the given references, so we create copies. - rs, ws, es := r.Sockets().Copy(), w.Sockets().Copy(), e.Sockets().Copy() - - // Short circuit when the duration is zero. - if duration != nil && *duration == time.Duration(0) { - nsocks, errno = winsock_select(rs, ws, es, duration) - } else { - // Ticker that emits at every pollInterval. - tick := time.NewTicker(pollInterval) - tickCh := tick.C - defer tick.Stop() - - // Timer that expires after the given duration. - // Initialize afterCh as nil: the select below will wait forever. - var afterCh <-chan time.Time - if duration != nil { - // If duration is not nil, instantiate the timer. - after := time.NewTimer(*duration) - defer after.Stop() - afterCh = after.C - } + // Ticker that emits at every pollInterval. + tick := time.NewTicker(pollInterval) + tickCh := tick.C + defer tick.Stop() + + // Timer that expires after the given duration. + // Initialize afterCh as nil: the select below will wait forever. + var afterCh <-chan time.Time + if duration != nil { + // If duration is not nil, instantiate the timer. + after := time.NewTimer(*duration) + defer after.Stop() + afterCh = after.C + } - outer: - for { - select { - case <-ctx.Done(): - break outer - case <-afterCh: +outer: + for { + select { + case <-ctx.Done(): + break outer + case <-afterCh: + break outer + case <-tickCh: + r2, w2, e2 = r.Copy(), w.Copy(), e.Copy() + n, errno = peekAllHandles(r2, w2, e2) + if errno != 0 || n > 0 { break outer - case <-tickCh: - rp, errno = peekAllPipes(r.Pipes()) - npipes = rp.Count() - if errno != 0 { - break outer - } - - zero := time.Duration(0) - nsocks, errno = winsock_select(rs, ws, es, &zero) - if errno != 0 { - break outer - } - - if npipes > 0 || nsocks > 0 { - break outer - } } } } - rr, wr, er := r.Regular().Copy(), w.Regular().Copy(), e.Regular().Copy() + update(r, r2) + update(w, w2) + update(e, e2) + return +} - // Clear all FdSets and set them in accordance to the returned values. +func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) { + // peekAllNonRegularHandles mutates the given references, so we create copies. + r2, w2, e2 := r.Copy(), w.Copy(), e.Copy() + // pipes are not checked on w, e + w2.Pipes().Zero() + e2.Pipes().Zero() - if r != nil { - // Pipes are handled only for r - r.SetPipes(*rp) - r.SetRegular(*rr) - r.SetSockets(*rs) + // peek pipes only for reading + errno := peekAllPipes(r2.Pipes()) + if errno != 0 { + update(r, r2) + update(w, w2) + update(e, e2) + return 0, errno } - if w != nil { - w.SetRegular(*wr) - w.SetSockets(*ws) + nsocks, errno := winsock_select(r.Sockets(), w.Sockets(), e.Sockets(), &zeroDuration) + if errno != 0 { + update(r, r2) + update(w, w2) + update(e, e2) + return 0, errno } - if e != nil { - e.SetRegular(*er) - e.SetSockets(*es) - } + update(r, r2) + update(w, w2) + update(e, e2) - return nregular + npipes + nsocks, errno + return r.Count() + nsocks, 0 +} + +func update(dest, src *platform.FdSet) { + if src != nil { + dest.SetPipes(*src.Pipes()) + dest.SetRegular(*src.Regular()) + dest.SetSockets(*src.Sockets()) + } } -func peekAllPipes(pipeHandles *platform.WinSockFdSet) (*platform.WinSockFdSet, sys.Errno) { +func peekAllPipes(pipeHandles *platform.WinSockFdSet) sys.Errno { ready := &platform.WinSockFdSet{} for i := 0; i < pipeHandles.Count(); i++ { h := pipeHandles.Get(i) @@ -155,10 +157,11 @@ func peekAllPipes(pipeHandles *platform.WinSockFdSet) (*platform.WinSockFdSet, s ready.Set(int(h)) } if errno != 0 { - return ready, sys.UnwrapOSError(errno) + return sys.UnwrapOSError(errno) } } - return ready, 0 + *pipeHandles = *ready + return 0 } func winsock_select(r, w, e *platform.WinSockFdSet, timeout *time.Duration) (int, sys.Errno) { diff --git a/internal/sysfs/select_windows_test.go b/internal/sysfs/select_windows_test.go index 638eb8d295..0fda8188e1 100644 --- a/internal/sysfs/select_windows_test.go +++ b/internal/sysfs/select_windows_test.go @@ -60,7 +60,7 @@ func TestSelect_Windows(t *testing.T) { require.Equal(t, 6, int(n)) }) - t.Run("selectAllHandles should return immediately when duration is nil (no data)", func(t *testing.T) { + t.Run("selectAllHandles should return immediately when duration is zero (no data)", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) @@ -72,7 +72,7 @@ func TestSelect_Windows(t *testing.T) { require.Zero(t, fdSet.Pipes().Count()) }) - t.Run("selectAllHandles should return immediately when duration is nil (data)", func(t *testing.T) { + t.Run("selectAllHandles should return immediately when duration is zero (data)", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) rh := handleAsFdSet(syscall.Handle(r.Fd())) @@ -92,7 +92,7 @@ func TestSelect_Windows(t *testing.T) { require.Equal(t, syscall.Handle(r.Fd()), rh.Pipes().Get(0)) }) - t.Run("selectAllHandles should wait forever when duration is nil", func(t *testing.T) { + t.Run("selectAllHandles should wait forever when duration is nil (no writes)", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) rh := syscall.Handle(r.Fd()) From 4b50041066acde21f3d32284eed0a7246de54ad5 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 18:15:26 +0200 Subject: [PATCH 05/11] wip Signed-off-by: Edoardo Vacchi --- internal/sysfs/select_windows.go | 39 ++++++++++++-------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index 72da7c0cbc..af104d4d77 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -62,7 +62,6 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int // [1]: https://linux.die.net/man/3/select func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *time.Duration) (n int, errno sys.Errno) { r2, w2, e2 := r.Copy(), w.Copy(), e.Copy() - n, errno = peekAllHandles(r2, w2, e2) // Short circuit when there is an error, there is data or the duration is zero. if errno != 0 || n > 0 || (duration != nil && *duration == time.Duration(0)) { @@ -87,56 +86,48 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti afterCh = after.C } -outer: for { select { case <-ctx.Done(): - break outer + r.Zero() + w.Zero() + e.Zero() + return case <-afterCh: - break outer + r.Zero() + w.Zero() + e.Zero() + return case <-tickCh: r2, w2, e2 = r.Copy(), w.Copy(), e.Copy() n, errno = peekAllHandles(r2, w2, e2) if errno != 0 || n > 0 { - break outer + update(r, r2) + update(w, w2) + update(e, e2) + return } } } - update(r, r2) - update(w, w2) - update(e, e2) - return } func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) { - // peekAllNonRegularHandles mutates the given references, so we create copies. - r2, w2, e2 := r.Copy(), w.Copy(), e.Copy() // pipes are not checked on w, e - w2.Pipes().Zero() - e2.Pipes().Zero() + w.Pipes().Zero() + e.Pipes().Zero() // peek pipes only for reading - errno := peekAllPipes(r2.Pipes()) + errno := peekAllPipes(r.Pipes()) if errno != 0 { - update(r, r2) - update(w, w2) - update(e, e2) return 0, errno } nsocks, errno := winsock_select(r.Sockets(), w.Sockets(), e.Sockets(), &zeroDuration) if errno != 0 { - update(r, r2) - update(w, w2) - update(e, e2) return 0, errno } - update(r, r2) - update(w, w2) - update(e, e2) - return r.Count() + nsocks, 0 } From 301d3242a3c2fdfe6490caf57a9c3e2c04bf583d Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 18:56:04 +0200 Subject: [PATCH 06/11] wip Signed-off-by: Edoardo Vacchi --- experimental/sys/syscall_errno_windows.go | 6 +++- internal/sysfs/select_windows_test.go | 34 +++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/experimental/sys/syscall_errno_windows.go b/experimental/sys/syscall_errno_windows.go index 522bd2cac3..4c512133a4 100644 --- a/experimental/sys/syscall_errno_windows.go +++ b/experimental/sys/syscall_errno_windows.go @@ -22,6 +22,10 @@ const ( // _ERROR_DIRECTORY is a Windows error returned by syscall.Rmdir // instead of syscall.ENOTDIR _ERROR_DIRECTORY = syscall.Errno(0x10B) + + // _ERROR_INVALID_SOCKET is a Windows error returned by winsock_select + // when a given handle is not a socket. + _ERROR_INVALID_SOCKET = syscall.Errno(0x2736) ) func errorToErrno(err error) Errno { @@ -39,7 +43,7 @@ func errorToErrno(err error) Errno { return ENOTEMPTY case syscall.ERROR_FILE_EXISTS: return EEXIST - case _ERROR_INVALID_HANDLE: + case _ERROR_INVALID_HANDLE, _ERROR_INVALID_SOCKET: return EBADF case syscall.ERROR_ACCESS_DENIED: // POSIX read and write functions expect EBADF, not EACCES when not diff --git a/internal/sysfs/select_windows_test.go b/internal/sysfs/select_windows_test.go index 0fda8188e1..83ef46fefe 100644 --- a/internal/sysfs/select_windows_test.go +++ b/internal/sysfs/select_windows_test.go @@ -37,6 +37,20 @@ func TestSelect_Windows(t *testing.T) { close(ch) } + t.Run("syscall_select returns sys.ENOSYS when n == 0 and duration is nil", func(t *testing.T) { + n, errno := syscall_select(0, nil, nil, nil, nil) + require.Equal(t, -1, n) + require.EqualErrno(t, sys.ENOSYS, errno) + }) + + t.Run("syscall_select propagates error when peekAllPipes returns an error", func(t *testing.T) { + fdSet := platform.FdSet{} + fdSet.Pipes().Set(-1) + n, errno := syscall_select(0, &fdSet, nil, nil, nil) + require.Equal(t, -1, n) + require.EqualErrno(t, sys.ENOSYS, errno) + }) + t.Run("peekNamedPipe should report the correct state of incoming data in the pipe", func(t *testing.T) { r, w, err := os.Pipe() require.NoError(t, err) @@ -60,6 +74,26 @@ func TestSelect_Windows(t *testing.T) { require.Equal(t, 6, int(n)) }) + t.Run("peekAllPipes should return an error on invalid handle", func(t *testing.T) { + fdSet := platform.WinSockFdSet{} + fdSet.Set(int(-1)) + err := peekAllPipes(&fdSet) + require.EqualErrno(t, sys.EBADF, err) + }) + + t.Run("peekAllHandles should return an error on invalid handle", func(t *testing.T) { + fdSet := platform.FdSet{} + fdSet.Pipes().Set(-1) + n, err := peekAllHandles(&fdSet, nil, nil) + require.EqualErrno(t, sys.EBADF, err) + require.Equal(t, 0, n) + fdSet.Pipes().Zero() + fdSet.Sockets().Set(-1) + n, err = peekAllHandles(&fdSet, nil, nil) + require.EqualErrno(t, sys.EBADF, err) + require.Equal(t, 0, n) + }) + t.Run("selectAllHandles should return immediately when duration is zero (no data)", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) From 0bbeb68b5ff6c42353cf660607b86c797067f997 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 19:04:31 +0200 Subject: [PATCH 07/11] test Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows_test.go | 109 ++++++++++++++++++++++++ internal/sysfs/select_windows.go | 1 - 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 internal/platform/fdset_windows_test.go diff --git a/internal/platform/fdset_windows_test.go b/internal/platform/fdset_windows_test.go new file mode 100644 index 0000000000..3e56b21b8e --- /dev/null +++ b/internal/platform/fdset_windows_test.go @@ -0,0 +1,109 @@ +package platform + +import ( + "syscall" + "testing" + + "github.com/tetratelabs/wazero/internal/testing/require" +) + +func TestWinSockFdSet(t *testing.T) { + allSet := WinSockFdSet{ + count: _FD_SETSIZE, + } + for i := 0; i < _FD_SETSIZE; i++ { + allSet.handles[i] = syscall.Handle(i) + } + + tests := []struct { + name string + init WinSockFdSet + exec func(fdSet *WinSockFdSet) + expected WinSockFdSet + }{ + { + name: "all fields set", + exec: func(fdSet *WinSockFdSet) { + for fd := 0; fd < _FD_SETSIZE; fd++ { + fdSet.Set(fd) + } + }, + expected: allSet, + }, + { + name: "all bits cleared", + init: allSet, + exec: func(fdSet *WinSockFdSet) { + for fd := 0; fd < _FD_SETSIZE; fd++ { + fdSet.Clear(fd) + } + }, + expected: WinSockFdSet{}, + }, + { + name: "zero should clear all bits", + init: allSet, + exec: func(fdSet *WinSockFdSet) { + fdSet.Zero() + }, + expected: WinSockFdSet{}, + }, + { + name: "is-set should return true for all bits", + init: allSet, + exec: func(fdSet *WinSockFdSet) { + for i := 0; i < fdSet.Count(); i++ { + require.True(t, fdSet.IsSet(i)) + } + }, + expected: allSet, + }, + { + name: "is-set should return true for all odd bits", + init: WinSockFdSet{}, + exec: func(fdSet *WinSockFdSet) { + for fd := 1; fd < _FD_SETSIZE; fd += 2 { + fdSet.Set(fd) + } + for fd := 0; fd < _FD_SETSIZE; fd++ { + isSet := fdSet.IsSet(fd) + if fd&0x1 == 0x1 { + require.True(t, isSet) + } else { + require.False(t, isSet) + } + } + fdSet.Zero() + }, + expected: WinSockFdSet{}, + }, + { + name: "should clear all even bits", + init: allSet, + exec: func(fdSet *WinSockFdSet) { + for fd := 0; fd < _FD_SETSIZE; fd += 2 { + fdSet.Clear(fd) + } + for fd := 0; fd < _FD_SETSIZE; fd++ { + isSet := fdSet.IsSet(fd) + if fd&0x1 == 0x1 { + require.True(t, isSet) + } else { + require.False(t, isSet) + } + } + fdSet.Zero() + }, + expected: WinSockFdSet{}, + }, + } + + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + x := tc.init + tc.exec(&x) + require.Equal(t, tc.expected, x) + }) + } +} diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index af104d4d77..a0f58c3c14 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -109,7 +109,6 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti } } } - } func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) { From 7bbba14244ceda1a1b57d5f8c392c79e66473e28 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 19:34:08 +0200 Subject: [PATCH 08/11] more tests Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows.go | 1 + internal/platform/fdset_windows_test.go | 61 ++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index 55f23d9b95..78a51b40e6 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -186,6 +186,7 @@ func (f *WinSockFdSet) Zero() { if f == nil { return } + f.handles = [64]syscall.Handle{} f.count = 0 } diff --git a/internal/platform/fdset_windows_test.go b/internal/platform/fdset_windows_test.go index 3e56b21b8e..90ca5ec80d 100644 --- a/internal/platform/fdset_windows_test.go +++ b/internal/platform/fdset_windows_test.go @@ -1,6 +1,8 @@ package platform import ( + "net" + "os" "syscall" "testing" @@ -14,6 +16,15 @@ func TestWinSockFdSet(t *testing.T) { for i := 0; i < _FD_SETSIZE; i++ { allSet.handles[i] = syscall.Handle(i) } + shiftedFields := WinSockFdSet{ + count: _FD_SETSIZE - 1, + } + for i := 0; i < _FD_SETSIZE; i++ { + shiftedFields.handles[i] = syscall.Handle(i) + } + for i := _FD_SETSIZE / 2; i < _FD_SETSIZE-1; i++ { + shiftedFields.handles[i] = syscall.Handle(i + 1) + } tests := []struct { name string @@ -31,17 +42,15 @@ func TestWinSockFdSet(t *testing.T) { expected: allSet, }, { - name: "all bits cleared", + name: "clear should shift all fields by one position", init: allSet, exec: func(fdSet *WinSockFdSet) { - for fd := 0; fd < _FD_SETSIZE; fd++ { - fdSet.Clear(fd) - } + fdSet.Clear(_FD_SETSIZE / 2) }, - expected: WinSockFdSet{}, + expected: shiftedFields, }, { - name: "zero should clear all bits", + name: "zero should clear all fields", init: allSet, exec: func(fdSet *WinSockFdSet) { fdSet.Zero() @@ -49,7 +58,7 @@ func TestWinSockFdSet(t *testing.T) { expected: WinSockFdSet{}, }, { - name: "is-set should return true for all bits", + name: "is-set should return true for all fields", init: allSet, exec: func(fdSet *WinSockFdSet) { for i := 0; i < fdSet.Count(); i++ { @@ -107,3 +116,41 @@ func TestWinSockFdSet(t *testing.T) { }) } } + +func TestFdSet(t *testing.T) { + t.Run("A pipe should be set in FdSet.Pipe", func(t *testing.T) { + r, _, _ := os.Pipe() + defer r.Close() + + fdSet := FdSet{} + fdSet.Set(int(r.Fd())) + + require.Equal(t, syscall.Handle(r.Fd()), fdSet.Pipes().Get(0)) + }) + + t.Run("A regular file should be set in FdSet.Regular", func(t *testing.T) { + f, err := os.Open(t.TempDir()) + require.NoError(t, err) + defer f.Close() + + fdSet := FdSet{} + fdSet.Set(int(f.Fd())) + + require.Equal(t, syscall.Handle(f.Fd()), fdSet.Regular().Get(0)) + }) + + t.Run("A socket should be set in FdSet.Socket", func(t *testing.T) { + listen, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listen.Close() + + conn, err := listen.(*net.TCPListener).SyscallConn() + require.NoError(t, err) + + conn.Control(func(fd uintptr) { + fdSet := FdSet{} + fdSet.Set(int(fd)) + require.Equal(t, syscall.Handle(fd), fdSet.Sockets().Get(0)) + }) + }) +} From 465d2143efc85e19b53e6015e6f55c5ad883c975 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 19:51:07 +0200 Subject: [PATCH 09/11] coverage Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows_test.go | 2 +- internal/sysfs/select_windows.go | 4 +- internal/sysfs/select_windows_test.go | 72 +++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/internal/platform/fdset_windows_test.go b/internal/platform/fdset_windows_test.go index 90ca5ec80d..c77d95ee69 100644 --- a/internal/platform/fdset_windows_test.go +++ b/internal/platform/fdset_windows_test.go @@ -129,7 +129,7 @@ func TestFdSet(t *testing.T) { }) t.Run("A regular file should be set in FdSet.Regular", func(t *testing.T) { - f, err := os.Open(t.TempDir()) + f, err := os.CreateTemp(t.TempDir(), "test") require.NoError(t, err) defer f.Close() diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index a0f58c3c14..b8b0a08634 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -122,12 +122,12 @@ func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) { return 0, errno } - nsocks, errno := winsock_select(r.Sockets(), w.Sockets(), e.Sockets(), &zeroDuration) + _, errno = winsock_select(r.Sockets(), w.Sockets(), e.Sockets(), &zeroDuration) if errno != 0 { return 0, errno } - return r.Count() + nsocks, 0 + return r.Count() + w.Count() + e.Count(), 0 } func update(dest, src *platform.FdSet) { diff --git a/internal/sysfs/select_windows_test.go b/internal/sysfs/select_windows_test.go index 83ef46fefe..1347fb008a 100644 --- a/internal/sysfs/select_windows_test.go +++ b/internal/sysfs/select_windows_test.go @@ -2,6 +2,7 @@ package sysfs import ( "context" + "net" "os" "syscall" "testing" @@ -94,6 +95,77 @@ func TestSelect_Windows(t *testing.T) { require.Equal(t, 0, n) }) + t.Run("peekAllHandles should return successfully with a regular file", func(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "test") + require.NoError(t, err) + defer f.Close() + + fdSet := platform.FdSet{} + fdSet.Set(int(f.Fd())) + + n, errno := peekAllHandles(&fdSet, nil, nil) + require.Zero(t, errno) + require.Equal(t, 1, n) + require.Equal(t, syscall.Handle(f.Fd()), fdSet.Regular().Get(0)) + }) + + t.Run("peekAllHandles should return successfully with a pipe", func(t *testing.T) { + r, w, err := os.Pipe() + require.NoError(t, err) + defer r.Close() + defer w.Close() + + fdSet := platform.FdSet{} + fdSet.Set(int(r.Fd())) + + n, errno := peekAllHandles(&fdSet, nil, nil) + require.Zero(t, errno) + require.Equal(t, 0, n) + require.Equal(t, 0, fdSet.Pipes().Count()) + + w.Write([]byte("wazero")) + fdSet.Set(int(r.Fd())) + n, errno = peekAllHandles(&fdSet, nil, nil) + require.Zero(t, errno) + require.Equal(t, 1, n) + require.Equal(t, syscall.Handle(r.Fd()), fdSet.Pipes().Get(0)) + }) + + t.Run("peekAllHandles should return successfully with a socket", func(t *testing.T) { + listen, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listen.Close() + + conn, err := listen.(*net.TCPListener).SyscallConn() + require.NoError(t, err) + + fdSet := platform.FdSet{} + conn.Control(func(fd uintptr) { + fdSet.Set(int(fd)) + }) + + n, errno := peekAllHandles(&fdSet, nil, nil) + require.Zero(t, errno) + require.Equal(t, 0, n) + require.Equal(t, 0, fdSet.Sockets().Count()) + + tcpAddr, err := net.ResolveTCPAddr("tcp", listen.Addr().String()) + require.NoError(t, err) + tcp, err := net.DialTCP("tcp", nil, tcpAddr) + require.NoError(t, err) + tcp.Write([]byte("wazero")) + + conn.Control(func(fd uintptr) { + fdSet.Set(int(fd)) + }) + n, errno = peekAllHandles(&fdSet, nil, nil) + require.Zero(t, errno) + require.Equal(t, 1, n) + conn.Control(func(fd uintptr) { + require.Equal(t, syscall.Handle(fd), fdSet.Sockets().Get(0)) + }) + }) + t.Run("selectAllHandles should return immediately when duration is zero (no data)", func(t *testing.T) { r, _, err := os.Pipe() require.NoError(t, err) From 7936d97f865627727e66e21dfbc0b71d5d57c3f0 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 22:35:33 +0200 Subject: [PATCH 10/11] docs Signed-off-by: Edoardo Vacchi --- RATIONALE.md | 40 ++++++++++++++++----------- internal/platform/fdset_windows.go | 43 +++++++++++++++++++----------- internal/sysfs/select_windows.go | 20 +++++--------- 3 files changed, 59 insertions(+), 44 deletions(-) diff --git a/RATIONALE.md b/RATIONALE.md index 8952ced9e1..32a0da20e6 100644 --- a/RATIONALE.md +++ b/RATIONALE.md @@ -1273,7 +1273,7 @@ However, if the reader is detected to read from `os.Stdin`, a special code path is followed, invoking `platform.Select()`. `platform.Select()` is a wrapper for `select(2)` on POSIX systems, -and it is mocked for a handful of cases also on Windows. +and it is emulated on Windows. ### Select on POSIX @@ -1303,25 +1303,35 @@ unless data becomes available on `Stdin` itself. ### Select on Windows -On Windows the `platform.Select()` is much more straightforward, -and it really just replicates the behavior found in the general cases -for `FdRead` subscriptions: in other words, the subscription to `Stdin` -is immediately acknowledged. +On Windows `platform.Select()` cannot be delegated to a single +syscall, because there is no single syscall to handle sockets, +pipes and regular files. -The implementation also support a timeout, but in this case -it relies on `time.Sleep()`, which notably, as compared to the POSIX -case, interruptible and compatible with goroutines. +Instead, we emulate its behavior for the cases that are currently +of interest. -However, because `Stdin` subscriptions are always acknowledged -without wait and because this code path is always followed only -when at least one `Stdin` subscription is present, then the -timeout is effectively always handled externally. +- For regular files, we _always_ report them as ready, as +[most operating systems do anyway][async-io-windows]. -In any case, the behavior of `platform.Select` on Windows -is sensibly different from the behavior on POSIX platforms; -we plan to refine and further align it in semantics in the future. +- For pipes, we iterate on the given `readfds` +and we invoke [`PeekNamedPipe`][peeknamedpipe]. We currently ignore +`writefds` and `exceptfds` for pipes. In particular, +`Stdin`, when present, is set to the `readfds` FdSet. + +- Notably, we include also support for sockets using the [WinSock +implementation of `select`][winsock-select], but instead +of relying on the timeout argument of the `select` function, +we set a 0-duration timeout so that it behaves like a peek. + +This way, we can check for regular files all at once, +at the beginning of the function, then we poll pipes and +sockets periodically using a cancellable `time.Tick`, +which plays nicely with the rest of the Go runtime. [poll_oneoff]: https://github.com/WebAssembly/wasi-poll#why-is-the-function-called-poll_oneoff +[async-io-windows]: https://tinyclouds.org/iocp_links +[peeknamedpipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe +[winsock-select]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select ## Signed encoding of integer global constant initializers diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index 78a51b40e6..7f75888a6e 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -33,14 +33,21 @@ type WinSockFdSet struct { // Note: the implementation is very different from POSIX; Windows provides // POSIX select only for sockets. We emulate a select for other APIs in the sysfs // package, but we still want to use the "real" select in the case of sockets. -// So, we keep a separate FdSet of sockets, so that we can pass it directly -// to the winsock select implementation +// So, we keep separate FdSets for sockets, pipes and regular files, so that we can +// handle them separately. For instance sockets can be used directly in winsock select. type FdSet struct { sockets WinSockFdSet pipes WinSockFdSet regular WinSockFdSet } +// SetAll overwrites all the fields in f with the fields in g. +func (f *FdSet) SetAll(g *FdSet) { + f.sockets = g.sockets + f.pipes = g.pipes + f.regular = g.regular +} + // Sockets returns a WinSockFdSet containing the handles in this FdSet that are sockets. func (f *FdSet) Sockets() *WinSockFdSet { if f == nil { @@ -49,10 +56,6 @@ func (f *FdSet) Sockets() *WinSockFdSet { return &f.sockets } -func (f *FdSet) SetSockets(s WinSockFdSet) { - f.sockets = s -} - // Regular returns a WinSockFdSet containing the handles in this FdSet that are regular files. func (f *FdSet) Regular() *WinSockFdSet { if f == nil { @@ -61,10 +64,6 @@ func (f *FdSet) Regular() *WinSockFdSet { return &f.regular } -func (f *FdSet) SetRegular(r WinSockFdSet) { - f.regular = r -} - // Pipes returns a WinSockFdSet containing the handles in this FdSet that are pipes. func (f *FdSet) Pipes() *WinSockFdSet { if f == nil { @@ -73,10 +72,20 @@ func (f *FdSet) Pipes() *WinSockFdSet { return &f.pipes } -func (f *FdSet) SetPipes(p WinSockFdSet) { - f.pipes = p -} - +// getFdSetFor returns a pointer to the right fd set for the given fd. +// It checks the type for fd and returns the field for pipes, regular or sockets +// to simplify code. +// +// For instance, if fd is a socket and it must be set if f.pipes, instead +// of writing: +// +// if isSocket(fd) { +// f.sockets.Set(fd) +// } +// +// It is possible to write: +// +// f.getFdSetFor(fd).Set(fd) func (f *FdSet) getFdSetFor(fd int) *WinSockFdSet { h := syscall.Handle(fd) t, err := syscall.GetFileType(h) @@ -119,6 +128,7 @@ func (f *FdSet) IsSet(fd int) bool { return false } +// Copy returns a copy of this FdSet. It returns nil, if the FdSet is nil. func (f *FdSet) Copy() *FdSet { if f == nil { return nil @@ -130,7 +140,7 @@ func (f *FdSet) Copy() *FdSet { } } -// Zero clears the set. +// Zero clears the set. It returns 0 if the FdSet is nil. func (f *FdSet) Count() int { if f == nil { return 0 @@ -190,6 +200,7 @@ func (f *WinSockFdSet) Zero() { f.count = 0 } +// Count returns the number of values that are set in the fd set. func (f *WinSockFdSet) Count() int { if f == nil { return 0 @@ -197,6 +208,7 @@ func (f *WinSockFdSet) Count() int { return int(f.count) } +// Copy returns a copy of the fd set or nil if it is nil. func (f *WinSockFdSet) Copy() *WinSockFdSet { if f == nil { return nil @@ -205,6 +217,7 @@ func (f *WinSockFdSet) Copy() *WinSockFdSet { return © } +// Get returns the handle at the given index. func (f *WinSockFdSet) Get(index int) syscall.Handle { return f.handles[index] } diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index b8b0a08634..b5c1a1bdb1 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -65,9 +65,9 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti n, errno = peekAllHandles(r2, w2, e2) // Short circuit when there is an error, there is data or the duration is zero. if errno != 0 || n > 0 || (duration != nil && *duration == time.Duration(0)) { - update(r, r2) - update(w, w2) - update(e, e2) + r.SetAll(r2) + w.SetAll(w2) + e.SetAll(e2) return } @@ -102,9 +102,9 @@ func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *ti r2, w2, e2 = r.Copy(), w.Copy(), e.Copy() n, errno = peekAllHandles(r2, w2, e2) if errno != 0 || n > 0 { - update(r, r2) - update(w, w2) - update(e, e2) + r.SetAll(r2) + w.SetAll(w2) + e.SetAll(e2) return } } @@ -130,14 +130,6 @@ func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) { return r.Count() + w.Count() + e.Count(), 0 } -func update(dest, src *platform.FdSet) { - if src != nil { - dest.SetPipes(*src.Pipes()) - dest.SetRegular(*src.Regular()) - dest.SetSockets(*src.Sockets()) - } -} - func peekAllPipes(pipeHandles *platform.WinSockFdSet) sys.Errno { ready := &platform.WinSockFdSet{} for i := 0; i < pipeHandles.Count(); i++ { From 38218dbb0702aac519784c6e6f06b497b6f8bc6a Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Tue, 18 Jul 2023 22:46:04 +0200 Subject: [PATCH 11/11] fix nil Signed-off-by: Edoardo Vacchi --- internal/platform/fdset_windows.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/platform/fdset_windows.go b/internal/platform/fdset_windows.go index 7f75888a6e..60773ed54a 100644 --- a/internal/platform/fdset_windows.go +++ b/internal/platform/fdset_windows.go @@ -43,6 +43,9 @@ type FdSet struct { // SetAll overwrites all the fields in f with the fields in g. func (f *FdSet) SetAll(g *FdSet) { + if f == nil { + return + } f.sockets = g.sockets f.pipes = g.pipes f.regular = g.regular