Skip to content

Commit

Permalink
[aggregator] Add read timeout for aggregator client (#4315)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-mazhut authored Nov 29, 2024
1 parent 4feb762 commit cde168d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/aggregator/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (c *TLSConfiguration) NewTLSOptions() xtls.Options {
type ConnectionConfiguration struct {
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
ConnectionKeepAlive *bool `yaml:"connectionKeepAlive"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
InitReconnectThreshold int `yaml:"initReconnectThreshold"`
MaxReconnectThreshold int `yaml:"maxReconnectThreshold"`
Expand All @@ -254,6 +255,9 @@ func (c *ConnectionConfiguration) NewConnectionOptions(scope tally.Scope) Connec
if c.ConnectionKeepAlive != nil {
opts = opts.SetConnectionKeepAlive(*c.ConnectionKeepAlive)
}
if c.ReadTimeout != 0 {
opts = opts.SetReadTimeout(c.ReadTimeout)
}
if c.WriteTimeout != 0 {
opts = opts.SetWriteTimeout(c.WriteTimeout)
}
Expand Down
2 changes: 2 additions & 0 deletions src/aggregator/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ queueDropType: oldest
connection:
connectionTimeout: 1s
connectionKeepAlive: true
readTimeout: 1s
writeTimeout: 1s
initReconnectThreshold: 2
maxReconnectThreshold: 5000
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestConfigUnmarshal(t *testing.T) {
require.Equal(t, DropOldest, *cfg.QueueDropType)
require.Equal(t, time.Second, cfg.Connection.ConnectionTimeout)
require.Equal(t, true, *cfg.Connection.ConnectionKeepAlive)
require.Equal(t, time.Second, cfg.Connection.ReadTimeout)
require.Equal(t, time.Second, cfg.Connection.WriteTimeout)
require.Equal(t, 2, cfg.Connection.InitReconnectThreshold)
require.Equal(t, 5000, cfg.Connection.MaxReconnectThreshold)
Expand Down
8 changes: 8 additions & 0 deletions src/aggregator/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type connection struct {
initThreshold int
threshold int
lastConnectAttemptNanos int64
readTimeout time.Duration
writeTimeout time.Duration
connTimeout time.Duration
numFailures int
Expand All @@ -87,6 +88,7 @@ func newConnection(addr string, opts ConnectionOptions) *connection {
c := &connection{
addr: addr,
connTimeout: opts.ConnectionTimeout(),
readTimeout: opts.ReadTimeout(),
writeTimeout: opts.WriteTimeout(),
keepAlive: opts.ConnectionKeepAlive(),
initThreshold: opts.InitReconnectThreshold(),
Expand Down Expand Up @@ -271,6 +273,9 @@ func (c *connection) writeWithLock(data []byte) error {
if err := c.conn.SetWriteDeadline(c.nowFn().Add(c.writeTimeout)); err != nil {
c.metrics.setWriteDeadlineError.Inc(1)
}
if err := c.conn.SetReadDeadline(c.nowFn().Add(c.readTimeout)); err != nil {
c.metrics.setReadDeadlineError.Inc(1)
}
if _, err := c.writer.Write(data); err != nil {
c.metrics.writeError.Inc(1)
return err
Expand Down Expand Up @@ -304,6 +309,7 @@ type connectionMetrics struct {
writeError tally.Counter
writeRetries tally.Counter
setKeepAliveError tally.Counter
setReadDeadlineError tally.Counter
setWriteDeadlineError tally.Counter
}

Expand All @@ -318,6 +324,8 @@ func newConnectionMetrics(scope tally.Scope) connectionMetrics {
Counter(errorMetric),
setWriteDeadlineError: scope.Tagged(map[string]string{errorMetricType: "set-write-deadline"}).
Counter(errorMetric),
setReadDeadlineError: scope.Tagged(map[string]string{errorMetricType: "set-read-deadline"}).
Counter(errorMetric),
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/aggregator/client/conn_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
defaultConnectionTimeout = 1 * time.Second
defaultConnectionKeepAlive = true
defaultReadTimeout = 15 * time.Second
defaultWriteTimeout = 15 * time.Second
defaultInitReconnectThreshold = 1
defaultMaxReconnectThreshold = 4
Expand Down Expand Up @@ -72,6 +73,12 @@ type ConnectionOptions interface {
// ConnectionKeepAlive returns the keepAlive for the connection.
ConnectionKeepAlive() bool

// SetReadTimeout sets the timeout for reading data.
SetReadTimeout(value time.Duration) ConnectionOptions

// ReadTimeout returns the timeout for reading data.
ReadTimeout() time.Duration

// SetWriteTimeout sets the timeout for writing data.
SetWriteTimeout(value time.Duration) ConnectionOptions

Expand Down Expand Up @@ -138,6 +145,7 @@ type connectionOptions struct {
writeRetryOpts retry.Options
rwOpts xio.Options
connTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
maxDuration time.Duration
initThreshold int
Expand All @@ -161,6 +169,7 @@ func NewConnectionOptions() ConnectionOptions {
instrumentOpts: instrument.NewOptions(),
connTimeout: defaultConnectionTimeout,
connKeepAlive: defaultConnectionKeepAlive,
readTimeout: defaultReadTimeout,
writeTimeout: defaultWriteTimeout,
initThreshold: defaultInitReconnectThreshold,
maxThreshold: defaultMaxReconnectThreshold,
Expand Down Expand Up @@ -213,6 +222,16 @@ func (o *connectionOptions) ConnectionKeepAlive() bool {
return o.connKeepAlive
}

func (o *connectionOptions) SetReadTimeout(value time.Duration) ConnectionOptions {
opts := *o
opts.readTimeout = value
return &opts
}

func (o *connectionOptions) ReadTimeout() time.Duration {
return o.readTimeout
}

func (o *connectionOptions) SetWriteTimeout(value time.Duration) ConnectionOptions {
opts := *o
opts.writeTimeout = value
Expand Down
5 changes: 4 additions & 1 deletion src/aggregator/client/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func TestConnectWithCustomDialer(t *testing.T) {
mockConn := NewMockConn(ctrl)

mockConn.EXPECT().Write(testData)
mockConn.EXPECT().SetReadDeadline(gomock.Any())
mockConn.EXPECT().SetWriteDeadline(gomock.Any())
testWithConn(t, mockConn)
})
Expand All @@ -347,6 +348,7 @@ func TestConnectWithCustomDialer(t *testing.T) {
mockConn := NewMockConn(ctrl)

mockConn.EXPECT().Write(testData)
mockConn.EXPECT().SetReadDeadline(gomock.Any())
mockConn.EXPECT().SetWriteDeadline(gomock.Any())

mockKeepAlivable := NewMockkeepAlivable(ctrl)
Expand Down Expand Up @@ -489,7 +491,8 @@ func testConnectionOptions() ConnectionOptions {
SetInitReconnectThreshold(2).
SetMaxReconnectThreshold(6).
SetReconnectThresholdMultiplier(2).
SetWriteTimeout(100 * time.Millisecond)
SetWriteTimeout(100 * time.Millisecond).
SetReadTimeout(100 * time.Millisecond)
}

func testTLSConnectionOptions() ConnectionOptions {
Expand Down

0 comments on commit cde168d

Please sign in to comment.