Skip to content

Commit

Permalink
Fix flaky tests: TestConfig and TestEmbeddedConfig (#732)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul authored May 29, 2018
1 parent 08d46d3 commit 1abb3f5
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 55 deletions.
15 changes: 11 additions & 4 deletions src/cmd/services/m3dbnode/main/main_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ import (

// TestIndexEnabledServer tests booting a server using file based configuration.
func TestIndexEnabledServer(t *testing.T) {
// Temporarily skip while we debug flakiness
t.SkipNow()

if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}
Expand Down Expand Up @@ -187,8 +184,18 @@ func TestIndexEnabledServer(t *testing.T) {
cli, err := cfg.DB.Client.NewClient(client.ConfigurationParameters{})
require.NoError(t, err)

session, err := cli.DefaultSession()
adminCli := cli.(client.AdminClient)
adminSession, err := adminCli.DefaultAdminSession()
require.NoError(t, err)
defer adminSession.Close()

// Propagation of shard state from Initializing --> Available post-bootstrap is eventually
// consistent, so we must wait.
waitUntilAllShardsAreAvailable(t, adminSession)

// Cast to narrower-interface instead of grabbing DefaultSession to make sure
// we use the same topology.Map that we validated in waitUntilAllShardsAreAvailable.
session := adminSession.(client.Session)

defer session.Close()

Expand Down
63 changes: 53 additions & 10 deletions src/cmd/services/m3dbnode/main/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/m3db/m3cluster/integration/etcd"
"github.com/m3db/m3cluster/placement"
"github.com/m3db/m3cluster/services"
"github.com/m3db/m3cluster/shard"
"github.com/m3db/m3db/src/cmd/services/m3dbnode/config"
"github.com/m3db/m3db/src/cmd/services/m3dbnode/server"
"github.com/m3db/m3db/src/dbnode/client"
Expand All @@ -49,9 +50,6 @@ import (

// TestConfig tests booting a server using file based configuration.
func TestConfig(t *testing.T) {
// Temporarily skip while we debug flakiness
t.SkipNow()

// Embedded kv
embeddedKV, err := etcd.New(etcd.NewOptions())
require.NoError(t, err)
Expand Down Expand Up @@ -181,10 +179,18 @@ func TestConfig(t *testing.T) {
cli, err := cfg.DB.Client.NewClient(client.ConfigurationParameters{})
require.NoError(t, err)

session, err := cli.DefaultSession()
adminCli := cli.(client.AdminClient)
adminSession, err := adminCli.DefaultAdminSession()
require.NoError(t, err)
defer adminSession.Close()

// Propagation of shard state from Initializing --> Available post-bootstrap is eventually
// consistent, so we must wait.
waitUntilAllShardsAreAvailable(t, adminSession)

defer session.Close()
// Cast to narrower-interface instead of grabbing DefaultSession to make sure
// we use the same topology.Map that we validated in waitUntilAllShardsAreAvailable.
session := adminSession.(client.Session)

start := time.Now().Add(-time.Minute)
values := []struct {
Expand Down Expand Up @@ -229,9 +235,6 @@ func TestConfig(t *testing.T) {

// TestEmbeddedConfig tests booting a server using an embedded KV.
func TestEmbeddedConfig(t *testing.T) {
// Temporarily skip while we debug flakiness
t.SkipNow()

// Create config file
tmpl, err := template.New("config").Parse(testConfig + embeddedKVConfigPortion)
require.NoError(t, err)
Expand Down Expand Up @@ -375,10 +378,18 @@ func TestEmbeddedConfig(t *testing.T) {
cli, err := cfg.DB.Client.NewClient(client.ConfigurationParameters{})
require.NoError(t, err)

session, err := cli.DefaultSession()
adminCli := cli.(client.AdminClient)
adminSession, err := adminCli.DefaultAdminSession()
require.NoError(t, err)
defer adminSession.Close()

// Propagation of shard state from Initializing --> Available post-bootstrap is eventually
// consistent, so we must wait.
waitUntilAllShardsAreAvailable(t, adminSession)

defer session.Close()
// Cast to narrower-interface instead of grabbing DefaultSession to make sure
// we use the same topology.Map that we validated in waitUntilAllShardsAreAvailable.
session := adminSession.(client.Session)

start := time.Now().Add(-time.Minute)
values := []struct {
Expand Down Expand Up @@ -622,3 +633,35 @@ db:
endpoint: {{.InitialClusterEndpoint}}
`
)

// waitUntilAllShardsAreAvailable continually polls the session checking to see if the topology.Map
// that the session is currently storing contains a non-zero number of host shard sets, and if so,
// makes sure that all their shard states are Available.
func waitUntilAllShardsAreAvailable(t *testing.T, session client.AdminSession) {
outer:
for {
time.Sleep(10 * time.Millisecond)

topoMap, err := session.TopologyMap()
require.NoError(t, err)

var (
hostShardSets = topoMap.HostShardSets()
)

if len(hostShardSets) == 0 {
// We haven't received an actual topology yet.
continue
}

for _, hostShardSet := range hostShardSets {
for _, hostShard := range hostShardSet.ShardSet().All() {
if hostShard.State() != shard.Available {
continue outer
}
}
}

break
}
}
28 changes: 14 additions & 14 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ var (
// errUnableToEncodeTags is raised when the server is unable to encode provided tags
// to be sent over the wire.
errUnableToEncodeTags = errors.New("unable to include tags")
// errNoTopology is returned when the session does not have a topology. Should never happen
// errNoTopologyMap is returned when the session does not have a topology. Should never happen
// in practice.
errNoTopology = fmt.Errorf("%s session does not have a topology", instrument.InvariantViolatedMetricName)
errNoTopologyMap = fmt.Errorf("%s session does not have a topology map", instrument.InvariantViolatedMetricName)
)

// sessionState is volatile state that is protected by a
Expand Down Expand Up @@ -1571,22 +1571,22 @@ func (s *session) Replicas() int {
return v
}

func (s *session) Topology() (topology.Topology, error) {
func (s *session) TopologyMap() (topology.Map, error) {
s.state.RLock()
status := s.state.status
topology := s.state.topo
topoMap := s.state.topoMap
s.state.RUnlock()

// Make sure the session is open, as thats what sets the initial topology.
if status != statusOpen {
return nil, errSessionStatusNotOpen
}
if topology == nil {
if topoMap == nil {
// Should never happen.
return nil, errNoTopology
return nil, errNoTopologyMap
}

return topology, nil
return topoMap, nil
}

func (s *session) Truncate(namespace ident.ID) (int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ type AdminSession interface {
// Replicas returns the replication factor
Replicas() int

// Topology returns the current topology
Topology() (topology.Topology, error)
// TopologyMap returns the current topology map
TopologyMap() (topology.Map, error)

// Truncate will truncate the namespace for a given shard
Truncate(namespace ident.ID) (int64, error)
Expand Down
3 changes: 1 addition & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,13 +799,12 @@ func initialTopologyState(opts Options) (topologyState, error) {
return topologyState{}, err
}

topology, err := session.Topology()
topoMap, err := session.TopologyMap()
if err != nil {
return topologyState{}, err
}

var (
topoMap = topology.Get()
hostShardSets = topoMap.HostShardSets()
topologyState = topologyState{
majorityReplicas: topoMap.MajorityReplicas(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,10 @@ func newValidMockClient(t *testing.T, ctrl *gomock.Controller) *client.MockAdmin
mockMap.EXPECT().HostShardSets().Return(hostShardSets).AnyTimes()
mockMap.EXPECT().MajorityReplicas().Return(3).AnyTimes()

mockTopology := topology.NewMockTopology(ctrl)
mockTopology.EXPECT().
Get().
Return(mockMap)

mockAdminSession := client.NewMockAdminSession(ctrl)
mockAdminSession.EXPECT().
Topology().
Return(mockTopology, nil)
TopologyMap().
Return(mockMap, nil)

mockClient := client.NewMockAdminClient(ctrl)
mockClient.EXPECT().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,10 @@ func TestBootstrapIndexErr(t *testing.T) {
require.NoError(t, err)
topoOpts := topology.NewStaticOptions().
SetShardSet(shardSet)
topo := topology.NewStaticTopology(topoOpts)
topoMap := topology.NewStaticTopology(topoOpts).Get()

mockAdminSession := client.NewMockAdminSession(ctrl)
mockAdminSession.EXPECT().Topology().Return(topo, nil)
mockAdminSession.EXPECT().TopologyMap().Return(topoMap, nil)
mockAdminSessionCalls := []*gomock.Call{}

for blockStart := start; blockStart.Before(end); blockStart = blockStart.Add(blockSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,10 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) {
mockMap.EXPECT().HostShardSets().Return(hostShardSets).AnyTimes()
mockMap.EXPECT().MajorityReplicas().Return(replicaMajority).AnyTimes()

mockTopology := topology.NewMockTopology(ctrl)
mockTopology.EXPECT().
Get().
Return(mockMap)

mockAdminSession := client.NewMockAdminSession(ctrl)
mockAdminSession.EXPECT().
Topology().
Return(mockTopology, nil)
TopologyMap().
Return(mockMap, nil)

mockClient := client.NewMockAdminClient(ctrl)
mockClient.EXPECT().
Expand Down

0 comments on commit 1abb3f5

Please sign in to comment.