Skip to content

Commit

Permalink
transport to stream
Browse files Browse the repository at this point in the history
Former-commit-id: 801caf6
  • Loading branch information
ivcosla committed Oct 1, 2019
1 parent 1951934 commit c5bbe12
Show file tree
Hide file tree
Showing 19 changed files with 7,384 additions and 188 deletions.
48 changes: 24 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Distributed messaging system.

>**TODO:**
>
>- `ACK` frames should include the first 4 bytes of the rolling hash of incoming payloads, enforcing reliability of data. Transports should therefore keep track of incoming/outgoing rolling hashes.
>- Transports should also be noise-encrypted. `REQUEST` and `ACCEPT` frames should include noise handshake messages (`KK` handshake pattern), and the `FWD` and `ACK` payloads are to be encrypted.
> - Transports should implement read/write deadlines and local/remote addresses (like `net.Conn`).
>- `ACK` frames should include the first 4 bytes of the rolling hash of incoming payloads, enforcing reliability of data. Streams should therefore keep track of incoming/outgoing rolling hashes.
>- Streams should also be noise-encrypted. `REQUEST` and `ACCEPT` frames should include noise handshake messages (`KK` handshake pattern), and the `FWD` and `ACK` payloads are to be encrypted.
> - Streams should implement read/write deadlines and local/remote addresses (like `net.Conn`).
> - `dmsg.Server` should check incoming frames to disallow excessive sending of `CLOSE`, `ACCEPT` and `REQUEST` frames.
## Terminology
Expand All @@ -17,15 +17,15 @@ Distributed messaging system.
- **frame -** The data unit of the `dmsg` system.
- **frame type -** The type of `dmsg` frame. There are four frame types; `REQUEST`, `ACCEPT`, `CLOSE`, `FWD`, `ACK`.
- **connection -** The direct line of duplex communication between a `dmsg.Server` and `dmsg.Client`.
- **transport -** A line of communication between two `dmsg.Client`s that is proxied via a `dmsg.Server`.
- **transport ID -** A uint16 value that identifies a transport.
- **stream -** A line of communication between two `dmsg.Client`s that is proxied via a `dmsg.Server`.
- **stream ID -** A uint16 value that identifies a stream.

## Entities

The `dmsg` system is made up of three entity types:
- `dmsg.Discovery` is a RESTful API that allows `dmsg.Client`s to find remote `dmg.Client`s and `dmsg.Server`s.
- `dmsg.Server` proxies frames between clients.
- `dmsg.Client` establishes transports between itself and remote `dmsg.Client`s.
- `dmsg.Client` establishes streams between itself and remote `dmsg.Client`s.

Entities of types `dmsg.Server` or `dmsg.Client` are represented by a `secp256k1` public key.

Expand Down Expand Up @@ -56,12 +56,12 @@ Note that `dmsg.Client` always initiates the `dmsg` connection, and it is a give
Frames are sent and received within a `dmsg` connection after the noise handshake. A frame has two sections; the header and the payload. Here are the fields of a frame:

```
|| FrameType | TransportID | PayloadSize || Payload ||
|| FrameType | StreamID | PayloadSize || Payload ||
|| 1 byte | 2 bytes | 2 bytes || ~ bytes ||
```

- The `FrameType` specifies the frame type via the one byte.
- The `TransportID` contains an encoded `uint16` which represents a identifier for a transport. A set of IDs are unique for a given `dmsg` connection.
- The `StreamID` contains an encoded `uint16` which represents a identifier for a stream. A set of IDs are unique for a given `dmsg` connection.
- The `PayloadSize` contains an encoded `uint16` which represents the size (in bytes) of the payload.
- The `Payload` have a size that is obtainable via `PayloadSize`.

Expand All @@ -72,37 +72,37 @@ The following is a summary of the frame types:
| `0x1` | `REQUEST` | initiating client's public key + responding client's public key | 66 |
| `0x2` | `ACCEPT` | initiating client's public key + responding client's public key | 66 |
| `0x3` | `CLOSE` | 1 byte that represents the reason for closing | 1 |
| `0xa` | `FWD` | uint16 sequence + transport payload | >2 |
| `0xa` | `FWD` | uint16 sequence + stream payload | >2 |
| `0xb` | `ACK` | uint16 sequence | 2 |

## Transports
## Streams

Transports are represented by transport IDs and facilitate duplex communication between two `dmsg.Client`s which are connected to a common `dmsg.Server`.
Streams are represented by stream IDs and facilitate duplex communication between two `dmsg.Client`s which are connected to a common `dmsg.Server`.

