Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3PH part 1: Prepare for 3PH support #594

Merged
merged 4 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 73 additions & 29 deletions rpc/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,25 @@ type PeerID struct {
// attached to the message via SCM_RIGHTS. This file descriptor would be one
// end of a newly-created socketpair, with the other end having been sent to the
// process hosting the capability in RecipientId.
type ThirdPartyCapID capnp.Ptr
//
// Some networks, as an optimization, may permit ThirdPartyToContact to be
// forwarded across multiple vats. For example, imagine Alice sends a capability
// to Bob, who passes it along to Carol, who further pass it to Dave. Bob will send
// a `Provide` message to Alice telling her to expect the capability to be picked
// up by Carol, and then will pass Carol a `ThirdPartyToContact` pointing to Alice.
// If `ThirdPartyToContact` is non-forwardable, then Carol must form a connection
// to Alice, send an `Accept` to receive the capability, and then immediately send
// a `Provide` to provide it to Dave, before then being able to give a
// `ThirdPartyToContact` to Dave which points to Alice. This is a bit of a waste.
// If `ThirdPartyToContact` is forwardable, then Carol can simply pass it along to
// Dave without making any connection to Alice. Some VatNetwork implementations may
// require that Carol add a signature to the `ThirdPartyToContact` authenticating
// that she really did forward it to Dave, which Dave will then present back to
// Alice. Other implementations may simply pass along an unguessable token and
// instruct Alice that whoever presents the token should receive the capability.
// A VatNetwork may choose not to allow forwarding if it doesn't want its security
// to be dependent on secret bearer tokens nor cryptographic signatures.
type ThirdPartyToContact capnp.Ptr

// The information that must be sent in a `Provide` message to identify the
// recipient of the capability.
Expand All @@ -41,7 +59,7 @@ type ThirdPartyCapID capnp.Ptr
// attached to the message via SCM_RIGHTS. This file descriptor would be one
// end of a newly-created socketpair, with the other end having been sent to the
// capability's recipient in ThirdPartyCapId.
type RecipientID capnp.Ptr
type ThirdPartyToAwait capnp.Ptr

// The information that must be sent in an `Accept` message to identify the
// object being accepted.
Expand All @@ -50,46 +68,72 @@ type RecipientID capnp.Ptr
// be the public key fingerprint of the provider vat along with a nonce matching
// the one in the `RecipientId` used in the `Provide` message sent from that
// provider.
type ProvisionID capnp.Ptr
type ThirdPartyCompletion capnp.Ptr

// Data needed to perform a third-party handoff, returned by
// Newtork.Introduce.
// Data needed to perform a third-party handoff.
type IntroductionInfo struct {
SendToRecipient ThirdPartyCapID
SendToProvider RecipientID
SendToProvider ThirdPartyToAwait
SendToRecipient ThirdPartyToContact
}

// A Network is a reference to a multi-party (generally >= 3) network
// of Cap'n Proto peers. Use this instead of NewConn when establishing
// connections outside a point-to-point setting.
//
// In addition to satisfying the method set, a correct implementation
// of Network must be comparable.
type Network interface {
// Return the identifier for caller on this network.
LocalID() PeerID

// Connect to another peer by ID. The supplied Options are used
// for the connection, with the values for RemotePeerID and Network
// overridden by the Network.
Dial(PeerID, *Options) (*Conn, error)
// Connect to another peer by ID. Re-uses any existing connection
// to the peer.
Dial(PeerID) (*Conn, error)

// Accept and handle incoming connections on the network until
// the context is canceled.
Serve(context.Context) error
}

// Accept the next incoming connection on the network, using the
// supplied Options for the connection. Generally, callers will
// want to invoke this in a loop when launching a server.
Accept(context.Context, *Options) (*Conn, error)
// A Network3PH is a Network which supports three-party handoff of capabilities.
// TODO(before merge): could this interface be named better?
type Network3PH interface {
// Introduces both connections for a three-party handoff. After this,
// the `ThirdPartyToAwait` will be sent to the `provider` and the
// `ThirdPartyToContact` will be sent to the `recipient`.
//
// An error indicates introduction is not possible between the two `Conn`s.
Introduce(provider *Conn, recipient *Conn) (IntroductionInfo, error)

// Introduce the two connections, in preparation for a third party
// handoff. Afterwards, a Provide messsage should be sent to
// provider, and a ThirdPartyCapId should be sent to recipient.
Introduce(provider, recipient *Conn) (IntroductionInfo, error)
// Attempts forwarding of a `ThirdPartyToContact` received from `from` to
// `destination`, with both vats being in this Network. This method
// return a `ThirdPartyToContact` to send to `destination`.
//
// An error indicates forwarding is not possible.
Forward(from *Conn, destination *Conn, info ThirdPartyToContact) (ThirdPartyToContact, error)

// Given a ThirdPartyCapID, received from introducedBy, connect
// to the third party. The caller should then send an Accept
// message over the returned Connection.
DialIntroduced(capID ThirdPartyCapID, introducedBy *Conn) (*Conn, ProvisionID, error)
// Completes a three-party handoff.
//
// The provided `completion` has been received from `conn` in an `Accept`.
//
// This method blocks until there is a matching `AwaitThirdParty`, if there is
// none currently, and returns the `value` passed to it.
//
// An error indicates that this completion can never succeed, for example due
// to a `completion` that is malformed. The error will be sent in response to the
// `Accept`.
CompleteThirdParty(ctx context.Context, conn *Conn, completion ThirdPartyCompletion) (any, error)

// Given a RecipientID received in a Provide message via
// introducedBy, wait for the recipient to connect, and
// return the connection formed. If there is already an
// established connection to the relevant Peer, this
// SHOULD return the existing connection immediately.
AcceptIntroduced(recipientID RecipientID, introducedBy *Conn) (*Conn, error)
// Awaits for completion of a three-party handoff.
//
// The provided `await` has been received from `conn`.
//
// While the context is valid, any `CompleteThirdParty` calls that match
// the provided `await` should return `value`.
//
// After the context is canceled, future calls to `CompleteThirdParty` are
// not required to return the provided `value`.
//
// This method SHOULD not block.
AwaitThirdParty(ctx context.Context, conn *Conn, await ThirdPartyToAwait, value any)
}
4 changes: 2 additions & 2 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (c *lockedConn) parseReturn(dq *deferred.Queue, ret rpccp.Return, called []
return parsedReturn{err: rpcerr.WrapFailed("parse return", err), parseFailed: true}
}
return parsedReturn{err: exc.New(exc.Type(e.Type()), "", reason)}
case rpccp.Return_Which_acceptFromThirdParty:
case rpccp.Return_Which_awaitFromThirdParty:
// TODO: 3PH. Can wait until after the MVP, because we can keep
// setting allowThirdPartyTailCall = false
fallthrough
Expand Down Expand Up @@ -1742,7 +1742,7 @@ func (c *Conn) handleDisembargo(ctx context.Context, in transport.IncomingMessag
})
})

