# Protocol draft - for discussion only (#4)
Placeholder for network protocol.

Status: this is a rough draft, intended to get some ideas out there.

Missing: how to run on top of HTTP/2; possibly the structure or semantics of publisher name strings.
Might be removed: extension support.
Need to choose a serialization method (protobuf, msgpack, other?).

## Transport assumptions

A supporting transport must be similar to a streaming socket:

1. Bidirectional and full-duplex
2. An octet stream, i.e. all octet values may be sent unencoded
3. Ordered delivery: an implementation may map protocol messages to some features of the underlying transport (e.g. [Ømq](http://zeromq.org/) messages), but the messages must arrive in the same order as they were sent

Definitely supported transports include TCP, TLS (over TCP), WebSockets, and most socket-like objects (e.g. pipes). HTTP/2 will be supported, but may require a dedicated specification for implementing this protocol.

## Message framing

An existing serialization format should be used. Current candidates are [Protocol Buffers](https://github.com/google/protobuf/) (which is slightly less space-efficient), [MessagePack](http://msgpack.org/) (whose Scala implementation may not be as good as the others), and possibly [Thrift](https://thrift.apache.org/) (with which I'm less familiar).

> **Review discussion:**
>
> Is requiring a specific serialization format the appropriate solution? It would definitely simplify things from a spec and implementation perspective, but is it okay for broad adoption? Does it impede usage in environments such as Node.js?
>
> Protobuf, msgpack and Thrift all have implementations for JavaScript / Node.js. If we don't specify a serialization format, I think there will be a lot of confusion and incompatibility out there. My goal in writing this wasn't to specify a meta-protocol that users then have to further adapt to their needs, but to specify a complete network protocol such that if someone advertises RS.io on TCP port 12345, you can pick any client implementation and expect it to work.
>
> If you decide to mandate one, please pick one that has the potential to be decoded with zero copying. Without prejudice, other options are Cap'n Proto and Simple Binary Encoding. Just ask Todd :)
>
> I have no special experience or expertise with serialization formats, and no special preference for any particular one. I picked these three for being well-known, compact, and very widely- (and I hope well-) implemented. I've never used Cap'n Proto or SBE before. Looking at them just now, SBE only has implementations for C++, Java and C#, not even JS, which would be a real problem for this specification to mandate. Cap'n Proto is better in that regard, but still nowhere near the number of implementations of msgpack or protobuf. However, CP is also much more complex, precisely because it's a direct representation of usable memory layout and so deals with pointers and alignment and manual packing and so on. I could write a protobuf encoder in a couple of hours, but CP is a whole other story. This also makes me worry about its space efficiency: CP recommends using compression. So maybe CP has advantages, but it's sufficiently complex that the tradeoff isn't obvious to me, FWIW.
>
> Compression should definitely be an option, and I wouldn't want to foreclose the possibility of implementing hot observables over reliable multicast either.
>
> @tmontgomery I suggest the protocol should only specify a particular serialization format for the framing and protocol messages. And that format should be simple enough, or the part of it used should be small enough, that it would be easy to implement from scratch if needed as a hardcoded part of an RS.io implementation. Then the implementation or its user could use an unrelated serialization format for the message content. If we simplify this spec a bit, e.g. say the publisher name is a byte array to avoid specifying string semantics, then the only types used are ints and byte arrays. Even with some varint representation, that's small and simple enough that I think we shouldn't stress as much as we are over the selection of the serialization format.
>
> @danarmak that makes sense to me. varints have me a little concerned since they add branching checks as well as "elongate" types past offset boundaries, i.e. they don't match up on nice word boundaries and are slower to handle.
>
> Is it better to use int8/16/32/64 and pay for greater per-frame overhead? Transport compression would negate some of the cost, since the extra bytes are zeros.
>
> Better for the CPU and striding usually, yes. So, more efficient.
>
> Will word boundaries be a significant problem? The frame header sizes probably won't be word multiples unless we pad.

The serialization format should be fast and space-efficient. It does not need to be self-describing, since the message types and their structures are fully specified in the protocol. It needs to have the types boolean, string / byte array (length-prefixed), and varint (an integer encoded using 1 or more bytes depending on its value).

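To make the varint type concrete, here is a minimal sketch of a protobuf-style base-128 varint codec. The function names are illustrative only; the draft does not define an API or commit to this exact encoding.

```python
def encode_varint(n: int) -> bytes:
    # 7 payload bits per byte, least-significant group first;
    # the high bit of each byte is a continuation flag.
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)  # more bytes follow
        else:
            out.append(b)         # final byte
            return bytes(out)

def decode_varint(buf: bytes, pos: int = 0) -> tuple[int, int]:
    # Returns (value, position of the first byte after the varint).
    result = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):
            return result, pos
```

Small values (under 128) take a single byte, which is why varints help most on streams of many small frames.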
The full complexity of these formats may not be needed today, but future protocol extensions might benefit. Also, an implementation might encode the published elements using the same format and decode both the framing and the messages using the same parser.

Each message (frame) begins with a message type, which is a varint, followed by its contents. Messages are self-delimiting, because their structure is known from their type, and all fields are either of fixed size, self-delimited varints, or length-prefixed strings or byte arrays.

> **Review discussion:**
>
> With the structure of each message being as clearly defined as this, I don't see the need to specify an external serialization format. The simple types defined (varints, length-prefixed arrays, booleans) as part of the message should more than cover the needs of the protocol itself, without a more formal/heavyweight serialization format/library.
>
> If there are varints in the header, won't this lead to complex and slow multi-read() deserialization?
>
> @experquisite The message type could be a single byte. Even if we run out of values, types could be defined where the second byte onwards specifies a subtype. I think this is probably a good change to make regardless of varint parsing efficiency, and I'll make it. The varints used in length-prefixed arrays could be replaced with regular 32-bit ints. This would use on average 2, maybe 3 more bytes per message. My intuition was to optimize for size, but I really don't know if there would be a noticeable performance hit. If you have the time, you can run a performance test scenario and find out...
>
> @maniksurtani I think the need for a serialization format also arises from the variance in message types (hello, subscribe, goodbye, packedNext, etc.) which are not completely defined in the RS SPI. In order to standardize the definition, we can leverage an existing serialization library, e.g. as a protobuf IDL.

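As an illustration of the framing rule above, this sketch frames a `subscribe` message as a type varint followed by its fields. The type code `0x02` and the varint helper are assumptions for illustration; the draft does not assign concrete type codes.

```python
MSG_SUBSCRIBE = 0x02  # hypothetical type code, not assigned by the draft

def encode_varint(n: int) -> bytes:
    # Minimal base-128 varint encoder (one possible choice of encoding).
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def frame_subscribe(publisher: bytes, subscriber_id: int, initial_demand: int = 0) -> bytes:
    frame = bytearray()
    frame += encode_varint(MSG_SUBSCRIBE)               # message type
    frame += encode_varint(len(publisher)) + publisher  # length-prefixed publisher name
    frame += encode_varint(subscriber_id)               # subscriber id
    frame += encode_varint(initial_demand)              # initial demand
    return bytes(frame)
```

Because every field is either a varint or length-prefixed, a reader can delimit the frame without any outer length header.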
## Protocol negotiation

The protocol is versioned and supports future extensions. The client (i.e. the side that opened the connection) and the server do a loose handshake:

```
--> clientHello(version: varint, extensions: Array[Id])
<-- serverHello(version: varint, extensions: Array[Id])
```

This is a 'loose' handshake because the server doesn't have to wait for the `clientHello` before sending its `serverHello`.

The protocol version is currently 0. If either side receives a hello message with a version it doesn't support, it MUST send a `goodbye` message (defined below) and close the connection. When future versions of the protocol introduce incompatible changes and increment the version number, transports SHOULD indicate the incompatibility when suitable (e.g. by changing the HTTP Content-Type or TCP port number).

Extensions to the protocol specify optional or future behaviors.
1. If an extension defines a new message type not described in this specification, that message MUST NOT be sent before receiving a hello from the other side confirming that it supports that extension.
2. If an extension changes the semantics of message types defined in this specification or by another extension, the modified behavior MUST be negotiated by at least one of the parties sending, and the other acknowledging, a message (defined by the extension in question) that declares the new behavior as active. A party supporting such an extension SHOULD NOT send messages whose semantics are modified by it before this negotiation is completed (i.e. the acknowledgement message is received).

The client can optimistically send more messages after the `clientHello` without waiting for the `serverHello`. If it eventually receives a `serverHello` with a different protocol version, it must consider that its messages were discarded. Future protocol versions will not be backward-compatible with version 0, in the sense that if a server supports multiple versions (e.g. both version 0 and some future version 1), it must wait for the `clientHello` and then send a `serverHello` with a version number matching the client's.

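The version check and extension negotiation above can be sketched as follows. Treating the active extension set as the intersection of what both sides advertise is an assumption consistent with this section (only extensions both parties support can be used); all names are hypothetical.

```python
SUPPORTED_VERSION = 0  # the only version defined by this draft

def handle_client_hello(client_version: int, client_exts: set, server_exts: set):
    # A hello with an unsupported version MUST be answered with goodbye
    # before closing the connection.
    if client_version != SUPPORTED_VERSION:
        return ("goodbye", "unsupported protocol version")
    # Only extensions advertised by *both* parties may be used.
    return ("serverHello", SUPPORTED_VERSION, client_exts & server_exts)
```

Extension-defined messages would still need the per-extension acknowledgement described in rule 2 before their modified semantics take effect.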
> **Review discussion:**
>
> The negotiation looks fine. It might be good to mention that the union of extensions is what is chosen, i.e. both ends must support it and agree to use it. Also, the ordering of extensions might need to be specified; just some wording to be clear. It might be good to think of most operations as extensions, such as serialization, compression, encryption, etc. That might be a cleaner way to specify these changing needs. If so, we might just borrow some HTTP semantics here; lots of good stuff can be leveraged.
>
> I pushed an update that clarifies extension negotiation. What is chosen is not the union but the intersection of extensions. I suspect this is what you mean too, since I don't see how the union could work; it would include extensions not supported by one of the two parties.

## The Reactive Streams core protocol

> **Review discussion:**
>
> This list of signals looks good to me. I'm going to go play with a basic implementation to try use cases, but it's what I'd expect.
>
> This looks relatively complete at first glance. It also gives us some good data points for how we might need to lay this out. What is the need for varint? Could it be bounded to 64 bits?
>
> @tmontgomery The biggest varint values used are message size prefixes, and they can definitely be bounded to 64 bits or less. I used varints only in an attempt to save space. The greatest need to reduce frame overhead is when there are many small frames, and that is also when the varints have the smallest values, fitting in one or two bytes. I had in mind the use case of a stream of integers.
>
> varints can be a little slow to handle on modern CPUs compared to static fields. For a stream of ints you are absolutely correct: varints are a good tradeoff of space vs. CPU cycles to parse. But what about limiting varints to those cases only? For framing and control, it might be better to consider the sizes needed and make them static if we can. Not sure we can, but worth a shot.

The basic RS signalling is:

```
--> subscribe(publisher: String, subscriber: Id, initialDemand: Long = 0)
--> request(subscriber: Id, demand: Long)
--> cancel(subscriber: Id)
<-- onSubscribe(subscriber: Id, elementSize: varint = 0) // For elementSize != 0, see the next section
<-- onNext(subscriber: Id, element: bytes)
<-- onComplete(subscriber: Id)
<-- onError(subscriber: Id, error: String)
```

> **Review discussion:**
>
> How are you envisioning these signals being encoded over the wire? I'm assuming a binary encoding would represent each of these signals as a byte rather than a string of text? Is this the varint you reference in the message frame above?
>
> These message definitions can be translated into formal definitions once we choose a serialization format. But like I said above, each message looks like a message type + payload, and the message type is a varint. The varint idea is from protobuf; msgpack has a slightly different solution, but in both cases small (7-bit) integers take 1 byte, bigger integers take 2 bytes, 3, and so on. So the message type code would take 1 byte.

The protocol is fully bidirectional; either party can act in the `-->` direction. The semantics for ordering and asynchronous delivery are the same as in the Reactive Streams specification.

Unlike in RS, there is no separate Subscription object; the subscriber Id identifies the recipient in all messages going in the `<--` direction. This id is generated by the subscriber and sent in the `subscribe` message.

The publisher String needs to be parsed by the recipient; it is not described by this specification. [Could be added?]

The field `onSubscribe.elementSize`, if nonzero, indicates the fixed size of the elements that will be published in this stream. In fixed-size mode, the `onNext.element` field is not length-prefixed. This saves space when the messages are very small, such as individual ints.

After a subscription is closed, its Id can be reused, to prevent Ids from growing without limit. The subscriber MAY reuse an Id in a `subscribe` message after it has sent `cancel` or received `onComplete` or `onError` for that Id. If it does so, it MUST guarantee that the publisher will not receive messages meant for the previous subscription with that Id after it receives the second `subscribe` message.

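To show the demand-driven exchange in this section end to end, here is a toy in-process publisher. The signal names come from the draft; the class, its API, and the in-memory delivery are illustrative assumptions, with transport and framing elided.

```python
class Publisher:
    """Toy publisher tracking per-subscriber demand (sketch, not the spec)."""

    def __init__(self, elements):
        self.elements = list(elements)
        self.demand = {}  # subscriber id -> outstanding demand

    def subscribe(self, subscriber_id, initial_demand=0):
        # subscribe(publisher, subscriber, initialDemand) -> onSubscribe
        self.demand[subscriber_id] = initial_demand
        return ("onSubscribe", subscriber_id)

    def request(self, subscriber_id, n):
        # request(subscriber, demand) increases outstanding demand;
        # elements are emitted only while demand remains.
        self.demand[subscriber_id] += n
        out = []
        while self.demand[subscriber_id] and self.elements:
            self.demand[subscriber_id] -= 1
            out.append(("onNext", subscriber_id, self.elements.pop(0)))
        if not self.elements:
            out.append(("onComplete", subscriber_id))
        return out
```

Note the publisher never sends more `onNext` signals than the subscriber has requested, matching RS back-pressure semantics.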
## Packed messaging

> **Review discussion:**
>
> This is nice. I hadn't considered this.

In typical use, the most common messages by far are `onNext`. The overhead per message is typically 1 byte (message code) + 1-2 bytes (subscriber id) + 1-3 bytes (payload length) = 3-6 bytes total. When the element type is very small (e.g. an int), the overhead can be 100% or more.

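The overhead figures above can be made concrete with a little arithmetic. This helper uses the byte counts from the text; the function name is illustrative.

```python
def onnext_overhead_pct(element_size: int, header_bytes: int = 3) -> float:
    # Per-frame overhead as a percentage of the element payload,
    # for a plain onNext frame with the given header size (3-6 bytes per the text).
    return 100.0 * header_bytes / element_size
```

For a 4-byte int element, the minimal 3-byte header is already 75% overhead, and the 6-byte worst case is 150%, which motivates the packed mode below.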
To reduce the overhead, the publisher can optionally declare that all stream elements will have a fixed size by setting the `onSubscribe.elementSize` field to a value greater than zero:

```
<-- onSubscribe(subscriber: Id, elementSize: varint)
```

The publisher can then send not just `onNext` messages but also `onNextPacked` messages:

```
<-- onNextPacked(subscriber: Id, count: varint, messages: count * elementSize bytes)
```

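Because every element in a packed frame has the negotiated fixed size, the receiver can slice the payload without any per-element length prefixes. A minimal sketch of the receiving side (the function name is an assumption, not part of the draft):

```python
def unpack_elements(payload: bytes, count: int, element_size: int) -> list[bytes]:
    # The onNextPacked payload is exactly count * elementSize bytes;
    # anything else indicates a framing error.
    assert len(payload) == count * element_size, "malformed onNextPacked payload"
    return [payload[i * element_size:(i + 1) * element_size] for i in range(count)]
```

A packed frame of N small elements pays the frame header once instead of N times, which is where the space saving comes from.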
Packing does not help if new data becomes available very frequently and must not be delayed before being sent. A typical example is a ticker source. It also can't be done if the client doesn't provide enough demand.

## Split messaging

> **Review discussion:**
>
> Might consider using the terms "fragmentation" and "reassembly".
>
> The max size of a "fragment" will need to be defined. I would suggest no larger than 64K minus some overhead. That way it can be kept to a single 2-byte length.
>
> Makes sense: then the overhead from not using varints won't be so large. Unfragmented messages will also need to be limited to 64K.

A very large `onNext` message might significantly delay messages from other streams. Therefore, large stream elements can be optionally split across multiple messages. Publishers MAY split elements; subscribers MUST support this.

When an element is split, the publisher will send one or more `onNextPart` messages, followed by a single `onNextLastPart`:

```
<-- onNextPart(subscriber: Id, element: Id, data: bytes)
<-- onNextLastPart(subscriber: Id, element: Id, data: bytes)
```

`element` is an Id assigned by the Publisher; messages with the same `element` value, in the same stream, will be joined by the Subscriber. The order of the parts is that in which they were sent and received (the transport is required to provide ordered delivery).

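The joining rule above can be sketched as a subscriber-side reassembler keyed by (subscriber, element) ids. The message names follow the draft; the buffering class itself is an illustrative assumption.

```python
class Reassembler:
    """Sketch: buffers onNextPart fragments until onNextLastPart arrives."""

    def __init__(self):
        # (subscriber id, element id) -> accumulated bytes
        self._parts: dict = {}

    def on_next_part(self, subscriber: int, element: int, data: bytes) -> None:
        # Parts arrive in order (the transport guarantees ordered delivery),
        # so simple appending reassembles the element correctly.
        self._parts.setdefault((subscriber, element), bytearray()).extend(data)

    def on_next_last_part(self, subscriber: int, element: int, data: bytes) -> bytes:
        # The last part completes the element, which is then delivered
        # to the application as if it were a single onNext.
        buf = self._parts.pop((subscriber, element), bytearray())
        buf.extend(data)
        return bytes(buf)
```
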
The subscriber's driver will typically join the parts transparently and deliver a single message to the application.

## Closing the connection

When the underlying transport is closed, both sides should release all related resources. This protocol version does not specify reusing previously negotiated state after reconnecting.

The orderly way of closing a connection is to send a `goodbye` message, wait for acknowledgement, and then close the underlying connection:

```
--> goodbye(reason: String)
<-- goodbye(reason: String)
```

Sending `goodbye` implicitly closes all open streams, equivalently to receiving `cancel` or `onError` messages.

> **Review discussion:**
>
> Might be good to use ACK for the acknowledgement instead. That way it is differentiated from the goodbye.

> **Review discussion:**
>
> Are you suggesting we define one, or just that the bytes should be serializable using any of these mechanisms?
>
> I'm suggesting we pick one and mandate it. This is for serializing the framing and the protocol messages themselves, not the inner payloads. The payloads are necessarily opaque to the protocol, but the user can choose to use the same serialization format for the payloads for synergy.