Transport IDs are assigned in such a manner:
- A `dmsg.Client` manages the assignment of even transport IDs between itself and each connected `dmsg.Server`. The set of transport IDs will be unique between itself and each `dmsg.Server`.
- A `dmsg.Server` manages the assignment of odd transport IDs between itself and each connected `dmsg.Client`. The set of transport IDs will be unique between itself and each `dmsg.Client`.
Stream IDs are assigned in such a manner:
- A `dmsg.Client` manages the assignment of even stream IDs between itself and each connected `dmsg.Server`. The set of stream IDs will be unique between itself and each `dmsg.Server`.
- A `dmsg.Server` manages the assignment of odd stream IDs between itself and each connected `dmsg.Client`. The set of stream IDs will be unique between itself and each `dmsg.Client`.

For a given transport:
- Between the initiating client and the common server - the transport ID is always a even value.
- Between the responding client and the common server - the transport ID is always a odd value.
For a given stream:
- Between the initiating client and the common server - the stream ID is always a even value.
- Between the responding client and the common server - the stream ID is always a odd value.

Hence, a transport in it's entirety, is represented by 2 transport IDs.
Hence, a stream in it's entirety, is represented by 2 stream IDs.

### Transport Establishment
### Stream Establishment

1. The initiating client chooses an even transport ID and forms a `REQUEST` frame with the chosen transport ID, initiating client's public key (itself) and also the responding client's public key. The `REQUEST` frame is then sent to the common server. The transport ID chosen must be unused between the initiating client and the server.
2. The common server receives the `REQUEST` frame and checks the contents. If valid, and the responding client exists, the server chooses an odd transport ID, swaps this original transport ID of the `REQUEST` frame with the chosen odd transport ID, and continues to forward it to the responding client. In doing this, the server records a rule relating the initiating/responding clients and the associated odd/even transport IDs.
3. The responding client receives the `REQUEST` frame and checks the contents. If valid, the responding client sends an `ACCEPT` frame (containing the same payload as the `REQUEST`) back to the common server. The common server changes the transport ID, and forwards the `ACCEPT` to the initiating client.
1. The initiating client chooses an even stream ID and forms a `REQUEST` frame with the chosen stream ID, initiating client's public key (itself) and also the responding client's public key. The `REQUEST` frame is then sent to the common server. The stream ID chosen must be unused between the initiating client and the server.
2. The common server receives the `REQUEST` frame and checks the contents. If valid, and the responding client exists, the server chooses an odd stream ID, swaps this original stream ID of the `REQUEST` frame with the chosen odd stream ID, and continues to forward it to the responding client. In doing this, the server records a rule relating the initiating/responding clients and the associated odd/even stream IDs.
3. The responding client receives the `REQUEST` frame and checks the contents. If valid, the responding client sends an `ACCEPT` frame (containing the same payload as the `REQUEST`) back to the common server. The common server changes the stream ID, and forwards the `ACCEPT` to the initiating client.

On any step, if an error occurs, any entity can send a `CLOSE` frame.

### Acknowledgement Logic

Each `FWD` frame is to be met with an `ACK` frame in order to be considered delivered.

- Each `FWD` payload has a 2-byte prefix (represented by a uint16 sequence). This sequence is unique per transport.
- The destination of the transport, after receiving the `FWD` frame, responds with an `ACK` frame with the same sequence as the payload.
- Each `FWD` payload has a 2-byte prefix (represented by a uint16 sequence). This sequence is unique per stream.
- The destination of the stream, after receiving the `FWD` frame, responds with an `ACK` frame with the same sequence as the payload.

## `dmsg.Discovery`

Expand Down
40 changes: 20 additions & 20 deletions TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ Note that even though `dmsg-discovery` is also considered to be an entity of `dm

- Given:
- clientA is connected to clientB via a server.
- There is already a single transport established between clientA and clientB.
- The single transport is being written/read to/from in a continuous loop.
- There is already a single stream established between clientA and clientB.
- The single stream is being written/read to/from in a continuous loop.
- When:
- clientA dials transports to clientB until failure (in which clientB does not call `.Accept()`).
- clientA dials streams to clientB until failure (in which clientB does not call `.Accept()`).
- Then:
- Read/writes to/from the existing transport should still work.
- Read/writes to/from the existing stream should still work.

**`capped_transport_buffer_should_not_result_in_hang`**
**`capped_stream_buffer_should_not_result_in_hang`**

- Given:
- A transport is established between clientA and clientB.
- A stream is established between clientA and clientB.
- clientA writes to clientB until clientB's buffer is capped (or in other words, clientA's write blocks).
- When:
- clientB dials to clientA and begins reading/writing to/from the newly established transport.
- clientB dials to clientA and begins reading/writing to/from the newly established stream.
- Then:
- It should work as expected still.

Expand All @@ -39,40 +39,40 @@ Note that even though `dmsg-discovery` is also considered to be an entity of `dm
- The server restarts.
- Then:
- Both clients will automatically reconnect to the server.
- Transports can be established between clientA and clientB.
- Streams can be established between clientA and clientB.

**`server_disconnect_should_close_transports`**
**`server_disconnect_should_close_streams`**

- Given:
- clientA and clientB are connected to a server
- clientB dials clientA
- clientA accepts connection
- Transports are being created
- Some read/write operations are performed on transports
- Streams are being created
- Some read/write operations are performed on streams
- Server disconnects
- Then:
- Transports should be closed
- Streams should be closed

**`server_disconnect_should_close_transports_while_communication_is_going_on`**
**`server_disconnect_should_close_streams_while_communication_is_going_on`**

- Given:
- clientA and clientB are connected to a server
- clientB dials clientA
- clientA accepts connection
- Transports are being created
- Streams are being created
- Read/write operations are being performed
- Server disconnects
- Then:
- Transports should be closed
- Streams should be closed

**`self_dial_should_work`**

- Given:
- clientA is connected to a server
- clientA dials himself
- Then:
- clientA accept connections, transports are being created successfully
- clientA is able to write/read to/from transports without errors
- clientA accept connections, streams are being created successfully
- clientA is able to write/read to/from streams without errors

### Fuzz testing

Expand All @@ -85,18 +85,18 @@ Possible events:
2. Stop random server.
3. Start random client.
1. With or without `Accept()` handling.
2. With or without `transport.Read()` handling.
2. With or without `stream.Read()` handling.
4. Stop random client.
5. Random client dials to another random client.
6. Random write (in len/count) from random established transport.
6. Random write (in len/count) from random established stream.

Notes:
1. We have a set number of possible servers and we are to start all these servers prior to running the test. This way the discovery has entries of the servers which the clients can access when starting.
2. We may need to log the "events" that happen to calculate the expected state of the system
and run the check every x "events".


For this test we must have a set up system consisting of X number of servers, Y number of clients, Z number of transports and a single discovery.
For this test we must have a set up system consisting of X number of servers, Y number of clients, Z number of streams and a single discovery.
Also we need some kind of control panel from which we will run events. Events maybe picked as following:
- each event has it's own probability
- first, we pick a random number of events to be executed
Expand Down
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SetLogger(log *logging.Logger) ClientOption {
}
}

