From 335fea292bacd4d670f4d2ae395c9590b5d451b2 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Sun, 9 Jul 2023 22:16:12 +0200 Subject: [PATCH] wasi: nonblocking pipes on Windows Improve #1500 with a simple observation, i.e. we really need non-blocking I/O only for pipes and sockets. Now, for sockets we can use WinSock select, as for pipes, select_windows.go already imports PeekNamedPipe. We combine PeekNamedPipe with a blocking Read only for those cases when PeekNamedPipe returns n > 0, which means a Read of n bytes won't block. We special case n == 0 to return EAGAIN and ERROR_BROKEN_PIPE as a non-failure (EOF reached). We also introduce `isNonblock(f)` function, specialized on Windows to also check if the file is ModeNamedPipe; otherwise we would default to blocking anyway. Signed-off-by: Edoardo Vacchi --- .../wasi_stdlib_test.go | 38 +++++++++--- internal/sysfs/file_test.go | 6 +- internal/sysfs/file_unsupported.go | 2 +- internal/sysfs/file_windows.go | 61 +++++++++++++++++++ internal/sysfs/nonblock_unix.go | 10 ++- internal/sysfs/nonblock_windows.go | 18 +++++- internal/sysfs/osfile.go | 2 +- internal/sysfs/select_windows.go | 29 +-------- internal/sysfs/select_windows_test.go | 8 +-- 9 files changed, 128 insertions(+), 46 deletions(-) create mode 100644 internal/sysfs/file_windows.go diff --git a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go index 556fc1d6a0..2fe9babe25 100644 --- a/imports/wasi_snapshot_preview1/wasi_stdlib_test.go +++ b/imports/wasi_snapshot_preview1/wasi_stdlib_test.go @@ -498,18 +498,42 @@ func Test_Stdin(t *testing.T) { } func testStdin(t *testing.T, bin []byte) { - r, w, err := os.Pipe() + stdinReader, stdinWriter, err := os.Pipe() + require.NoError(t, err) + stdoutReader, stdoutWriter, err := os.Pipe() + require.NoError(t, err) + defer func() { + stdinReader.Close() + stdinWriter.Close() + stdoutReader.Close() + stdoutReader.Close() + }() require.NoError(t, err) moduleConfig := wazero.NewModuleConfig(). WithSysNanotime(). // poll_oneoff requires nanotime. WithArgs("wasi", "stdin"). - WithStdin(r) - ch := make(chan string, 1) + WithStdin(stdinReader). + WithStdout(stdoutWriter) + ch := make(chan struct{}, 1) go func() { - ch <- compileAndRun(t, testCtx, moduleConfig, bin) + defer close(ch) + + r := wazero.NewRuntime(testCtx) + defer r.Close(testCtx) + _, err := wasi_snapshot_preview1.Instantiate(testCtx, r) + require.NoError(t, err) + _, err = r.InstantiateWithConfig(testCtx, bin, moduleConfig) + require.NoError(t, err) }() + time.Sleep(1 * time.Second) - _, _ = w.WriteString("foo") - s := <-ch - require.Equal(t, "waiting for stdin...\nfoo", s) + buf := make([]byte, 21) + _, _ = stdoutReader.Read(buf) + require.Equal(t, "waiting for stdin...\n", string(buf)) + _, _ = stdinWriter.WriteString("foo") + _ = stdinWriter.Close() + buf = make([]byte, 3) + _, _ = stdoutReader.Read(buf) + require.Equal(t, "foo", string(buf)) + <-ch } diff --git a/internal/sysfs/file_test.go b/internal/sysfs/file_test.go index 6fba3b6c7b..df02bf5b5c 100644 --- a/internal/sysfs/file_test.go +++ b/internal/sysfs/file_test.go @@ -87,11 +87,7 @@ func TestReadFdNonblock(t *testing.T) { // Read from the file without ever writing to it should not block. buf := make([]byte, 8) _, e := readFd(fd, buf) - if runtime.GOOS == "windows" { - require.EqualErrno(t, syscall.ENOSYS, e) - } else { - require.EqualErrno(t, syscall.EAGAIN, e) - } + require.EqualErrno(t, syscall.EAGAIN, e) } func TestFileSetAppend(t *testing.T) { diff --git a/internal/sysfs/file_unsupported.go b/internal/sysfs/file_unsupported.go index cb4bddb339..3b45d7bcd5 100644 --- a/internal/sysfs/file_unsupported.go +++ b/internal/sysfs/file_unsupported.go @@ -1,4 +1,4 @@ -//go:build !unix && !linux && !darwin +//go:build !unix && !linux && !darwin && !windows package sysfs diff --git a/internal/sysfs/file_windows.go b/internal/sysfs/file_windows.go new file mode 100644 index 0000000000..448726fe68 --- /dev/null +++ b/internal/sysfs/file_windows.go @@ -0,0 +1,61 @@ +package sysfs + +import ( + "syscall" + "unsafe" + + "github.com/tetratelabs/wazero/internal/platform" +) + +const NonBlockingFileIoSupported = true + +var kernel32 = syscall.NewLazyDLL("kernel32.dll") + +// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe +var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe") + +// readFd returns ENOSYS on unsupported platforms. +// +// PeekNamedPipe: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe +// "GetFileType can assist in determining what device type the handle refers to. A console handle presents as FILE_TYPE_CHAR." +// https://learn.microsoft.com/en-us/windows/console/console-handles +func readFd(fd uintptr, buf []byte) (int, syscall.Errno) { + handle := syscall.Handle(fd) + fileType, err := syscall.GetFileType(syscall.Stdin) + if err != nil { + return 0, platform.UnwrapOSError(err) + } + if fileType&syscall.FILE_TYPE_CHAR == 0 { + return -1, syscall.ENOSYS + } + n, err := peekNamedPipe(handle) + if err != nil { + errno := platform.UnwrapOSError(err) + if errno == syscall.ERROR_BROKEN_PIPE { + return 0, 0 + } + } + if n == 0 { + return -1, syscall.EAGAIN + } + un, err := syscall.Read(handle, buf[0:n]) + return un, platform.UnwrapOSError(err) +} + +// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API +// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe +func peekNamedPipe(handle syscall.Handle) (uint32, error) { + var totalBytesAvail uint32 + totalBytesPtr := unsafe.Pointer(&totalBytesAvail) + _, _, err := procPeekNamedPipe.Call( + uintptr(handle), // [in] HANDLE hNamedPipe, + 0, // [out, optional] LPVOID lpBuffer, + 0, // [in] DWORD nBufferSize, + 0, // [out, optional] LPDWORD lpBytesRead + uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail, + 0) // [out, optional] LPDWORD lpBytesLeftThisMessage + if err == syscall.Errno(0) { + return totalBytesAvail, nil + } + return totalBytesAvail, err +} diff --git a/internal/sysfs/nonblock_unix.go b/internal/sysfs/nonblock_unix.go index 1ac13e5397..fdee885aa3 100644 --- a/internal/sysfs/nonblock_unix.go +++ b/internal/sysfs/nonblock_unix.go @@ -2,8 +2,16 @@ package sysfs -import "syscall" +import ( + "syscall" + + "github.com/tetratelabs/wazero/internal/fsapi" +) func setNonblock(fd uintptr, enable bool) error { return syscall.SetNonblock(int(fd), enable) } + +func isNonblock(f *osFile) bool { + return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK +} diff --git a/internal/sysfs/nonblock_windows.go b/internal/sysfs/nonblock_windows.go index d4a29ac331..3acbf2721f 100644 --- a/internal/sysfs/nonblock_windows.go +++ b/internal/sysfs/nonblock_windows.go @@ -2,8 +2,24 @@ package sysfs -import "syscall" +import ( + "io/fs" + "syscall" + + "github.com/tetratelabs/wazero/internal/fsapi" +) func setNonblock(fd uintptr, enable bool) error { + // We invoke the syscall, but this is currently no-op. return syscall.SetNonblock(syscall.Handle(fd), enable) } + +func isNonblock(f *osFile) bool { + // On Windows, we support non-blocking reads only on named pipes. + isValid := false + st, errno := f.Stat() + if errno == 0 { + isValid = st.Mode&fs.ModeNamedPipe != 0 + } + return isValid && f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK +} diff --git a/internal/sysfs/osfile.go b/internal/sysfs/osfile.go index fa31dc36ca..f4740fe314 100644 --- a/internal/sysfs/osfile.go +++ b/internal/sysfs/osfile.go @@ -106,7 +106,7 @@ func (f *osFile) reopen() (errno syscall.Errno) { // IsNonblock implements the same method as documented on fsapi.File func (f *osFile) IsNonblock() bool { - return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK + return isNonblock(f) } // SetNonblock implements the same method as documented on fsapi.File diff --git a/internal/sysfs/select_windows.go b/internal/sysfs/select_windows.go index 811c648d60..387b0e75fe 100644 --- a/internal/sysfs/select_windows.go +++ b/internal/sysfs/select_windows.go @@ -4,7 +4,6 @@ import ( "context" "syscall" "time" - "unsafe" "github.com/tetratelabs/wazero/internal/platform" ) @@ -16,11 +15,6 @@ const wasiFdStdin = 0 // pollInterval is the interval between each calls to peekNamedPipe in pollNamedPipe const pollInterval = 100 * time.Millisecond -var kernel32 = syscall.NewLazyDLL("kernel32.dll") - -// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe -var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe") - // syscall_select emulates the select syscall on Windows for two, well-known cases, returns syscall.ENOSYS for all others. // If r contains fd 0, and it is a regular file, then it immediately returns 1 (data ready on stdin) // and r will have the fd 0 bit set. @@ -72,7 +66,8 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *time.Duration) (bool, error) { // Short circuit when the duration is zero. if duration != nil && *duration == time.Duration(0) { - return peekNamedPipe(pipeHandle) + bytes, err := peekNamedPipe(pipeHandle) + return bytes > 0, err } // Ticker that emits at every pollInterval. @@ -101,27 +96,9 @@ func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *tim if err != nil { return false, err } - if res { + if res > 0 { return true, nil } } } } - -// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API -// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe -func peekNamedPipe(handle syscall.Handle) (bool, error) { - var totalBytesAvail uint32 - totalBytesPtr := unsafe.Pointer(&totalBytesAvail) - _, _, err := procPeekNamedPipe.Call( - uintptr(handle), // [in] HANDLE hNamedPipe, - 0, // [out, optional] LPVOID lpBuffer, - 0, // [in] DWORD nBufferSize, - 0, // [out, optional] LPDWORD lpBytesRead - uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail, - 0) // [out, optional] LPDWORD lpBytesLeftThisMessage - if err == syscall.Errno(0) { - return totalBytesAvail > 0, nil - } - return totalBytesAvail > 0, err -} diff --git a/internal/sysfs/select_windows_test.go b/internal/sysfs/select_windows_test.go index 8a53565aee..fc82261d19 100644 --- a/internal/sysfs/select_windows_test.go +++ b/internal/sysfs/select_windows_test.go @@ -33,9 +33,9 @@ func TestSelect_Windows(t *testing.T) { wh := syscall.Handle(w.Fd()) // Ensure the pipe has data. - hasData, err := peekNamedPipe(rh) + n, err := peekNamedPipe(rh) require.NoError(t, err) - require.False(t, hasData) + require.NotEqual(t, 0, n) // Write to the channel. msg, err := syscall.ByteSliceFromString("test\n") @@ -44,9 +44,9 @@ func TestSelect_Windows(t *testing.T) { require.NoError(t, err) // Ensure the pipe has data. - hasData, err = peekNamedPipe(rh) + n, err = peekNamedPipe(rh) require.NoError(t, err) - require.True(t, hasData) + require.NotEqual(t, 0, n) }) t.Run("pollNamedPipe should return immediately when duration is nil (no data)", func(t *testing.T) {