case rpccp.Disembargo_context_Which_accept, rpccp.Disembargo_context_Which_provide:
case rpccp.Disembargo_context_Which_accept:
if c.network != nil {
panic("TODO: 3PH")
}
Expand Down
1 change: 0 additions & 1 deletion std/capnp/c++.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ annotation allowCancellation(interface, method, file) :Void;
# If your code is not cancellation-safe, then allowing cancellation might give a malicious client
# an easy way to induce use-after-free or other bugs in your server, by requesting cancellation
# when not expected.

using Go = import "/go.capnp";
$Go.package("cxx");
$Go.import("capnproto.org/go/capnp/v3/std/capnp/cxx");
48 changes: 48 additions & 0 deletions std/capnp/compat/byte-stream.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
@0x8f5d14e1c273738d;

using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("capnp");
$Cxx.allowCancellation;

interface ByteStream {
write @0 (bytes :Data) -> stream;
# Write a chunk.

end @1 ();
# Signals clean EOF. (If the ByteStream is dropped without calling this, then the stream was
# prematurely canceled and so the body should not be considered complete.)

getSubstream @2 (callback :SubstreamCallback,
limit :UInt64 = 0xffffffffffffffff) -> (substream :ByteStream);
# This method is used to implement path shortening optimization. It is designed in particular
# with KJ streams' pumpTo() in mind.
#
# getSubstream() returns a new stream object that can be used to write to the same destination
# as this stream. The substream will operate until it has received `limit` bytes, or its `end()`
# method has been called, whichever occurs first. At that time, it invokes one of the methods of
# `callback` based on the termination condition.
#
# While a substream is active, it is an error to call write() on the original stream. Doing so
# may throw an exception or may arbitrarily interleave bytes with the substream's writes.

startTls @3 (expectedServerHostname :Text) -> stream;
# Client calls this method when it wants to initiate TLS. This ByteStream is not terminated,
# the caller should reuse it.

interface SubstreamCallback {
ended @0 (byteCount :UInt64);
# `end()` was called on the substream after writing `byteCount` bytes. The `end()` call was
# NOT forwarded to the underlying stream, which remains open.

reachedLimit @1 () -> (next :ByteStream);
# The number of bytes specified by the `limit` parameter of `getSubstream()` was reached.
# The substream will "resolve itself" to `next`, so that all future calls to the substream
# are forwarded to `next`.
#
# If the `write()` call which reached the limit included bytes past the limit, then the first
# `write()` call to `next` will be for those leftover bytes.
}
}
using Go = import "/go.capnp";
$Go.package("bytestream");
$Go.import("capnproto.org/go/capnp/v3/std/capnp/compat/bytestream");
Loading