Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining in client on certain situations triggers "unknown method" in the "parent" capability #604

Open
gabivlj opened this issue Jan 31, 2025 · 7 comments

Comments

@gabivlj
Copy link

gabivlj commented Jan 31, 2025

When you start a pipeline request from the client, while another request is in-flight, go-capnp won't recognise one of the RPC calls that comes in that were pipelined.

This is an issue we encountered and it was a bit hard to repro, here is a repo where you can easily reproduce (rust capnp client, but we also repro'd with a c++ one).
https://github.com/gabivlj/capnp-issue

Schema:


interface Service {
    get @0 () -> (bsr :ByteStreamReturner);
}

interface ByteStreamReturner {
    getConnector @0 () -> (conn :Connector);
    inflighter @1 ();
}

interface Connector {
    connect @0 (down: ByteStream) -> (up :ByteStream);
}

interface ByteStream {
  write @0 (bytes :Data) -> stream;
}

Important part of the repro:


func (c *byteStreamGetter) GetConnector(ctx context.Context, getParams bytestream.ByteStreamReturner_getConnector) (rerr error) {
	getParams.Go()
	fmt.Println("Get from byteStreamGetter")
	res, err := getParams.AllocResults()
	if err != nil {
		return err
	}

	bServer := bytestream.Connector_NewServer(&conn{})
	bServer.HandleUnknownMethod = func(m capnp.Method) *server.Method {
		fmt.Println("Unknown method:", m.String())
		// So here, if you for example wait for the bytestream
		// you return in Connect and do something like this:

		/*
			        b := <-p.c
				fmt.Println("Got bytestream, returning impl")
				methods := bytestream.ByteStream_Methods([]server.Method{}, b)
				s := &server.Method{methods[m.MethodID].Method, methods[m.MethodID].Impl}
				p.c <- b
		*/

		// It will work!
		//
		// IDK why capnp-go here doesn't really understand what's going on
		return nil
	}

	// The more you sleep, the easier to repro it is. Not even sleeping
	// and you will repro this.
	time.Sleep(time.Millisecond)
	client := capnp.NewClient(bServer)
	b := bytestream.Connector(client)
	return res.SetConn(b)
}

type conn struct {
}

func (c *conn) Connect(ctx context.Context, params bytestream.Connector_connect) error {
	res, err := params.AllocResults()
	if err != nil {
		return err
	}

	bServer := bytestream.ByteStream_NewServer(&byteStreamWrite{})
	bServer.HandleUnknownMethod = func(m capnp.Method) *server.Method {
		panic("this one is never triggered")
		return nil
	}

	client := capnp.NewClient(bServer)
	b := bytestream.ByteStream(client)
	return res.SetUp(b)
}

type byteStreamWrite struct {
}

var a atomic.Int64

func (c *byteStreamWrite) Shutdown() {
	log.Println("ByteStream Shutdown()")
}

// Write writes the bytes to the connection.
func (c *byteStreamWrite) Write(ctx context.Context, params bytestream.ByteStream_write) error {
	a.Add(1)
	params.Go()
	fmt.Println("Received Write", a.Load())
	args := params.Args()
	bytes, err := args.Bytes()
	if err != nil {
		return err
	}

	fmt.Println("Write called (good!)", string(bytes))
	return nil
}

You will see the output:

~/repro$ bin/repro
Inflighter BS start
Get from byteStreamGetter
Unknown method: @0xb45bc9c79a817b99.@0
Inflighter BS done
2025/01/31 03:32:07 CAPNP ERROR: reader: transport: stream transport: receive: EOF
2025/01/31 03:32:07 conn done, accepting another
byte stream getter Shutdown()
2025/01/31 03:32:07 ByteStream Shutdown()

What we are doing in the client is basically just create a pipeline that does getConnector() -> connect() -> write(). All of this while the inflighter promise is in the bg.
The unrecognised method is write, and is triggered in the capability returned by getConnector(), so it makes me think go-capnp is somehow confused and thinks write belongs to connector, when it belongs to bytestream.

If you need any more reproduction steps or more clarification, please let me know.

@gabivlj
Copy link
Author

gabivlj commented Jan 31, 2025

@kentonv asked if it happens with -> stream;, just removed from the repro schema and the issue is still being triggered.

@kentonv
Copy link
Member

kentonv commented Jan 31, 2025

To be concise here:

  • We think we are seeing a pipelined write() call being delivered to the wrong capability.
  • Specifically, we call something like connector.connect().write() as a pipelined sequence.
  • The problem only occurs, though, if there was a previous call to connector.inflighter(), which is still running.
  • What we observe is the write() call is being delivered to connector instead of to the ByteStream returned by connect().

(Is that correct, @gabivlj?)

@gabivlj
Copy link
Author

gabivlj commented Jan 31, 2025

Sorry, I can have a simpler repro by not even having an inflight and removing -> stream from the schema now, just doing the pipeline and you will trigger the issue. Updating mentioned repository.

            let request = service.get_request();
            let res = request.send();
            let bs_get = res.promise.await.expect("i dont care about failures");
            let bsr = bs_get.get().unwrap().get_bsr().unwrap();

            // Start the pipeline
            let promise = bsr.get_connector_request().send().pipeline;

            // Get connector
            let connector = promise.get_conn().connect_request();

            // Send the connector and get the pipeline
            let bs = connector.send().pipeline;

            // Get up bytestream
            let up = bs.get_up();

            // Get the write request
            let mut res = up.write_request();

            // Set the bytes
            let mut res_req = res.get();
            res_req.set_bytes("hello world".as_bytes());
            
            // Send everything now
            let _ = res.send().promise.await;

See https://github.com/gabivlj/capnp-issue/compare/gv/no-stream?expand=1

@gabivlj
Copy link
Author

gabivlj commented Jan 31, 2025

Maybe related: #521

@kentonv
Copy link
Member

kentonv commented Jan 31, 2025

Oh yeah #521 sounds like exactly it, except we've discovered the problem is that the message is being delivered to the parent object (which doesn't implement the method, hence the unimplemented error).

@lthibault
Copy link
Collaborator

Came here to point to #521! Agree it sounds like the same issue.

@kentonv
Copy link
Member

kentonv commented Jan 31, 2025

@lthibault Any hints how we should debug this? Unfortunately I don't know Go and @gabivlj doesn't know capnp.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants