Skip to content

Commit

Permalink
Merge pull request #87 from loopholelabs/staging
Browse files Browse the repository at this point in the history
Release v0.4.0
  • Loading branch information
ShivanshVij authored Mar 24, 2022
2 parents 9969d0d + d971ec8 commit b035542
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 143 deletions.
20 changes: 19 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [v0.4.0] - 2022-03-24 (Beta)

## Changes

- Changing `Connect` signatures and `Start` signatures for servers, and clients
- Changing the functionality of Server.`Start` so that it blocks and returns an error
- Adding `ServeConn` and `FromConn` functions for severs and clients
- Updating `protoc-gen-frisbee` to comply with the new changes
- Updating the buf.build manifest for `protoc-gen-frisbee`
- Making `baseContext`, `onClosed`, and `preWrite` hooks for the Server private, and creating `Setter` functions that
make it impossible to set those functions to nil

## Fixes

- Fixing panics from `ConnectSync` and `ConnectAsync` functions when the connection cannot be established - it now
returns an error properly instead

## [v0.3.2] - 2022-03-18 (Beta)

## Changes
Expand Down Expand Up @@ -179,7 +196,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

Initial Release of Frisbee

[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.3.2...HEAD
[unreleased]: https://github.com/loopholelabs/frisbee/compare/v0.4.0...HEAD
[v0.4.0]: https://github.com/loopholelabs/frisbee/compare/v0.3.2...v0.4.0
[v0.3.2]: https://github.com/loopholelabs/frisbee/compare/v0.3.1...v0.3.2
[v0.3.1]: https://github.com/loopholelabs/frisbee/compare/v0.3.0...v0.3.1
[v0.3.0]: https://github.com/loopholelabs/frisbee/compare/v0.2.4...v0.3.0
Expand Down
12 changes: 7 additions & 5 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,29 @@ type Async struct {
}

// ConnectAsync creates a new TCP connection (using net.Dial) and wraps it in a frisbee connection
func ConnectAsync(addr string, keepAlive time.Duration, logger *zerolog.Logger, TLSConfig *tls.Config, blocking bool) (*Async, error) {
func ConnectAsync(addr string, keepAlive time.Duration, logger *zerolog.Logger, TLSConfig *tls.Config) (*Async, error) {
var conn net.Conn
var err error

if TLSConfig != nil {
conn, err = tls.Dial("tcp", addr, TLSConfig)
} else {
conn, err = net.Dial("tcp", addr)
_ = conn.(*net.TCPConn).SetKeepAlive(true)
_ = conn.(*net.TCPConn).SetKeepAlivePeriod(keepAlive)
if err == nil {
_ = conn.(*net.TCPConn).SetKeepAlive(true)
_ = conn.(*net.TCPConn).SetKeepAlivePeriod(keepAlive)
}
}

if err != nil {
return nil, err
}

return NewAsync(conn, logger, blocking), nil
return NewAsync(conn, logger), nil
}

// NewAsync takes an existing net.Conn object and wraps it in a frisbee connection
func NewAsync(c net.Conn, logger *zerolog.Logger, blocking bool) (conn *Async) {
func NewAsync(c net.Conn, logger *zerolog.Logger) (conn *Async) {
conn = &Async{
conn: c,
closed: atomic.NewBool(false),
Expand Down
38 changes: 20 additions & 18 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestNewAsync(t *testing.T) {

reader, writer := net.Pipe()

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

p := packet.Get()
p.Metadata.Id = 64
Expand Down Expand Up @@ -97,8 +97,8 @@ func TestAsyncLargeWrite(t *testing.T) {

reader, writer := net.Pipe()

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

randomData := make([][]byte, testSize)
p := packet.Get()
Expand Down Expand Up @@ -145,8 +145,8 @@ func TestAsyncRawConn(t *testing.T) {
reader, writer, err := pair.New()
require.NoError(t, err)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

randomData := make([]byte, packetSize)
_, _ = rand.Read(randomData)
Expand Down Expand Up @@ -204,8 +204,8 @@ func TestAsyncReadClose(t *testing.T) {

emptyLogger := zerolog.New(ioutil.Discard)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

p := packet.Get()
p.Metadata.Id = 64
Expand Down Expand Up @@ -252,8 +252,8 @@ func TestAsyncReadAvailableClose(t *testing.T) {

emptyLogger := zerolog.New(ioutil.Discard)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

p := packet.Get()
p.Metadata.Id = 64
Expand Down Expand Up @@ -302,8 +302,8 @@ func TestAsyncWriteClose(t *testing.T) {

emptyLogger := zerolog.New(ioutil.Discard)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

p := packet.Get()
p.Metadata.Id = 64
Expand Down Expand Up @@ -353,8 +353,8 @@ func TestAsyncTimeout(t *testing.T) {
reader, writer, err := pair.New()
require.NoError(t, err)

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

p := packet.Get()
p.Metadata.Id = 64
Expand Down Expand Up @@ -389,7 +389,9 @@ func TestAsyncTimeout(t *testing.T) {
err = writerConn.conn.Close()
assert.NoError(t, err)

runtime.Gosched()
time.Sleep(defaultDeadline * 5)
runtime.Gosched()

_, err = readerConn.ReadPacket()
assert.ErrorIs(t, err, ConnectionClosed)
Expand All @@ -414,8 +416,8 @@ func BenchmarkAsyncThroughputPipe(b *testing.B) {

reader, writer := net.Pipe()

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

b.Run("32 Bytes", throughputRunner(testSize, 32, readerConn, writerConn))
b.Run("512 Bytes", throughputRunner(testSize, 512, readerConn, writerConn))
Expand All @@ -437,8 +439,8 @@ func BenchmarkAsyncThroughputNetwork(b *testing.B) {
b.Fatal(err)
}

readerConn := NewAsync(reader, &emptyLogger, false)
writerConn := NewAsync(writer, &emptyLogger, false)
readerConn := NewAsync(reader, &emptyLogger)
writerConn := NewAsync(writer, &emptyLogger)

b.Run("32 Bytes", throughputRunner(testSize, 32, readerConn, writerConn))
b.Run("512 Bytes", throughputRunner(testSize, 512, readerConn, writerConn))
Expand Down
42 changes: 26 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

// Client connects to a frisbee Server and can send and receive frisbee packets
type Client struct {
addr string
conn *Async
handlerTable HandlerTable
ctx context.Context
Expand All @@ -48,12 +47,7 @@ type Client struct {

// NewClient returns an uninitialized frisbee Client with the registered ClientRouter.
// The ConnectAsync method must then be called to dial the server and initialize the connection.
//
// If poolSize == 0 then no pool will be allocated, and all handlers will be run synchronously for their
// incoming connections. If poolSize == -1 then a pool with unlimited size will be allocated. Otherwise, a pool
// with size `poolSize` will be allocated.
func NewClient(addr string, handlerTable HandlerTable, ctx context.Context, opts ...Option) (*Client, error) {

func NewClient(handlerTable HandlerTable, ctx context.Context, opts ...Option) (*Client, error) {
for i := uint16(0); i < RESERVED9; i++ {
if _, ok := handlerTable[i]; ok {
return nil, InvalidHandlerTable
Expand All @@ -71,7 +65,6 @@ func NewClient(addr string, handlerTable HandlerTable, ctx context.Context, opts
}

return &Client{
addr: addr,
handlerTable: handlerTable,
ctx: ctx,
options: options,
Expand All @@ -81,26 +74,43 @@ func NewClient(addr string, handlerTable HandlerTable, ctx context.Context, opts
}

// Connect actually connects to the given frisbee server, and starts the reactor goroutines
// to receive and handle incoming packets.
func (c *Client) Connect() error {
c.Logger().Debug().Msgf("Connecting to %s", c.addr)
// to receive and handle incoming packets. If this function is called, FromConn should not be called.
func (c *Client) Connect(addr string) error {
c.Logger().Debug().Msgf("Connecting to %s", addr)
var frisbeeConn *Async
var err error
frisbeeConn, err = ConnectAsync(c.addr, c.options.KeepAlive, c.Logger(), c.options.TLSConfig, true)
frisbeeConn, err = ConnectAsync(addr, c.options.KeepAlive, c.Logger(), c.options.TLSConfig)
if err != nil {
return err
}
c.conn = frisbeeConn
c.Logger().Info().Msgf("Connected to %s", c.addr)
c.Logger().Info().Msgf("Connected to %s", addr)

c.wg.Add(1)
go c.handleConn()
c.Logger().Debug().Msgf("Connection handler started for %s", addr)

if c.options.Heartbeat > time.Duration(0) {
c.wg.Add(1)
go c.heartbeat()
c.Logger().Debug().Msgf("Heartbeat started for %s", addr)
}

return nil
}

// FromConn takes a pre-existing connection to a Frisbee server and starts the reactor goroutines
// to receive and handle incoming packets. If this function is called, Connect should not be called.
func (c *Client) FromConn(conn net.Conn) error {
c.conn = NewAsync(conn, c.Logger())
c.wg.Add(1)
go c.handleConn()
c.Logger().Debug().Msgf("Connection handler started for %s", c.addr)
c.Logger().Debug().Msgf("Connection handler started for %s", c.conn.RemoteAddr())

if c.options.Heartbeat > time.Duration(0) {
c.wg.Add(1)
go c.heartbeat()
c.Logger().Debug().Msgf("Heartbeat started for %s", c.addr)
c.Logger().Debug().Msgf("Heartbeat started for %s", c.conn.RemoteAddr())
}

return nil
Expand Down Expand Up @@ -210,7 +220,7 @@ LOOP:
c.ctx = c.UpdateContext(c.ctx, c.conn)
}
case CLOSE:
c.Logger().Debug().Msgf("Closing connection %s because of CLOSE action", c.addr)
c.Logger().Debug().Msgf("Closing connection %s because of CLOSE action", c.conn.RemoteAddr())
c.wg.Done()
_ = c.Close()
return
Expand Down
41 changes: 25 additions & 16 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"github.com/loopholelabs/frisbee/pkg/metadata"
"github.com/loopholelabs/frisbee/pkg/packet"
"github.com/loopholelabs/testing/conn/pair"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,22 +64,24 @@ func TestClientRaw(t *testing.T) {
}

emptyLogger := zerolog.New(ioutil.Discard)
s, err := NewServer(":0", serverHandlerTable, WithLogger(&emptyLogger))
s, err := NewServer(serverHandlerTable, WithLogger(&emptyLogger))
require.NoError(t, err)

s.ConnContext = func(ctx context.Context, c *Async) context.Context {
return context.WithValue(ctx, clientConnContextKey, c)
}

err = s.Start()
serverConn, clientConn, err := pair.New()
require.NoError(t, err)

c, err := NewClient(s.listener.Addr().String(), clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
go s.ServeConn(serverConn)

c, err := NewClient(clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
assert.NoError(t, err)
_, err = c.Raw()
assert.ErrorIs(t, ConnectionNotInitialized, err)

err = c.Connect()
err = c.FromConn(clientConn)
require.NoError(t, err)

data := make([]byte, packetSize)
Expand Down Expand Up @@ -154,18 +157,20 @@ func TestClientStaleClose(t *testing.T) {
}

emptyLogger := zerolog.New(ioutil.Discard)
s, err := NewServer(":0", serverHandlerTable, WithLogger(&emptyLogger))
s, err := NewServer(serverHandlerTable, WithLogger(&emptyLogger))
require.NoError(t, err)

err = s.Start()
serverConn, clientConn, err := pair.New()
require.NoError(t, err)

c, err := NewClient(s.listener.Addr().String(), clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
go s.ServeConn(serverConn)

c, err := NewClient(clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
assert.NoError(t, err)
_, err = c.Raw()
assert.ErrorIs(t, ConnectionNotInitialized, err)

err = c.Connect()
err = c.FromConn(clientConn)
require.NoError(t, err)

data := make([]byte, packetSize)
Expand Down Expand Up @@ -210,21 +215,23 @@ func BenchmarkThroughputClient(b *testing.B) {
}

emptyLogger := zerolog.New(ioutil.Discard)
s, err := NewServer(":0", serverHandlerTable, WithLogger(&emptyLogger))
s, err := NewServer(serverHandlerTable, WithLogger(&emptyLogger))
if err != nil {
b.Fatal(err)
}

err = s.Start()
serverConn, clientConn, err := pair.New()
if err != nil {
b.Fatal(err)
}

c, err := NewClient(s.listener.Addr().String(), clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
go s.ServeConn(serverConn)

c, err := NewClient(clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
if err != nil {
b.Fatal(err)
}
err = c.Connect()
err = c.FromConn(clientConn)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -291,21 +298,23 @@ func BenchmarkThroughputResponseClient(b *testing.B) {
}

emptyLogger := zerolog.New(ioutil.Discard)
s, err := NewServer(":0", serverHandlerTable, WithLogger(&emptyLogger))
s, err := NewServer(serverHandlerTable, WithLogger(&emptyLogger))
if err != nil {
b.Fatal(err)
}

err = s.Start()
serverConn, clientConn, err := pair.New()
if err != nil {
b.Fatal(err)
}

c, err := NewClient(s.listener.Addr().String(), clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
go s.ServeConn(serverConn)

c, err := NewClient(clientHandlerTable, context.Background(), WithLogger(&emptyLogger))
if err != nil {
b.Fatal(err)
}
err = c.Connect()
err = c.FromConn(clientConn)
if err != nil {
b.Fatal(err)
}
Expand Down
Loading

0 comments on commit b035542

Please sign in to comment.