Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi: nonblocking pipes on Windows #1570

Merged
merged 1 commit into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions imports/wasi_snapshot_preview1/wasi_stdlib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,18 +498,42 @@ func Test_Stdin(t *testing.T) {
}

func testStdin(t *testing.T, bin []byte) {
r, w, err := os.Pipe()
stdinReader, stdinWriter, err := os.Pipe()
require.NoError(t, err)
stdoutReader, stdoutWriter, err := os.Pipe()
require.NoError(t, err)
defer func() {
stdinReader.Close()
stdinWriter.Close()
stdoutReader.Close()
stdoutReader.Close()
}()
require.NoError(t, err)
moduleConfig := wazero.NewModuleConfig().
WithSysNanotime(). // poll_oneoff requires nanotime.
WithArgs("wasi", "stdin").
WithStdin(r)
ch := make(chan string, 1)
WithStdin(stdinReader).
WithStdout(stdoutWriter)
ch := make(chan struct{}, 1)
go func() {
ch <- compileAndRun(t, testCtx, moduleConfig, bin)
defer close(ch)

r := wazero.NewRuntime(testCtx)
defer r.Close(testCtx)
_, err := wasi_snapshot_preview1.Instantiate(testCtx, r)
require.NoError(t, err)
_, err = r.InstantiateWithConfig(testCtx, bin, moduleConfig)
require.NoError(t, err)
}()

time.Sleep(1 * time.Second)
_, _ = w.WriteString("foo")
s := <-ch
require.Equal(t, "waiting for stdin...\nfoo", s)
buf := make([]byte, 21)
_, _ = stdoutReader.Read(buf)
require.Equal(t, "waiting for stdin...\n", string(buf))
_, _ = stdinWriter.WriteString("foo")
_ = stdinWriter.Close()
buf = make([]byte, 3)
_, _ = stdoutReader.Read(buf)
require.Equal(t, "foo", string(buf))
<-ch
}
6 changes: 1 addition & 5 deletions internal/sysfs/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ func TestReadFdNonblock(t *testing.T) {
// Read from the file without ever writing to it should not block.
buf := make([]byte, 8)
_, e := readFd(fd, buf)
if runtime.GOOS == "windows" {
require.EqualErrno(t, syscall.ENOSYS, e)
} else {
require.EqualErrno(t, syscall.EAGAIN, e)
}
require.EqualErrno(t, syscall.EAGAIN, e)
}

