Skip to content

Commit

Permalink
Add custom DialContext support to m3aggregator client. (#4131)
Browse files Browse the repository at this point in the history
Very similar to #4075, with similar reasoning. Allows customization of net.Dial behavior. This is useful for e.g.
proxying across network boundaries. See comment on ConnectionOptions#ContextDialer for more context on the behavior.
  • Loading branch information
andrewmains12 authored Aug 1, 2022
1 parent c4ca2ee commit eb7fc73
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 21 deletions.
26 changes: 26 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ update those files. There are `make` targets to help with generation:

Don't forget to account for changes to generated files in tests.

You can build mocks/protos etc. for a single component with:

```
make mock-gen-<component>
e.g.
make mock-gen-aggregator
```

### Adding new mocks

`mockgen` statements are centralized in a single generate.go file per component. The convention is:

`src/<component>/generated/mocks/generate.go`

e.g. for the aggregator
`src/aggregator/generated/mocks/generate.go`

### Adding new proto definitions

Proto definitions should be placed in:

```
src/<component>/generated/proto`
```

## Scoping Pull Requests

Inspired by Phabricator's article about
Expand Down
43 changes: 36 additions & 7 deletions src/aggregator/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"context"
"errors"
"math/rand"
"net"
Expand All @@ -29,6 +30,7 @@ import (

"github.com/m3db/m3/src/x/clock"
xio "github.com/m3db/m3/src/x/io"
xnet "github.com/m3db/m3/src/x/net"
"github.com/m3db/m3/src/x/retry"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -59,7 +61,7 @@ type connection struct {
connectWithLockFn connectWithLockFn
sleepFn sleepFn
nowFn clock.NowFn
conn *net.TCPConn
conn net.Conn
rngFn retry.RngFn
writeWithLockFn writeWithLockFn
addr string
Expand All @@ -74,6 +76,7 @@ type connection struct {
numFailures int
mtx sync.Mutex
keepAlive bool
dialer xnet.ContextDialerFn
}

// newConnection creates a new connection.
Expand All @@ -88,6 +91,7 @@ func newConnection(addr string, opts ConnectionOptions) *connection {
maxThreshold: opts.MaxReconnectThreshold(),
maxDuration: opts.MaxReconnectDuration(),
writeRetryOpts: opts.WriteRetryOptions(),
dialer: opts.ContextDialer(),
rngFn: rand.New(rand.NewSource(time.Now().UnixNano())).Int63n,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
Expand Down Expand Up @@ -166,27 +170,52 @@ func (c *connection) writeAttemptWithLock(data []byte) error {
}

func (c *connection) connectWithLock() error {
// TODO: propagate this all the way up the callstack.
ctx := context.TODO()

c.lastConnectAttemptNanos = c.nowFn().UnixNano()
conn, err := net.DialTimeout(tcpProtocol, c.addr, c.connTimeout)

ctx, cancel := context.WithTimeout(ctx, c.connTimeout)
defer cancel()

conn, err := c.dialContext(ctx, c.addr)
if err != nil {
c.metrics.connectError.Inc(1)
return err
}

tcpConn := conn.(*net.TCPConn)
if err := tcpConn.SetKeepAlive(c.keepAlive); err != nil {
c.metrics.setKeepAliveError.Inc(1)
// N.B.: If using a custom dialer which doesn't return *net.TCPConn, users are responsible for TCP keep alive options
// themselves.
if tcpConn, ok := conn.(keepAlivable); ok {
if err := tcpConn.SetKeepAlive(c.keepAlive); err != nil {
c.metrics.setKeepAliveError.Inc(1)
}
}

if c.conn != nil {
c.conn.Close() // nolint: errcheck
}

c.conn = tcpConn
c.writer.Reset(tcpConn)
c.conn = conn
c.writer.Reset(conn)
return nil
}

// Make sure net.TCPConn implements this; otherwise bad things will happen.
var _ keepAlivable = (*net.TCPConn)(nil)

type keepAlivable interface {
SetKeepAlive(shouldKeepAlive bool) error
}

func (c *connection) dialContext(ctx context.Context, addr string) (net.Conn, error) {
if dialer := c.dialer; dialer != nil {
return dialer(ctx, tcpProtocol, addr)
}
var dialer net.Dialer
return dialer.DialContext(ctx, tcpProtocol, addr)
}

func (c *connection) checkReconnectWithLock() error {
// If we haven't accumulated enough failures to warrant another reconnect
// and we haven't past the maximum duration since the last time we attempted
Expand Down
170 changes: 170 additions & 0 deletions src/aggregator/client/conn_mock_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions src/aggregator/client/conn_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/instrument"
xio "github.com/m3db/m3/src/x/io"
xnet "github.com/m3db/m3/src/x/net"
"github.com/m3db/m3/src/x/retry"
)

Expand Down Expand Up @@ -111,6 +112,17 @@ type ConnectionOptions interface {

// RWOptions returns the RW options.
RWOptions() xio.Options

// ContextDialer allows customizing the way an aggregator client the aggregator, at the TCP layer.
// By default, this is:
// (&net.ContextDialer{}).DialContext. This can be used to do a variety of things, such as forwarding a connection
// over a proxy.
// NOTE: if your xnet.ContextDialerFn returns anything other a *net.TCPConn, TCP options such as KeepAlivePeriod
// will *not* be applied automatically. It is your responsibility to make sure these get applied as needed in
// your custom xnet.ContextDialerFn.
ContextDialer() xnet.ContextDialerFn
// SetContextDialer sets ContextDialer() -- see that method.
SetContextDialer(dialer xnet.ContextDialerFn) ConnectionOptions
}

type connectionOptions struct {
Expand All @@ -125,6 +137,7 @@ type connectionOptions struct {
maxThreshold int
multiplier int
connKeepAlive bool
dialer xnet.ContextDialerFn
}

// NewConnectionOptions create a new set of connection options.
Expand All @@ -147,6 +160,7 @@ func NewConnectionOptions() ConnectionOptions {
maxDuration: defaultMaxReconnectDuration,
writeRetryOpts: defaultWriteRetryOpts,
rwOpts: xio.NewOptions(),
dialer: nil, // Will default to net.Dialer{}.DialContext
}
}

Expand Down Expand Up @@ -259,3 +273,14 @@ func (o *connectionOptions) SetRWOptions(value xio.Options) ConnectionOptions {
func (o *connectionOptions) RWOptions() xio.Options {
return o.rwOpts
}

func (o *connectionOptions) ContextDialer() xnet.ContextDialerFn {
return o.dialer
}

// SetContextDialer see ContextDialer.
func (o *connectionOptions) SetContextDialer(dialer xnet.ContextDialerFn) ConnectionOptions {
opts := *o
opts.dialer = dialer
return &opts
}
Loading

0 comments on commit eb7fc73

Please sign in to comment.