Skip to content

Commit

Permalink
[coordinator] Prevent TChannel leaks (#4026)
Browse files Browse the repository at this point in the history
  • Loading branch information
Antanukas authored Dec 23, 2021
1 parent 27003c5 commit 1d05072
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/dbnode/client/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ func (p *connPool) connectEvery(interval time.Duration, stutter time.Duration) {
if p.status == statusOpen {
p.pool = append(p.pool, conn{channel, client})
p.poolLen = int64(len(p.pool))
} else {
// NB(antanas): just being defensive.
// It's likely a corner case and happens only during server shutdown.
channel.Close()
}
p.Unlock()
}()
Expand Down
20 changes: 18 additions & 2 deletions src/dbnode/client/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,26 @@ var (
)

type noopPooledChannel struct {
address string
address string
closeCount int32
}

func asNoopPooledChannel(c Channel) *noopPooledChannel {
cc, ok := c.(*noopPooledChannel)
if !ok {
panic("not a noopPooledChannel")
}
return cc
}

func (c *noopPooledChannel) CloseCount() int {
return int(atomic.LoadInt32(&c.closeCount))
}

func (c *noopPooledChannel) Close() {
atomic.AddInt32(&c.closeCount, 1)
}

func (c *noopPooledChannel) Close() {}
func (c *noopPooledChannel) GetSubChannel(
serviceName string,
opts ...tchannel.SubChannelOption,
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ func (s *session) DedicatedConnection(
}

if err := s.healthCheckNewConnFn(client, s.opts, opts.BootstrappedNodesOnly); err != nil {
channel.Close()
multiErr = multiErr.Add(err)
return
}
Expand Down
22 changes: 16 additions & 6 deletions src/dbnode/client/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,25 +445,35 @@ func TestDedicatedConnection(t *testing.T) {

_, ch, err := s.DedicatedConnection(shardID, DedicatedConnectionOptions{})
require.NoError(t, err)
assert.Equal(t, &noopPooledChannel{"remote1"}, ch)
assert.Equal(t, "remote1", asNoopPooledChannel(ch).address)

_, ch2, err := s.DedicatedConnection(shardID, DedicatedConnectionOptions{ShardStateFilter: shard.Available})
require.NoError(t, err)
assert.Equal(t, &noopPooledChannel{"remote2"}, ch2)
assert.Equal(t, "remote2", asNoopPooledChannel(ch2).address)

s.healthCheckNewConnFn = testHealthCheck(nil, true)
_, ch3, err := s.DedicatedConnection(shardID, DedicatedConnectionOptions{BootstrappedNodesOnly: true})
require.NoError(t, err)
assert.Equal(t, &noopPooledChannel{"remote1"}, ch3)
assert.Equal(t, "remote1", asNoopPooledChannel(ch3).address)

healthErr := errors.New("unhealthy")
s.healthCheckNewConnFn = testHealthCheck(healthErr, false)

var channels []*noopPooledChannel
s.opts = NewOptions().SetNewConnectionFn(func(_ string, _ string, _ Options) (Channel, rpc.TChanNode, error) {
c := &noopPooledChannel{"test", 0}
channels = append(channels, c)
return c, nil, nil
})
_, _, err = s.DedicatedConnection(shardID, DedicatedConnectionOptions{})
require.NotNil(t, err)
multiErr, ok := err.(xerror.MultiError) // nolint: errorlint
assert.True(t, ok, "expecting MultiError")
assert.True(t, multiErr.Contains(healthErr))
// 2 because of 2 remote hosts failing health check
assert.Len(t, channels, 2)
assert.Equal(t, 1, channels[0].CloseCount())
assert.Equal(t, 1, channels[1].CloseCount())
}

func testSessionClusterConnectConsistencyLevel(
Expand Down Expand Up @@ -621,9 +631,9 @@ func testHealthCheck(err error, bootstrappedNodesOnly bool) func(rpc.TChanNode,
}

func noopNewConnection(
channelName string,
_ string,
addr string,
opts Options,
_ Options,
) (Channel, rpc.TChanNode, error) {
return &noopPooledChannel{addr}, nil, nil
return &noopPooledChannel{addr, 0}, nil, nil
}

0 comments on commit 1d05072

Please sign in to comment.