Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Feb 28, 2024
1 parent b15d69d commit 47bf2a3
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 17 deletions.
6 changes: 5 additions & 1 deletion balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (b *balancer) initConnInfoLocked(conns []conn.Conn) {
}()
cancel := connCancel
if b.roundTripperMaxLifetime != 0 {
timer := time.AfterFunc(b.roundTripperMaxLifetime, func() {
timer := b.clock.AfterFunc(b.roundTripperMaxLifetime, func() {
b.recycle(connection)
})
cancel = func() {
Expand Down Expand Up @@ -540,6 +540,10 @@ func (c *connManager) recycleConns(connsToRecycle []conn.Conn) {
i++
}
}
if i == 0 {
// nothing to actually recycle
return
}
connsToRecycle = connsToRecycle[:i]
}
newAddrs := make([]resolver.Address, len(connsToRecycle))
Expand Down
221 changes: 205 additions & 16 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package httplb
import (
"context"
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -86,13 +87,19 @@ func TestConnManager_ReconcileAddresses(t *testing.T) {
require.Equal(t, addrs, latestUpdate.newAddrs)
require.Empty(t, latestUpdate.removeConns)
conns := pool.SnapshotConns()
require.Equal(t, len(addrs), len(conns))
conn1 := balancertesting.FindConn(conns, addrs[0], 1)
require.NotNil(t, conn1)
conn2 := balancertesting.FindConn(conns, addrs[1], 2)
require.NotNil(t, conn2)
conn3 := balancertesting.FindConn(conns, addrs[2], 3)
require.NotNil(t, conn3)
conn4 := balancertesting.FindConn(conns, addrs[3], 4)
require.NotNil(t, conn4)
conn5 := balancertesting.FindConn(conns, addrs[4], 5)
require.NotNil(t, conn5)
conn6 := balancertesting.FindConn(conns, addrs[5], 6)
require.Equal(t, len(addrs), len(conns))
require.NotNil(t, conn6)

// let's try a different set that includes duplicates
addrs = []resolver.Address{
Expand Down Expand Up @@ -125,12 +132,19 @@ func TestConnManager_ReconcileAddresses(t *testing.T) {
conns = pool.SnapshotConns()
require.Equal(t, 10, len(conns))
conn1i7 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.1"}, 7)
require.NotNil(t, conn1i7)
conn1i8 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.1"}, 8)
require.NotNil(t, conn1i8)
conn1i9 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.1"}, 9)
require.NotNil(t, conn1i9)
conn2i10 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.2"}, 10)
require.NotNil(t, conn2i10)
conn2i11 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.2"}, 11)
require.NotNil(t, conn2i11)
conn3i12 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.3"}, 12)
require.NotNil(t, conn3i12)
conn3i13 := balancertesting.FindConn(conns, resolver.Address{HostPort: "1.2.3.3"}, 13)
require.NotNil(t, conn3i13)

// Still multiple addresses, but different counts, to make sure the conn manager
// correctly reconciles.
Expand Down Expand Up @@ -205,7 +219,105 @@ func TestConnManager_ReconcileAddresses(t *testing.T) {

// TestConnManager_RecycleConns verifies that connManager.recycleConns
// removes each recycled conn from the pool and creates a fresh replacement
// conn for the same address, and that conns which have already been removed
// are silently ignored (producing no update at all when nothing remains to
// recycle).
func TestConnManager_RecycleConns(t *testing.T) {
	t.Parallel()
	// updateReq records one invocation of the conn manager's update
	// function: the addresses for which new conns should be created and
	// the existing conns that should be removed.
	type updateReq struct {
		newAddrs    []resolver.Address
		removeConns []conn.Conn
	}
	// Capacity 1 so testUpdate can publish without blocking; each step of
	// the test drains the channel before triggering the next update.
	updates := make(chan updateReq, 1)
	pool := balancertesting.NewFakeConnPool()
	// createConns makes one fake conn in the pool for each given address.
	createConns := func(newAddrs []resolver.Address) []conn.Conn {
		conns := make([]conn.Conn, len(newAddrs))
		for i := range newAddrs {
			var ok bool
			conns[i], ok = pool.NewConn(newAddrs[i])
			require.True(t, ok)
		}
		return conns
	}
	// testUpdate stands in for the balancer's real update function: it
	// records the request for later assertions, applies the removals to the
	// fake pool, and creates the requested new conns.
	testUpdate := func(newAddrs []resolver.Address, removeConns []conn.Conn) (added []conn.Conn) {
		// deterministic reconciliation FTW!
		balancertesting.DeterministicReconciler(newAddrs, removeConns)

		select {
		case updates <- updateReq{newAddrs: newAddrs, removeConns: removeConns}:
		default:
			require.FailNow(t, "channel should not be full")
		}
		for _, c := range removeConns {
			require.True(t, pool.RemoveConn(c))
		}
		return createConns(newAddrs)
	}
	// peekLatestUpdate returns the pending update, if any, without blocking.
	peekLatestUpdate := func() (updateReq, bool) {
		select {
		case latest := <-updates:
			return latest, true
		default:
			return updateReq{}, false
		}
	}
	// getLatestUpdate is like peekLatestUpdate but fails the test when no
	// update is pending.
	getLatestUpdate := func() updateReq {
		req, ok := peekLatestUpdate()
		require.True(t, ok, "no update available")
		return req
	}
	connMgr := connManager{updateFunc: testUpdate}
	addrs := []resolver.Address{
		{HostPort: "1.2.3.1"},
		{HostPort: "1.2.3.2"},
		{HostPort: "1.2.3.3"},
		{HostPort: "1.2.3.4"},
	}
	// Initial reconcile: one conn per address, indices 1-4, nothing removed.
	connMgr.reconcileAddresses(addrs)
	latestUpdate := getLatestUpdate()
	require.Equal(t, addrs, latestUpdate.newAddrs)
	require.Empty(t, latestUpdate.removeConns)
	conns := pool.SnapshotConns()
	require.Equal(t, len(addrs), len(conns))
	conn1 := balancertesting.FindConn(conns, addrs[0], 1)
	require.NotNil(t, conn1)
	conn2 := balancertesting.FindConn(conns, addrs[1], 2)
	require.NotNil(t, conn2)
	conn3 := balancertesting.FindConn(conns, addrs[2], 3)
	require.NotNil(t, conn3)
	conn4 := balancertesting.FindConn(conns, addrs[3], 4)
	require.NotNil(t, conn4)

	// Recycling conn1 must remove it and create a replacement for the same
	// address; the replacement shows up in the pool with the next index (5).
	toRecycle := []conn.Conn{conn1}
	connMgr.recycleConns(toRecycle)
	latestUpdate = getLatestUpdate()
	require.Equal(t, []resolver.Address{conn1.Address()}, latestUpdate.newAddrs)
	require.Equal(t, toRecycle, latestUpdate.removeConns)
	conns = pool.SnapshotConns()
	conn1i5 := balancertesting.FindConn(conns, addrs[0], 5)
	require.NotNil(t, conn1i5)

	// no-op for a connection that has already been removed
	connMgr.recycleConns(toRecycle)
	latestUpdate, ok := peekLatestUpdate()
	require.False(t, ok, "unexpected update was generated: %+v", latestUpdate)

	// recycle addr #1 again
	toRecycle = []conn.Conn{conn1i5}
	connMgr.recycleConns(toRecycle)
	latestUpdate = getLatestUpdate()
	require.Equal(t, []resolver.Address{conn1.Address()}, latestUpdate.newAddrs)
	require.Equal(t, toRecycle, latestUpdate.removeConns)
	conns = pool.SnapshotConns()
	conn1i6 := balancertesting.FindConn(conns, addrs[0], 6)
	require.NotNil(t, conn1i6)

	// recycle several together, one that has already been removed
	// (conn1i5 was removed above, so only conn2 and conn4 get recycled)
	toRecycle = []conn.Conn{conn1i5, conn2, conn4}
	connMgr.recycleConns(toRecycle)
	latestUpdate = getLatestUpdate()
	require.Equal(t, []resolver.Address{conn2.Address(), conn4.Address()}, latestUpdate.newAddrs)
	require.Equal(t, []conn.Conn{conn2, conn4}, latestUpdate.removeConns)
	conns = pool.SnapshotConns()
	conn2i7 := balancertesting.FindConn(conns, addrs[1], 7)
	require.NotNil(t, conn2i7)
	conn4i8 := balancertesting.FindConn(conns, addrs[3], 8)
	require.NotNil(t, conn4i8)
}

func TestBalancer_BasicConnManagement(t *testing.T) {
Expand All @@ -214,6 +326,10 @@ func TestBalancer_BasicConnManagement(t *testing.T) {
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, health.NopChecker, pool, 0)
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.start()
t.Cleanup(func() {
err := balancer.Close()
require.NoError(t, err)
})
// Initial resolve
addrs := []resolver.Address{
{HostPort: "1.2.3.1"},
Expand Down Expand Up @@ -268,9 +384,6 @@ func TestBalancer_BasicConnManagement(t *testing.T) {

// Also make sure the set of all conns (not just ones that the picker sees) arrives at the right state
awaitConns(t, pool, addrs, []int{1, 2, 7, 8})

err := balancer.Close()
require.NoError(t, err)
}

func TestBalancer_HealthChecking(t *testing.T) {
Expand All @@ -293,6 +406,12 @@ func TestBalancer_HealthChecking(t *testing.T) {
balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, checker, pool, 0)
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.start()
t.Cleanup(func() {
err := balancer.Close()
require.NoError(t, err)
checkers := checker.SnapshotConns()
require.Empty(t, checkers)
})

checker.SetInitialState(health.StateUnknown)
warmChan1 := make(chan struct{})
Expand Down Expand Up @@ -327,9 +446,13 @@ func TestBalancer_HealthChecking(t *testing.T) {

conns := pool.SnapshotConns()
conn1 := balancertesting.FindConn(conns, addrs[0], 1)
require.NotNil(t, conn1)
conn2 := balancertesting.FindConn(conns, addrs[1], 2)
require.NotNil(t, conn2)
conn3 := balancertesting.FindConn(conns, addrs[2], 3)
require.NotNil(t, conn3)
conn4 := balancertesting.FindConn(conns, addrs[3], 4)
require.NotNil(t, conn4)
checker.UpdateHealthState(conn1, health.StateHealthy)
// Now that conn1 is both warm and healthy, we are warmed up. We still include the
// connections in unknown state because the threshold is 3 conns or 25%, whichever is greater.
Expand Down Expand Up @@ -382,11 +505,6 @@ func TestBalancer_HealthChecking(t *testing.T) {
{HostPort: "1.2.3.20"},
}
awaitPickerUpdate(t, pool, true, expectAddrs, []int{1, 2, 7})

err := balancer.Close()
require.NoError(t, err)
checkers := checker.SnapshotConns()
require.Empty(t, checkers)
}

func TestBalancer_Reresolve(t *testing.T) {
Expand All @@ -399,6 +517,12 @@ func TestBalancer_Reresolve(t *testing.T) {
balancer.updateHook = balancertesting.DeterministicReconciler
balancer.clock = clock
balancer.start()
t.Cleanup(func() {
err := balancer.Close()
require.NoError(t, err)
checkers := checker.SnapshotConns()
require.Empty(t, checkers)
})

checker.SetInitialState(health.StateUnknown)

Expand All @@ -413,26 +537,69 @@ func TestBalancer_Reresolve(t *testing.T) {
awaitPickerUpdate(t, pool, false, addrs, []int{1, 2, 3, 4})
conns := pool.SnapshotConns()
conn1 := balancertesting.FindConn(conns, addrs[0], 1)
require.NotNil(t, conn1)
conn2 := balancertesting.FindConn(conns, addrs[1], 2)
require.NotNil(t, conn2)
conn3 := balancertesting.FindConn(conns, addrs[2], 3)
require.NotNil(t, conn3)
conn4 := balancertesting.FindConn(conns, addrs[3], 4)
require.NotNil(t, conn4)
checker.UpdateHealthState(conn1, health.StateUnhealthy)
checker.UpdateHealthState(conn2, health.StateUnhealthy)
awaitResolveNow(t, pool, 1)
checker.UpdateHealthState(conn3, health.StateUnhealthy)
clock.Advance(10 * time.Second)
checker.UpdateHealthState(conn4, health.StateUnhealthy)
awaitResolveNow(t, pool, 2)

err := balancer.Close()
require.NoError(t, err)
checkers := checker.SnapshotConns()
require.Empty(t, checkers)
}

// TestBalancer_RoundTripperMaxLifetime verifies that when the balancer is
// created with a max round-tripper lifetime (one second here), every conn
// gets recycled — removed and replaced by a new conn for the same address —
// once that much time elapses on the fake clock.
func TestBalancer_RoundTripperMaxLifetime(t *testing.T) {
	t.Parallel()
	clock := clocktest.NewFakeClock()
	pool := balancertesting.NewFakeConnPool()

	// Final argument is the round-tripper max lifetime (non-zero, unlike
	// the other balancer tests in this file).
	balancer := newBalancer(context.Background(), balancertesting.NewFakePicker, health.NopChecker, pool, time.Second)
	balancer.updateHook = balancertesting.DeterministicReconciler
	balancer.clock = clock
	balancer.start()
	t.Cleanup(func() {
		err := balancer.Close()
		require.NoError(t, err)
	})

	// Initial resolve
	addrs := []resolver.Address{
		{HostPort: "1.2.3.1"},
		{HostPort: "1.2.3.2"},
		{HostPort: "1.2.3.3"},
		{HostPort: "1.2.3.4"},
	}
	balancer.OnResolve(addrs)
	awaitPickerUpdate(t, pool, false, addrs, []int{1, 2, 3, 4})
	conns := pool.SnapshotConns()
	conn1 := balancertesting.FindConn(conns, addrs[0], 1)
	require.NotNil(t, conn1)
	conn2 := balancertesting.FindConn(conns, addrs[1], 2)
	require.NotNil(t, conn2)
	conn3 := balancertesting.FindConn(conns, addrs[2], 3)
	require.NotNil(t, conn3)
	conn4 := balancertesting.FindConn(conns, addrs[3], 4)
	require.NotNil(t, conn4)

	// After one second, lifetime limit is reached and all of these get recycled.
	clock.Advance(time.Second)
	// We can await the indices to be updated. But we won't know the precise
	// address associated with each index because they are updated from
	// independent timer goroutines.
	conns = awaitConnIndices(t, pool, []int{5, 6, 7, 8})
	gotAddrs := make([]resolver.Address, 0, len(conns))
	for cn := range conns {
		gotAddrs = append(gotAddrs, cn.Address())
	}
	// Sort by host:port so the replacement addresses can be compared to the
	// original (already sorted) address list regardless of timer ordering.
	sort.Slice(gotAddrs, func(i, j int) bool {
		return gotAddrs[i].HostPort < gotAddrs[j].HostPort
	})
	require.Equal(t, addrs, gotAddrs)
}

func awaitPickerUpdate(t *testing.T, pool *balancertesting.FakeConnPool, warm bool, addrs []resolver.Address, indexes []int) {
Expand Down Expand Up @@ -474,6 +641,28 @@ func awaitConns(t *testing.T, pool *balancertesting.FakeConnPool, addrs []resolv
}
}

// awaitConnIndices blocks until the pool's set of conns has exactly the
// given (sorted) fake-conn indices, returning that snapshot. It fails the
// test if no matching update arrives within one second, reporting the most
// recently observed indices to aid debugging.
func awaitConnIndices(t *testing.T, pool *balancertesting.FakeConnPool, indexes []int) conns.Set {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	var observed []int
	for {
		snap, err := pool.AwaitConnUpdate(ctx)
		if err != nil {
			// A nil observed slice means we never saw a single update;
			// otherwise we saw updates but none matched the wanted indices.
			require.NotNil(t, observed, "didn't get connection update after 1 second")
			require.FailNow(t, "didn't get expected active connections after 1 second", "want %+v\ngot %+v", indexes, observed)
		}
		observed = make([]int, 0, len(snap))
		for c := range snap {
			fakeConn := c.(*balancertesting.FakeConn)
			observed = append(observed, fakeConn.Index)
		}
		sort.Ints(observed)
		if reflect.DeepEqual(indexes, observed) {
			return snap
		}
	}
}

func awaitCheckerUpdate(t *testing.T, checker *balancertesting.FakeHealthChecker, addrs []resolver.Address, indexes []int) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down

0 comments on commit 47bf2a3

Please sign in to comment.