Feature Request: Rust tokio async interoperability through pollOneOff #1891

Closed
gaukas opened this issue Dec 26, 2023 · 20 comments
Labels
enhancement New feature or request

Comments

@gaukas
Contributor

gaukas commented Dec 26, 2023

Merry Christmas and Happy Holidays! 🎄

Is your feature request related to a problem? Please describe.
I have a WebAssembly binary generated by compiling from Rust, in which async I/O operations are performed using tokio.

However, the WebAssembly module panics when performing async read operations:

thread '<unnamed>' panicked at /path/to/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/io/driver.rs:157:23:
unexpected error when polling the I/O driver: Os { code: 58, kind: Uncategorized, message: "Not supported" }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Ref: tokio-1.35.1/src/runtime/io/driver.rs:157

I noticed that this error is actually returned by mio::Poll::poll(), which calls Selector::select(). So it is likely that the invoked syscall (kevent) triggered the OS error.

Describe the solution you'd like
Since the same operation runs perfectly fine on at least one other popular WebAssembly runtime (wasmtime; the rest are untested), the best option seems to be adding full support for the related syscall so that async/nonblocking operations work completely.

Describe alternatives you've considered
Alternatively, this could be a mistake on my end due to non-standard use of wazero (see Additional context). I would be happy to see an existing example that works with tokio::select and tokio::io::AsyncReadExt::read().

Additional context
So I am indeed doing something weird (#1857). I patched several interfaces in wazero to allow me to insert a net.TCPConn into the WebAssembly module, accessible through a file descriptor.

(*Context).InsertTCPConn
func (c *Context) InsertTCPConn(conn *net.TCPConn) (key int32, ok bool) {
	return c.fsc.openedFiles.Insert(&FileEntry{
		IsPreopen: true,
		File:      fsapi.Adapt(sysfs.NewTCPConnFile(conn)),
	})
}

Inside the WebAssembly module, I wrap the fd back into a TCP connection (std::net::TcpStream):

stdasync/lib.rs
use std::io::{Read, Write};
use std::os::fd::FromRawFd;

#[export_name = "_read"]
pub fn _worker(fd: i32) -> i32 {
    // Safety: the host must pass a valid, open socket fd.
    let mut stdstream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
    stdstream
        .set_nonblocking(true)
        .expect("Failed to set non-blocking");

    // In a loop, read from fd and write the data back to fd.
    let mut buf = vec![0; 1024];
    loop {
        match stdstream.read(&mut buf) {
            Ok(0) => break, // End of stream
            Ok(n) => {
                if let Err(e) = stdstream.write(&buf[..n]) {
                    println!("Error writing to fd: {:?}", e);
                    // ...
                }
                println!("written {} bytes", n)
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                println!("WouldBlock... retrying.");
                continue; // retry
            }
            Err(e) => {
                println!("Error reading from fd: {:?}", e);
                // ...
            }
        }
    }

    // ...
}

This works perfectly: both reads and writes are performed correctly.

But it does not work once "upgraded" to tokio:

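tokioasync/lib.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// READ_BUFFER_SIZE is assumed to be a constant defined elsewhere in the crate.
#[tokio::main(flavor = "current_thread")]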
async fn tokio_worker(conn: std::net::TcpStream) -> std::io::Result<()> {
    let mut conn: tokio::net::TcpStream =
        tokio::net::TcpStream::from_std(conn).expect("Failed to convert to tokio stream");

    println!("tokio_worker: conn = {:?}", conn);

    let mut rd_buf = vec![0; READ_BUFFER_SIZE];

    loop {
        tokio::select! {
            result = conn.read(&mut rd_buf) => {
                // println!("dst.read() result = {:?}", result);
                match result {
                    Ok(0) => break, // End of stream
                    Ok(n) => {
                        println!("read {} bytes", n);
                        if let Err(e) = conn.write_all(&rd_buf[0..n]).await {
                            println!("Error writing to conn: {:?}", e);
                            return Err(e);
                        }
                    }
                    Err(e) => {
                        println!("Error reading from conn: {:?}", e);
                        return Err(e);
                    }
                }
            }
        }
    }

    Ok(())
}

So my educated guess is that I am doing what is supposed to be done for my purposes. But if there are better ways to insert TCP connections/sockets dynamically (i.e., after instantiation), I am open to advice.

@gaukas gaukas added the enhancement New feature or request label Dec 26, 2023
@ncruces
Collaborator

ncruces commented Dec 26, 2023

I definitely don't have enough Rust knowledge to triage this, but for the benefit of others and yourself, do you have code that you can share that shows the issue?

You patched wazero, where's this patched wazero?
You built a WASM module, can you share it, both in binary and source form?

I don't mean snippets, but a minimal, complete and verifiable example.

Without that, it seems very unlikely that anyone will be able to help you.

@gaukas
Contributor Author

gaukas commented Dec 26, 2023

Thanks for your timely response!

You patched wazero, where's this patched wazero?

I simply patched it in my fork here: https://github.com/gaukas/wazero

a minimal, complete and verifiable example

Gladly. https://github.com/gaukas/wasmasync contains the requested examples, including a driver written in Go (using my fork of wazero), 4 examples written in Rust (asyncrs, asynctokio, asynctokio2, syncrs) and 1 example in WAT.

@gaukas
Contributor Author

gaukas commented Dec 26, 2023

Any general feedback would be really helpful as well, including on my patch, and advice on creating a WebAssembly binary that achieves the same goal.

One reason I had to use Rust is that I could not figure out a proper way to do this in Go. It seems TinyGo's WASI target did not really like what I'm trying to do (recovering an fd into a file/socket and writing to it). I could be doing it the wrong way, though... It would be much easier if Go/TinyGo's WASI target supported everything I need.

@evacchi
Contributor

evacchi commented Dec 27, 2023

Hi, thanks for the detailed reproducer.

The error "Not supported" is likely not a real OS error but a WASI error; specifically, one from wazero's WASI implementation, which may return ENOTSUP in some cases.

First of all, you can get further information about what's going on at the WASI/FS/Socket layer by adding these lines to your main.go:

	// Configure host logging.
	ctx = context.WithValue(ctx, experimental.FunctionListenerFactoryKey{},
		logging.NewHostLoggingListenerFactory(os.Stderr, logging.LogScopeFilesystem|logging.LogScopePoll|logging.LogScopeSock))

This enables host logging and dumps further information to the console about which syscalls are failing.

In this case, however, you can find the offender in wazero/imports/wasi_snapshot_preview1/poll.go. There is one case where we return ENOTSUP, namely when we find a subscription to FdWrite:

		case wasip1.EventTypeFdWrite:
			fd := int32(le.Uint32(argBuf))
			if fd < 0 {
				return sys.EBADF
			}
			if _, ok := fsc.LookupFile(fd); ok {
				evt.errno = wasip1.ErrnoNotsup
			} else {
				evt.errno = wasip1.ErrnoBadf
			}
			nevents++
			writeEvent(outBuf[outOffset:], evt)

Since you are already getting your hands dirty with the codebase, you could arbitrarily report success for a write subscription if the underlying fd is already in nonblocking mode. Note that this is somewhat correct for real disk files, but not quite correct for sockets; that's one of the reasons we are not supporting it at the moment: it requires significant plumbing on the codebase to support properly, and we have had no evidence of real-world usage so far.

		case wasip1.EventTypeFdWrite:
			fd := int32(le.Uint32(argBuf))
			if fd < 0 {
				return sys.EBADF
			}
			if file, ok := fsc.LookupFile(fd); !ok {
				evt.errno = wasip1.ErrnoBadf
				writeEvent(outBuf[outOffset:], evt)
				nevents++
			} else if file.File.IsNonblock() {
				writeEvent(outBuf[outOffset:], evt)
				nevents++
			} else {
				evt.errno = wasip1.ErrnoNotsup
				writeEvent(outBuf[outOffset:], evt)
				nevents++
			}

Eventually, however, on my macOS system, asynctokio.wasm dies again with an EBADF returned by fd_read():

// Read implements the same method as documented on sys.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno sys.Errno) {
	n, err := syscall.Read(int(f.fd), buf)
	if err != nil {
		// Defer validation overhead until we've already had an error.
		errno = sys.UnwrapOSError(err)
		errno = fileError(f, f.closed, errno)
	}
	return n, errno
}

As you can see from the logs as well:

...
==> wasi_snapshot_preview1.fd_read(fd=3,iovs=1048300,iovs_len=1)
<== (nread=,errno=EAGAIN)
==> wasi_snapshot_preview1.fd_read(fd=3,iovs=1048300,iovs_len=1)
<== (nread=,errno=EBADF)
Error reading from fd: Os { code: 8, kind: Uncategorized, message: "Bad file descriptor" }
...

This also happens with asyncrs but not asynctokio2. I am not familiar enough with Rust and Tokio to tell whether this is correct (EBADF should be due to the underlying fd being closed, although I can't find any evidence of that in the logs).

I also cannot rule out that something is going on at the use site in Go, e.g., the socket has been closed on that end and the Wasm side is now accessing an invalid descriptor:

https://github.com/gaukas/wasmasync/blob/9c89b62a92059913607ee398836de361faaa52b4/main.go#L162-L168

maybe @ncruces or @achille-roussel have ideas :)

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

Thank you for such a detailed response and walkthrough!

that's one of the reasons we are not supporting it at the moment: it requires significant plumbing on the codebase to support properly

Thanks for the explanation. Is there an issue tracker or roadmap for what's needed to be done? I might be able to help in some ways.

on my macOS system, asynctokio.wasm dies again with an EBADF returned by fd_read()

This also happens with asyncrs but not asynctokio2.

This is a bit concerning since this error does not occur on my Linux test machine. Could you please try wasm/syncrs.wasm and/or wat/rw.wasm?

Also, do you have any insights into possible discrepancies between wazero's Linux and macOS implementations that might be related to this issue? Or are you suggesting it is more likely unrelated to wazero, and rather inside the WebAssembly module or in Go's host setup (before entering wazero)?

Again, thank you so much for your time and effort. I appreciate it.

@evacchi
Contributor

evacchi commented Dec 27, 2023

Let me first reply to the last question:

on my macOS system, asynctokio.wasm dies again with an EBADF returned by fd_read()

This also happens with asyncrs but not asynctokio2.

This is a bit concerning since this error does not occur on my Linux test machine. Could you please try wasm/syncrs.wasm and/or wat/rw.wasm?

I meant that after applying the patch above, the programs progress but they fail later with that error. Otherwise, they fail exactly as you reported.

that's one of the reasons we are not supporting it at the moment: it requires significant plumbing on the codebase to support properly

Thanks for the explanation. Is there an issue tracker or roadmap for what's needed to be done? I might be able to help in some ways.

In general, requirements are user-driven, and so far nobody had run into this gap in the implementation before you. What's your use case? :o) And I mean besides "I want to use Tokio" :P You can disclose it privately if you are more comfortable; reach out to us on the Gophers Slack. We hang out in the #wazero channel, but you can DM me if you prefer :)

In any case, here is what implementing the missing part of poll_oneoff would involve.

In this particular case, as the Remlab article (https://www.remlab.net/op/nonblock.shtml) puts it,

writeability implies the send buffer is not full from the standpoint of the underlying protocol of the socket (e.g. TCP/IP).

So, my understanding is that a good implementation of a write subscription should return EAGAIN unless there is space left in the system send buffer. How do you know? One way might be to simply invoke poll on the underlying fd. This in turn requires implementing File.Poll() on tcpConnFile and tcpListenerFile in sock_unix.go and doing the same for the Windows counterpart; then you will need to invoke File.Poll in poll.go when the fd is a socket. Currently it is only really invoked on stdin:

if stdinReady, errno := stdin.File.Poll(fsapi.POLLIN, int32(timeout.Milliseconds())); errno != 0 {
	return errno
}

So basically, this would require refactoring the entire pollOneoffFn to actually invoke poll instead of short-circuiting in a well-known case. Alternatively, you could add more short-circuiting only for the case where the fd is an actual socket (which, I would say, doesn't happen very often).
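The refactoring described above could look roughly like the following self-contained model. Everything in it (`Pflag`, `Errno`, the `poller` interface, `fdWriteEvent`, `fakeSocket`) is hypothetical and only loosely mirrors the `File.Poll(flag, timeoutMillis)` shape mentioned above; it is not wazero's code. An FdWrite subscription asks the file for POLLOUT readiness with a zero timeout instead of short-circuiting, while files that cannot be polled keep the current ENOTSUP behavior. Whether a not-yet-writable fd should yield an EAGAIN event or no event at all is a policy choice; this sketch emits no event.

```go
package main

import "fmt"

// Pflag is a hypothetical readiness flag set (values are illustrative).
type Pflag uint32

const (
	POLLIN Pflag = 1 << iota
	POLLOUT
)

// Errno uses WASI preview1 numbering for the few codes needed here.
type Errno uint16

const (
	ErrnoSuccess Errno = 0
	ErrnoBadf    Errno = 8
	ErrnoNotsup  Errno = 58
)

// poller is a hypothetical interface for files that can report readiness.
type poller interface {
	Poll(flag Pflag, timeoutMillis int32) (ready bool, errno Errno)
}

// fdWriteEvent decides what a single FdWrite subscription yields.
// emit reports whether an event should be written to the output now.
func fdWriteEvent(files map[int32]any, fd int32) (emit bool, errno Errno) {
	f, ok := files[fd]
	if !ok {
		return true, ErrnoBadf
	}
	p, ok := f.(poller)
	if !ok {
		// Files that cannot be polled keep the current ENOTSUP behavior.
		return true, ErrnoNotsup
	}
	// Ask the file itself instead of short-circuiting; a zero timeout
	// only checks the current state.
	ready, errno := p.Poll(POLLOUT, 0)
	if errno != ErrnoSuccess {
		return true, errno
	}
	// Not writable yet: emit nothing, leaving the subscription pending.
	return ready, ErrnoSuccess
}

// fakeSocket stands in for a socket whose send buffer is or is not full.
type fakeSocket struct{ writable bool }

func (s fakeSocket) Poll(flag Pflag, _ int32) (bool, Errno) {
	return flag&POLLOUT != 0 && s.writable, ErrnoSuccess
}

func main() {
	files := map[int32]any{
		3: fakeSocket{writable: true},
		4: fakeSocket{writable: false},
		5: "not pollable",
	}
	for _, fd := range []int32{3, 4, 5, 9} {
		emit, errno := fdWriteEvent(files, fd)
		fmt.Printf("fd=%d emit=%t errno=%d\n", fd, emit, errno)
	}
}
```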

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

I meant that after applying the patch above, the programs progress but they fail later with that error.

Thanks for the clarification. That's actually what I meant: asyncrs and syncrs are expected to succeed, and I don't believe the patch would cause them to fail.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

Okay, so after applying the suggested patch, the async read operation does get through.

However, it is quite weird that the file descriptor inside WebAssembly fails once the function has been blocking for exactly 5 seconds, no matter what is being done (e.g., the number of bytes written), and if and only if tokio is involved. I could not reproduce the same error with asyncrs.

Further, the TCP connections are still alive after the WebAssembly function call fails, which suggests the problem is unlikely to be outside wazero. I guess I will spend more time looking into this.

@evacchi
Copy link
Contributor

evacchi commented Dec 27, 2023

Hah, interesting: both asyncrs and syncrs eventually fail even without the patch :D I suspect that has nothing to do with async I/O, but maybe (?) with the Go runtime eventually closing the fd. I was about to reply before your last message, since I was booting up a Linux VM to check there too.

EDIT: ...aaand I get the same result (./wasm/syncrs.wasm) with no patches applied:

Error writing to fd: Os { code: 8, kind: Uncategorized, message: "Bad file descriptor" }

EDIT2: I suspect the Go runtime is involved somehow

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

So are we getting split results?

To summarize, on my end (Linux, Ubuntu 22.04), without the patch:

  • syncrs
  • asyncrs
  • asynctokio: Not supported
  • asynctokio2: Not supported

With the patch:

  • syncrs
  • asyncrs
  • asynctokio: OK for the first 5 seconds, then Bad file descriptor
  • asynctokio2: OK for the first 5 seconds, then Bad file descriptor

Edit: the Bad file descriptor error is local to WebAssembly; Go can still read from and write to the TCP connections after the WebAssembly call fails.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

I just pushed an update to asynctokio2, which now uses the loop correctly to read-then-write indefinitely (instead of returning after the first success).

@evacchi
Contributor

evacchi commented Dec 27, 2023

So yeah, apparently I am getting Bad file descriptor in all cases when the patch is applied. Could it be version-dependent? I am testing against go1.21.5 on macOS and go1.21.1 on Linux.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

My results are from Go 1.21.5 on Linux.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

I think I figured this out. It is due to a rather trivial misuse of (*net.TCPConn).File():

func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
	f, err := tc.File()
	if err != nil {
		panic(err)
	}
	return &tcpConnFile{fd: f.Fd()}
}

The code above calls (*net.TCPConn).File() (which returns an *os.File) and then (*os.File).Fd(), assuming that the fd obtained is the underlying fd of the TCP connection. However, the documentation says:

File returns a copy of the underlying os.File. It is the caller's responsibility to close f when finished. Closing c does not affect f, and closing f does not affect c.

The returned os.File's file descriptor is different from the connection's. Attempting to change properties of the original using this duplicate may or may not have the desired effect.

Fd returns the integer Unix file descriptor referencing the open file. If f is closed, the file descriptor becomes invalid. If f is garbage collected, a finalizer may close the file descriptor, making it invalid; see runtime.SetFinalizer for more information on when a finalizer might be run. On Unix systems this will cause the SetDeadline methods to stop working. Because file descriptors can be reused, the returned file descriptor may only be closed through the Close method of f, or by its finalizer during garbage collection. Otherwise, during garbage collection the finalizer may close an unrelated file descriptor with the same (reused) number.

Which indicates the fd obtained is rather a different fd for the "copy of the underlying os.File". Therefore, it is entirely possible and legitimate for the GC to finalize (and thereby close) that file once no reference to it remains.

func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
	f, err := tc.File()
	if err != nil {
		panic(err)
	}
	tcf := &tcpConnFile{fd: f.Fd()}

	runtime.SetFinalizer(tcf, func(tcf *tcpConnFile) {
		if !tcf.closed {
			f.Close()
		}
	})

	return tcf
}

Setting a finalizer on the socketapi.TCPConn could be a solution to this problem (and it works; I tested it).

Alternatively, we might be able to use (*net.TCPConn).SyscallConn() and (syscall.RawConn).Control(f func(fd uintptr)) to achieve a similar goal without keeping the copied file (by leaking the underlying fd). The only concern is that this is a pretty non-standard use:

// Control invokes f on the underlying connection's file
// descriptor or handle.
// The file descriptor fd is guaranteed to remain valid while
// f executes but not after f returns.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

A similar issue is expected to exist for the TCP listener. I have trouble understanding this comment:

// newTCPListenerFile is a constructor for a socketapi.TCPSock.
//
// Note: the implementation of socketapi.TCPSock goes straight
// to the syscall layer, bypassing most of the Go library.
// For an alternative approach, consider winTcpListenerFile
// where most APIs are implemented with regular Go std-lib calls.
func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
	conn, err := tl.File()
	if err != nil {
		panic(err)
	}
	fd := conn.Fd()
	// We need to duplicate this file handle, or the lifecycle will be tied
	// to the TCPListener. We rely on the TCPListener only to set up
	// the connection correctly and parse/resolve the TCP Address
	// (notice we actually rely on the listener in the Windows implementation).
	sysfd, err := syscall.Dup(int(fd))
	if err != nil {
		panic(err)
	}
	return &tcpListenerFile{fd: uintptr(sysfd), addr: tl.Addr().(*net.TCPAddr)}
}

What is this syscall.Dup() trying to achieve? I am not sure what it means by "the lifecycle will be tied to the TCPListener". I believe (*TCPListener).File() already returns a "copy of the underlying file":

File returns a copy of the underlying os.File. It is the caller's responsibility to close f when finished. Closing l does not affect f, and closing f does not affect l.

The returned os.File's file descriptor is different from the connection's. Attempting to change properties of the original using this duplicate may or may not have the desired effect.

Therefore, this duplication is likely redundant. Similarly, we will need to either keep the returned *os.File or leak the real underlying fd (via syscall.RawConn).

Given that we are already duplicating manually, keeping the file seems preferable.

@gaukas
Contributor Author

gaukas commented Dec 27, 2023

Btw, I would be happy to open a PR addressing this if we can settle on a solution:

TCP Conn:

  • runtime.SetFinalizer() for the (*net.TCPConn).File(), or
  • (*net.TCPConn).SyscallConn(), then (syscall.RawConn).Control() to leak the underlying fd.

TCP Listener:

  • Leave it as is (if I misunderstood anything)
  • Or the same two options for TCP Conn

@evacchi
Contributor

evacchi commented Dec 28, 2023

Thanks @gaukas for the detailed detective work. You are indeed correct that File() returns a duplicate file object. As you put it in your earlier comment on TCPConn, however, relying on c.File().Fd() won't work, and it probably won't work for either type:

Which indicates the fd obtained is rather a different fd for the "copy of the underlying os.File".

So, I guess the comment on newTCPListenerFile is incorrect, because the fd is not tied to the lifecycle of the original *net.TCPListener but rather to the file instance returned by tl.File() (stored in conn). Because that instance quickly goes out of scope, it soon becomes eligible for GC. That's why the extra Dup() is needed.

In the Windows implementation (sock_windows.go), the fd is never "leaked" to the conn/tl structs. Instead, we hold a reference to the real underlying listener/connection:

func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
	return &winTcpListenerFile{tl: tl}
}

There is a good reason for that: dup() is not implemented on Windows :D

// fd_windows.go
// Unimplemented functions.
func (fd *netFD) dup() (*os.File, error) {
	// TODO: Implement this
	return nil, syscall.EWINDOWS
}

Instead, the fd is never leaked, and all syscalls go through .Control, which ensures that 1) the fd is valid throughout the invocation, and 2) the underlying listener/connection is never GC'd until the wrapper itself goes out of scope. Moreover, it avoids finalizers altogether.

While this makes the code slightly more complicated, it might be a better solution for *NIX too: it is probably the safer version, and it has the added benefit of aligning the code a bit more with the Windows version.

@ncruces
Collaborator

ncruces commented Jan 6, 2024

Does #1892 fix this?

@gaukas
Contributor Author

gaukas commented Jan 6, 2024

Hi @ncruces, no, #1892 fixes a different problem we found during troubleshooting. The actual problem is in poll_oneoff, for which we are currently using the workaround @evacchi suggested, without fixing the root cause. The real fix could be part of the goals under #1500. But feel free to close this issue if it duplicates #1500.

@gaukas gaukas changed the title Feature/Example Request: syscall(kevent)-based tokio async interoperability Feature Request: Rust tokio async interoperability through pollOneOff Jan 6, 2024
@gaukas
Contributor Author

gaukas commented Jan 6, 2024

I edited the title to better describe the actual issue in case someone had the same problem. Feel free to update if you have a better suggestion!

4 participants