// Client implements transport.Factory
// Client implements stream.Factory
type Client struct {
log *logging.Logger

Expand Down Expand Up @@ -205,7 +205,7 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey)
if err != nil {
return nil, err
}
nc, err := noise.WrapConn(tcpConn, ns, TransportHandshakeTimeout)
nc, err := noise.WrapConn(tcpConn, ns, StreamHandshakeTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,8 +248,8 @@ func (c *Client) Listen(port uint16) (*Listener, error) {
return l, nil
}

// Dial dials a transport to remote dms_client.
func (c *Client) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (*Transport, error) {
// Dial dials a stream to remote dms_client.
func (c *Client) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (*Stream, error) {
entry, err := c.dc.Entry(ctx, remote)
if err != nil {
return nil, fmt.Errorf("get entry failure: %s", err)
Expand All @@ -266,7 +266,7 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (*
c.log.WithError(err).Warn("failed to connect to server")
continue
}
return conn.DialTransport(ctx, remote, port)
return conn.DialStream(ctx, remote, port)
}
return nil, errors.New("failed to find dms_servers for given client pk")
}
Expand All @@ -278,7 +278,7 @@ func (c *Client) Addr() net.Addr {
}
}

// Type returns the transport type.
// Type returns the stream type.
func (c *Client) Type() string {
return Type
}
Expand Down
30 changes: 15 additions & 15 deletions client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ClientConn struct {
// locally-initiated tps use an even tp_id between local and intermediary dms_server.
nextInitID uint16

// Transports: map of transports to remote dms_clients (key: tp_id, val: transport).
tps map[uint16]*Transport
// Streams: map of streams to remote dms_clients (key: tp_id, val: stream).
tps map[uint16]*Stream
mx sync.RWMutex // to protect tps

pm *PortManager
Expand All @@ -44,7 +44,7 @@ func NewClientConn(log *logging.Logger, pm *PortManager, conn net.Conn, lPK, rPK
lPK: lPK,
srvPK: rPK,
nextInitID: randID(true),
tps: make(map[uint16]*Transport),
tps: make(map[uint16]*Stream),
pm: pm,
done: make(chan struct{}),
}
Expand Down Expand Up @@ -75,23 +75,23 @@ func (c *ClientConn) getNextInitID(ctx context.Context) (uint16, error) {
}
}

func (c *ClientConn) addTp(ctx context.Context, rPK cipher.PubKey, lPort, rPort uint16, closeCB func()) (*Transport, error) {
func (c *ClientConn) addTp(ctx context.Context, rPK cipher.PubKey, lPort, rPort uint16, closeCB func()) (*Stream, error) {
c.mx.Lock()
defer c.mx.Unlock()

id, err := c.getNextInitID(ctx)
if err != nil {
return nil, err
}
tp := NewTransport(c.Conn, c.log, Addr{c.lPK, lPort}, Addr{rPK, rPort}, id, maxFwdPayLen, func() {
tp := NewStream(c.Conn, c.log, Addr{c.lPK, lPort}, Addr{rPK, rPort}, id, maxFwdPayLen, func() {
c.delTp(id)
closeCB()
})
c.tps[id] = tp
return tp, nil
}

func (c *ClientConn) setTp(tp *Transport) {
func (c *ClientConn) setTp(tp *Stream) {
c.mx.Lock()
c.tps[tp.id] = tp
c.mx.Unlock()
Expand All @@ -103,7 +103,7 @@ func (c *ClientConn) delTp(id uint16) {
c.mx.Unlock()
}

func (c *ClientConn) getTp(id uint16) (*Transport, bool) {
func (c *ClientConn) getTp(id uint16) (*Stream, bool) {
c.mx.RLock()
tp := c.tps[id]
c.mx.RUnlock()
Expand Down Expand Up @@ -143,7 +143,7 @@ func (c *ClientConn) handleRequestFrame(log *logrus.Entry, id uint16, p []byte)
// TODO(evanlinjin): derive close reason from error.
closeTp := func(origErr error) (cipher.PubKey, error) {
if err := writeCloseFrame(c.Conn, id, PlaceholderReason); err != nil {
log.WithError(err).Warn("handleRequestFrame: failed to close transport: ending conn to server.")
log.WithError(err).Warn("handleRequestFrame: failed to close stream: ending conn to server.")
log.WithError(c.Close()).Warn("handleRequestFrame: closing connection to server.")
return initPK, origErr
}
Expand Down Expand Up @@ -171,13 +171,13 @@ func (c *ClientConn) handleRequestFrame(log *logrus.Entry, id uint16, p []byte)
return closeTp(ErrClientClosed) // TODO(nkryuchkov): reason = client is closed.
}

tp := NewTransport(c.Conn, c.log, pay.RespAddr, pay.InitAddr, id, maxFwdPayLen, func() { c.delTp(id) })
tp := NewStream(c.Conn, c.log, pay.RespAddr, pay.InitAddr, id, maxFwdPayLen, func() { c.delTp(id) })
if err := tp.WriteAccept(int(pay.Window)); err != nil {
return initPK, err
}
go tp.Serve()

if err := lis.IntroduceTransport(tp); err != nil {
if err := lis.IntroduceStream(tp); err != nil {
return initPK, err
}
c.setTp(tp)
Expand Down Expand Up @@ -206,7 +206,7 @@ func (c *ClientConn) Serve(ctx context.Context) (err error) {
// Delete tp on any failure.
if tp, ok := c.getTp(df.TpID); ok {
if err := tp.HandleFrame(f); err != nil {
log.WithError(err).Warnf("Rejected [%s]: Transport closed.", df.Type)
log.WithError(err).Warnf("Rejected [%s]: Stream closed.", df.Type)
}
continue
}
Expand All @@ -227,7 +227,7 @@ func (c *ClientConn) Serve(ctx context.Context) (err error) {
}(log)

default:
log.Debugf("Ignored [%s]: No transport of given ID.", df.Type)
log.Debugf("Ignored [%s]: No stream of given ID.", df.Type)
if df.Type != CloseType {
if err := writeCloseFrame(c.Conn, df.TpID, PlaceholderReason); err != nil {
return err
Expand All @@ -237,8 +237,8 @@ func (c *ClientConn) Serve(ctx context.Context) (err error) {
}
}

// DialTransport dials a transport to remote dms_client.
func (c *ClientConn) DialTransport(ctx context.Context, rPK cipher.PubKey, rPort uint16) (*Transport, error) {
// DialStream dials a stream to remote dms_client.
func (c *ClientConn) DialStream(ctx context.Context, rPK cipher.PubKey, rPort uint16) (*Stream, error) {
lPort, closeCB, err := c.pm.ReserveEphemeral(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -270,7 +270,7 @@ func (c *ClientConn) close() (closed bool) {
tp := tp
go func() {
if err := tp.Close(); err != nil {
log.WithError(err).Warn("Failed to close transport")
log.WithError(err).Warn("Failed to close stream")
}
}()
}
Expand Down
Loading

0 comments on commit c5bbe12

Please sign in to comment.