func TestFileSetAppend(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/sysfs/file_unsupported.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build !unix && !linux && !darwin
//go:build !unix && !linux && !darwin && !windows

package sysfs

Expand Down
61 changes: 61 additions & 0 deletions internal/sysfs/file_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package sysfs

import (
"syscall"
"unsafe"

"github.com/tetratelabs/wazero/internal/platform"
)

const NonBlockingFileIoSupported = true

var kernel32 = syscall.NewLazyDLL("kernel32.dll")

// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe
var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe")

// readFd returns ENOSYS on unsupported platforms.
//
// PeekNamedPipe: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
// "GetFileType can assist in determining what device type the handle refers to. A console handle presents as FILE_TYPE_CHAR."
// https://learn.microsoft.com/en-us/windows/console/console-handles
func readFd(fd uintptr, buf []byte) (int, syscall.Errno) {
handle := syscall.Handle(fd)
fileType, err := syscall.GetFileType(syscall.Stdin)
if err != nil {
return 0, platform.UnwrapOSError(err)
}
if fileType&syscall.FILE_TYPE_CHAR == 0 {
return -1, syscall.ENOSYS
}
n, err := peekNamedPipe(handle)
if err != nil {
errno := platform.UnwrapOSError(err)
if errno == syscall.ERROR_BROKEN_PIPE {
return 0, 0
}
}
if n == 0 {
return -1, syscall.EAGAIN
}
un, err := syscall.Read(handle, buf[0:n])
return un, platform.UnwrapOSError(err)
}

// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API
// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
func peekNamedPipe(handle syscall.Handle) (uint32, error) {
var totalBytesAvail uint32
totalBytesPtr := unsafe.Pointer(&totalBytesAvail)
_, _, err := procPeekNamedPipe.Call(
uintptr(handle), // [in] HANDLE hNamedPipe,
0, // [out, optional] LPVOID lpBuffer,
0, // [in] DWORD nBufferSize,
0, // [out, optional] LPDWORD lpBytesRead
uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail,
0) // [out, optional] LPDWORD lpBytesLeftThisMessage
if err == syscall.Errno(0) {
return totalBytesAvail, nil
}
return totalBytesAvail, err
}
10 changes: 9 additions & 1 deletion internal/sysfs/nonblock_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@

package sysfs

import "syscall"
import (
"syscall"

"github.com/tetratelabs/wazero/internal/fsapi"
)

func setNonblock(fd uintptr, enable bool) error {
return syscall.SetNonblock(int(fd), enable)
}

func isNonblock(f *osFile) bool {
return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
}
18 changes: 17 additions & 1 deletion internal/sysfs/nonblock_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,24 @@

package sysfs

import "syscall"
import (
"io/fs"
"syscall"

"github.com/tetratelabs/wazero/internal/fsapi"
)

func setNonblock(fd uintptr, enable bool) error {
// We invoke the syscall, but this is currently no-op.
return syscall.SetNonblock(syscall.Handle(fd), enable)
}

func isNonblock(f *osFile) bool {
// On Windows, we support non-blocking reads only on named pipes.
isValid := false
st, errno := f.Stat()
if errno == 0 {
isValid = st.Mode&fs.ModeNamedPipe != 0
}
return isValid && f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
}
2 changes: 1 addition & 1 deletion internal/sysfs/osfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (f *osFile) reopen() (errno syscall.Errno) {

// IsNonblock implements the same method as documented on fsapi.File
func (f *osFile) IsNonblock() bool {
return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
return isNonblock(f)
}

// SetNonblock implements the same method as documented on fsapi.File
Expand Down
29 changes: 3 additions & 26 deletions internal/sysfs/select_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"syscall"
"time"
"unsafe"

"github.com/tetratelabs/wazero/internal/platform"
)
Expand All @@ -16,11 +15,6 @@ const wasiFdStdin = 0
// pollInterval is the interval between each calls to peekNamedPipe in pollNamedPipe
const pollInterval = 100 * time.Millisecond

var kernel32 = syscall.NewLazyDLL("kernel32.dll")

// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe
var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe")

// syscall_select emulates the select syscall on Windows for two, well-known cases, returns syscall.ENOSYS for all others.
// If r contains fd 0, and it is a regular file, then it immediately returns 1 (data ready on stdin)
// and r will have the fd 0 bit set.
Expand Down Expand Up @@ -72,7 +66,8 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int
func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *time.Duration) (bool, error) {
// Short circuit when the duration is zero.
if duration != nil && *duration == time.Duration(0) {
return peekNamedPipe(pipeHandle)
bytes, err := peekNamedPipe(pipeHandle)
return bytes > 0, err
}

// Ticker that emits at every pollInterval.
Expand Down Expand Up @@ -101,27 +96,9 @@ func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *tim
if err != nil {
return false, err
}
if res {
if res > 0 {
return true, nil
}
}
}
}

// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API
// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
func peekNamedPipe(handle syscall.Handle) (bool, error) {
var totalBytesAvail uint32
totalBytesPtr := unsafe.Pointer(&totalBytesAvail)
_, _, err := procPeekNamedPipe.Call(
uintptr(handle), // [in] HANDLE hNamedPipe,
0, // [out, optional] LPVOID lpBuffer,
0, // [in] DWORD nBufferSize,
0, // [out, optional] LPDWORD lpBytesRead
uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail,
0) // [out, optional] LPDWORD lpBytesLeftThisMessage
if err == syscall.Errno(0) {
return totalBytesAvail > 0, nil
}
return totalBytesAvail > 0, err
}
8 changes: 4 additions & 4 deletions internal/sysfs/select_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestSelect_Windows(t *testing.T) {
wh := syscall.Handle(w.Fd())

// Ensure the pipe has data.
hasData, err := peekNamedPipe(rh)
n, err := peekNamedPipe(rh)
require.NoError(t, err)
require.False(t, hasData)
require.NotEqual(t, 0, n)

// Write to the channel.
msg, err := syscall.ByteSliceFromString("test\n")
Expand All @@ -44,9 +44,9 @@ func TestSelect_Windows(t *testing.T) {
require.NoError(t, err)

// Ensure the pipe has data.
hasData, err = peekNamedPipe(rh)
n, err = peekNamedPipe(rh)
require.NoError(t, err)
require.True(t, hasData)
require.NotEqual(t, 0, n)
})

t.Run("pollNamedPipe should return immediately when duration is nil (no data)", func(t *testing.T) {
Expand Down