Skip to content

Commit

Permalink
[aggregator] Followers CanLead before the first flush is persisted (#…
Browse files Browse the repository at this point in the history
…4169)

* Have follower report CanLead if there are no flush times
* Refactor out common integration test multiServerSetup
  • Loading branch information
justinjc authored Dec 3, 2022
1 parent 98aec27 commit 297c37e
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 282 deletions.
14 changes: 14 additions & 0 deletions src/aggregator/aggregator/follower_flush_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ func (mgr *followerFlushManager) CanLead() bool {
mgr.metrics.notCampaigning.Inc(1)
return false
}

flushTimes, err := mgr.flushTimesManager.Get()
if err != nil {
// Error getting flush times, proceed to other logic to determine CanLead.
mgr.logger.Warn("Error getting flush times from follower flush manager",
zap.Error(err),
)
} else if flushTimes == nil || len(flushTimes.ByShard) == 0 {
// If there are no flush times, the cluster is just coming up (the leader has not
// gotten a chance to flush), so the follower can also lead at this point.
// See https://github.com/m3db/m3/issues/4168 for longer discussion.
return true
}

if mgr.processed == nil {
return false
}
Expand Down
69 changes: 51 additions & 18 deletions src/aggregator/aggregator/follower_flush_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,18 @@ func TestFollowerFlushManagerCanNotLeadNotCampaigning(t *testing.T) {
require.False(t, mgr.CanLead())
}

func TestFollowerFlushManagerCanNotLeadProtoNotUpdated(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
require.False(t, mgr.CanLead())
}

func TestFollowerFlushManagerCanNotLeadStandardFlushWindowsNotEnded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(testFlushTimes, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.processed = testFlushTimes
mgr.openedAt = time.Unix(3624, 0)
Expand All @@ -108,7 +100,11 @@ func TestFollowerFlushManagerCanNotLeadTimedFlushWindowsNotEnded(t *testing.T) {

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(testFlushTimes, nil).AnyTimes()
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.processed = testFlushTimes2
electionManager.EXPECT().IsCampaigning().Return(true)
Expand All @@ -127,7 +123,11 @@ func TestFollowerFlushManagerCanNotLeadForwardedFlushWindowsNotEnded(t *testing.
doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(testFlushTimes, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.processed = testFlushTimes
mgr.openedAt = time.Unix(3640, 0)
Expand Down Expand Up @@ -263,9 +263,12 @@ func testCanLeadNotFlushed(
clockOpts := clock.NewOptions().SetNowFn(func() time.Time {
return now
})
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(flushTimes, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetClockOptions(clockOpts)
SetClockOptions(clockOpts).
SetFlushTimesManager(flushTimesMgr)

if bufferPast > 0 {
opts = opts.SetBufferForPastTimedMetric(bufferPast)
Expand All @@ -278,14 +281,40 @@ func testCanLeadNotFlushed(
require.Equal(t, expectedCanLead, mgr.CanLead())
}

func TestFollowerFlushManagerCanLeadNoFlushTimes(t *testing.T) {
// Note: most tests here already test whether the follower CanLead when flush
// times are present, e.g. TestFollowerFlushManagerCanLeadNotFlushed.
ctrl := gomock.NewController(t)
defer ctrl.Finish()

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true).AnyTimes()
flushTimesMgr := NewMockFlushTimesManager(ctrl)
// Return no flush times - this is true when the cluster is just brought up
// and the leader hasn't flushed yet.
flushTimesMgr.EXPECT().Get().Return(nil, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)

mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)

require.True(t, mgr.CanLead())
}

func TestFollowerFlushManagerCanLeadNoTombstonedShards(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(testFlushTimes, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.flushTimesState = flushTimesProcessed
mgr.processed = testFlushTimes
Expand All @@ -300,7 +329,11 @@ func TestFollowerFlushManagerCanLeadWithTombstonedShards(t *testing.T) {
doneCh := make(chan struct{})
electionManager := NewMockElectionManager(ctrl)
electionManager.EXPECT().IsCampaigning().Return(true)
opts := NewFlushManagerOptions().SetElectionManager(electionManager)
flushTimesMgr := NewMockFlushTimesManager(ctrl)
flushTimesMgr.EXPECT().Get().Return(testFlushTimes2, nil)
opts := NewFlushManagerOptions().
SetElectionManager(electionManager).
SetFlushTimesManager(flushTimesMgr)
mgr := newFollowerFlushManager(doneCh, opts).(*followerFlushManager)
mgr.flushTimesState = flushTimesProcessed
mgr.processed = testFlushTimes2
Expand Down
4 changes: 4 additions & 0 deletions src/aggregator/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (
aggclient "github.com/m3db/m3/src/aggregator/client"
)

const (
pipelineRollupID = "pipelineRollup"
)

type conditionFn func() bool

func waitUntil(fn conditionFn, timeout time.Duration) bool {
Expand Down
101 changes: 101 additions & 0 deletions src/aggregator/integration/multi_server_follower_health_init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//go:build integration
// +build integration

// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"sync"
"sync/atomic"
"testing"

httpserver "github.com/m3db/m3/src/aggregator/server/http"
xtest "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMultiServerFollowerHealthInit(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

testParams := newTestServerSetups(t)
servers := testParams.servers

// Start the servers.
log := xtest.NewLogger(t)
log.Info("test forwarding pipeline")
for i, server := range servers {
require.NoError(t, server.startServer())
log.Sugar().Infof("server %d is now up", i)
}

// Stop the servers.
for i, server := range servers {
i := i
server := server
defer func() {
require.NoError(t, server.stopServer())
log.Sugar().Infof("server %d is now down", i)
}()
}

// Waiting for two leaders to come up.
var (
leaders = make(map[int]struct{})
leaderCh = make(chan int, len(servers)/2)
numLeaders int32
wg sync.WaitGroup
)
wg.Add(len(servers) / 2)
for i, server := range servers {
i, server := i, server
go func() {
if err := server.waitUntilLeader(); err == nil {
res := int(atomic.AddInt32(&numLeaders, 1))
if res <= len(servers)/2 {
leaderCh <- i
wg.Done()
}
}
}()
}
wg.Wait()
close(leaderCh)

for i := range leaderCh {
leaders[i] = struct{}{}
log.Sugar().Infof("server %d has become the leader", i)
}
log.Sugar().Infof("%d servers have become leaders", len(leaders))

for _, server := range servers {
var resp httpserver.StatusResponse
require.NoError(t, server.getStatusResponse(httpserver.StatusPath, &resp))

// No data has been written to the aggregators yet, but all servers (including the
// followers) should be able to lead.
assert.True(t, resp.Status.FlushStatus.CanLead)
}
}
Loading

0 comments on commit 297c37e

Please sign in to comment.