Commit: support auto refresh
mattisonchao committed Nov 7, 2024
1 parent a53d726 commit 742aeec
Showing 5 changed files with 59 additions and 27 deletions.
26 changes: 20 additions & 6 deletions coordinator/impl/coordinator.go
@@ -54,17 +54,20 @@ type Coordinator interface {

NodeAvailabilityListener

GetServers() []model.ServerAddress
FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool)

ClusterStatus() model.ClusterStatus
}

type coordinator struct {
sync.RWMutex
sync.Mutex
assignmentsChanged common.ConditionContext

MetadataProvider
clusterConfigProvider func() (model.ClusterConfig, error)
model.ClusterConfig
nodeIndexes sync.Map

clusterConfigChangeCh chan any

shardControllers map[int64]ShardController
@@ -101,6 +104,7 @@ func NewCoordinator(metadataProvider MetadataProvider,
shardControllers: make(map[int64]ShardController),
nodeControllers: make(map[string]NodeController),
drainingNodes: make(map[string]NodeController),
nodeIndexes: sync.Map{},
rpc: rpc,
log: slog.With(
slog.String("component", "coordinator"),
@@ -118,6 +122,7 @@ func NewCoordinator(metadataProvider MetadataProvider,

for _, sa := range c.ClusterConfig.Servers {
c.nodeControllers[sa.Internal] = NewNodeController(sa, c, c, c.rpc)
c.nodeIndexes.Store(sa.GetNodeId(), sa)
}

if c.clusterStatus == nil {
@@ -471,6 +476,10 @@ func (c *coordinator) handleClusterConfigUpdated() error {
}
}

for _, sc := range c.shardControllers {
sc.SyncServerInfo()
}

c.ClusterConfig = newClusterConfig
c.clusterStatus = clusterStatus

@@ -513,10 +522,12 @@ func (c *coordinator) rebalanceCluster() error {
return nil
}

func (c *coordinator) GetServers() []model.ServerAddress {
c.RLock()
defer c.RUnlock()
return c.ClusterConfig.Servers
func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) {
if info, exist := c.nodeIndexes.Load(id); exist {
address := info.(model.ServerAddress)

Check failure on line 527 in coordinator/impl/coordinator.go (GitHub Actions / Build & Test):
unchecked-type-assertion: type cast result is unchecked in info.(model.ServerAddress) - type assertion will panic if not matched (revive)
return &address, true
}
return nil, false
}

func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress {
@@ -536,6 +547,8 @@ func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterCo
func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) {
// Check for nodes to add
for _, sa := range newClusterConfig.Servers {
c.nodeIndexes.Store(sa.GetNodeId(), sa)

if _, ok := c.nodeControllers[sa.Internal]; ok {
continue
}
@@ -559,6 +572,7 @@ func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConf
}

c.log.Info("Detected a removed node", slog.Any("addr", ia))
c.nodeIndexes.Delete(model.NodeId(ia))
// Move the node to draining
delete(c.nodeControllers, ia)
nc.SetStatus(Draining)
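The revive failure flagged in this file's diff points at the unchecked info.(model.ServerAddress) assertion inside FindNodeInfoById. Below is a rough, self-contained sketch of the node-index pattern the commit introduces, using simplified stand-in types rather than the project's actual structs, and the comma-ok assertion form that avoids the panic path the linter warns about: entries are stored per node id when the config is applied, deleted when a node drops out, and looked up without an unchecked cast.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for model.NodeId / model.ServerAddress.
type NodeId string

type ServerAddress struct {
	Public   string
	Internal string
}

func (s ServerAddress) GetNodeId() NodeId { return NodeId(s.Internal) }

type coordinator struct {
	nodeIndexes sync.Map // NodeId -> ServerAddress
}

// storeNode is called for every server on startup and on each cluster-config change.
func (c *coordinator) storeNode(sa ServerAddress) {
	c.nodeIndexes.Store(sa.GetNodeId(), sa)
}

// removeNode drops nodes that disappeared from the config.
func (c *coordinator) removeNode(internalAddr string) {
	c.nodeIndexes.Delete(NodeId(internalAddr))
}

// FindNodeInfoById uses the comma-ok assertion, so a mismatched value
// yields (nil, false) instead of panicking.
func (c *coordinator) FindNodeInfoById(id NodeId) (*ServerAddress, bool) {
	if info, exist := c.nodeIndexes.Load(id); exist {
		if addr, ok := info.(ServerAddress); ok {
			return &addr, true
		}
	}
	return nil, false
}

func main() {
	c := &coordinator{}
	c.storeNode(ServerAddress{Public: "pub:6648", Internal: "int:6649"})

	if addr, ok := c.FindNodeInfoById(NodeId("int:6649")); ok {
		fmt.Println(addr.Public) // pub:6648
	}

	c.removeNode("int:6649")
	_, ok := c.FindNodeInfoById(NodeId("int:6649"))
	fmt.Println(ok) // false
}
```

Whether the project prefers the comma-ok check, a logged error, or a //nolint directive for that revive rule is a maintainer call; the sketch only shows the checked form.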
6 changes: 0 additions & 6 deletions coordinator/impl/coordinator_e2e_test.go
@@ -816,12 +816,6 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) {
clusterConfig.Servers = clusterServer
configChangesCh <- nil

// new term
coordinatorInstance := c.(*coordinator)
controller := coordinatorInstance.shardControllers[0]
controllerInstance := controller.(*shardController)
controllerInstance.electionOp <- nil

assert.Eventually(t, func() bool {
for _, ns := range c.ClusterStatus().Namespaces {
for _, shard := range ns.Shards {
42 changes: 29 additions & 13 deletions coordinator/impl/shard_controller.go
@@ -64,6 +64,8 @@ type ShardController interface {

HandleNodeFailure(failedNode model.ServerAddress)

SyncServerInfo()

SwapNode(from model.ServerAddress, to model.ServerAddress) error
DeleteShard()

@@ -77,7 +79,7 @@ type shardController struct {
shard int64
namespaceConfig *model.NamespaceConfig
shardMetadata model.ShardMetadata
shardMetadataMutex sync.Mutex
shardMetadataMutex sync.RWMutex
rpc RpcProvider
coordinator Coordinator

@@ -139,6 +141,8 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na

s.ctx, s.cancel = context.WithCancel(context.Background())

s.SyncServerInfo()

s.log.Info(
"Started shard controller",
slog.Any("shard-metadata", s.shardMetadata),
@@ -196,7 +200,7 @@ func (s *shardController) run() {
case a := <-s.newTermAndAddFollowerOp:
s.internalNewTermAndAddFollower(a.ctx, a.node, a.res)

case <-s.electionOp: // for testing
case <-s.electionOp:
s.electLeaderWithRetries()
}
}
@@ -214,7 +218,7 @@ func (s *shardController) handleNodeFailure(failedNode model.ServerAddress) {
)

if s.shardMetadata.Leader != nil &&
*s.shardMetadata.Leader == failedNode {
s.shardMetadata.Leader.Internal == failedNode.Internal {
s.log.Info(
"Detected failure on shard leader",
slog.Any("leader", failedNode),
@@ -377,19 +381,11 @@ func (s *shardController) electLeader() error {
}

func (s *shardController) getRefreshedEnsemble() []model.ServerAddress {
serversInfos := s.coordinator.GetServers()
// build a logic index here.
// todo: might introduce global index in the coordinator in the future
index := map[string]model.ServerAddress{}
for _, server := range serversInfos {
index[server.Internal] = server
}

currentEnsemble := s.shardMetadata.Ensemble
refreshedEnsembleServiceInfo := make([]model.ServerAddress, len(currentEnsemble))
for idx, candidate := range currentEnsemble {
if refreshedInfo, exist := index[candidate.Internal]; exist {
refreshedEnsembleServiceInfo[idx] = refreshedInfo
if refreshedInfo, exist := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); exist {
refreshedEnsembleServiceInfo[idx] = *refreshedInfo
continue
}
refreshedEnsembleServiceInfo[idx] = candidate
@@ -894,6 +890,26 @@ func (s *shardController) waitForFollowersToCatchUp(ctx context.Context, leader
return nil
}

func (s *shardController) SyncServerInfo() {
s.shardMetadataMutex.RLock()
exist := false
for _, candidate := range s.shardMetadata.Ensemble {
if newInfo, ok := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); ok {
if *newInfo != candidate {
exist = true
break
}
}
}
if !exist {
s.shardMetadataMutex.RUnlock()
return
}
s.shardMetadataMutex.RUnlock()
s.log.Info("node info changed, start a new leader election")
s.electionOp <- nil
}

func listContains(list []model.ServerAddress, sa model.ServerAddress) bool {
for _, item := range list {
if item.Public == sa.Public && item.Internal == sa.Internal {
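Taken together, the shard-controller changes wire the refresh end to end: when the cluster config changes, the coordinator calls SyncServerInfo on every shard controller; SyncServerInfo compares each ensemble member against the coordinator's node index and, if any entry differs, pushes onto electionOp so the run loop starts a new leader election, which then picks up the refreshed addresses through getRefreshedEnsemble. A stripped-down sketch of that trigger flow, with hypothetical names and the coordinator lookup reduced to a callback:

```go
package main

import "fmt"

type serverAddress struct {
	Public   string
	Internal string
}

// lookupFn stands in for the coordinator's FindNodeInfoById lookup.
type lookupFn func(internal string) (serverAddress, bool)

type shardCtl struct {
	ensemble   []serverAddress
	lookup     lookupFn
	electionOp chan struct{}
}

// syncServerInfo mirrors the shape of SyncServerInfo in the diff:
// if any ensemble member's advertised info differs from the index,
// ask the run loop to start a new leader election.
func (s *shardCtl) syncServerInfo() {
	for _, member := range s.ensemble {
		if fresh, ok := s.lookup(member.Internal); ok && fresh != member {
			s.electionOp <- struct{}{}
			return
		}
	}
}

// run consumes election requests, as the controller's run loop does.
func (s *shardCtl) run() {
	<-s.electionOp
	fmt.Println("node info changed, starting leader election with refreshed ensemble")
}

func main() {
	old := serverAddress{Public: "pub:6648", Internal: "int:6649"}
	updated := serverAddress{Public: "pub:7648", Internal: "int:6649"} // public endpoint changed

	s := &shardCtl{
		ensemble:   []serverAddress{old},
		lookup:     func(string) (serverAddress, bool) { return updated, true },
		electionOp: make(chan struct{}, 1),
	}
	s.syncServerInfo()
	s.run()
}
```

Note that the real SyncServerInfo holds shardMetadataMutex.RLock only while scanning the ensemble and releases it before sending on electionOp, presumably so the election path can later take the lock for writing.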
4 changes: 2 additions & 2 deletions coordinator/impl/shard_controller_test.go
@@ -365,8 +365,8 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p
panic("not implemented")
}

func (m *mockCoordinator) GetServers() []model.ServerAddress {
return []model.ServerAddress{}
func (m *mockCoordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) {
return nil, false
}

func (m *mockCoordinator) InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error {
Expand Down
8 changes: 8 additions & 0 deletions coordinator/model/cluster_status.go
@@ -14,6 +14,8 @@

package model

type NodeId string

type ServerAddress struct {
// Public is the endpoint that is advertised to clients
Public string `json:"public" yaml:"public"`
@@ -22,6 +24,12 @@ type ServerAddress struct {
Internal string `json:"internal" yaml:"internal"`
}

func (s *ServerAddress) GetNodeId() NodeId {
// use the internal address as the node id by default.
// todo: introduce node id in the future
return NodeId(s.Internal)
}

type Int32HashRange struct {
// The minimum inclusive hash that the shard can contain
Min uint32 `json:"min"`
Expand Down
