diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go
index a90b10e5..b93a048e 100644
--- a/coordinator/impl/coordinator.go
+++ b/coordinator/impl/coordinator.go
@@ -54,17 +54,20 @@ type Coordinator interface {
 
 	NodeAvailabilityListener
 
-	GetServers() []model.ServerAddress
+	FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool)
+
+	ClusterStatus() model.ClusterStatus
 }
 
 type coordinator struct {
-	sync.RWMutex
+	sync.Mutex
 	assignmentsChanged common.ConditionContext
 
 	MetadataProvider
 	clusterConfigProvider func() (model.ClusterConfig, error)
 	model.ClusterConfig
 
+	nodeIndexes           sync.Map
 	clusterConfigChangeCh chan any
 
 	shardControllers map[int64]ShardController
@@ -101,6 +104,7 @@ func NewCoordinator(metadataProvider MetadataProvider,
 		shardControllers:      make(map[int64]ShardController),
 		nodeControllers:       make(map[string]NodeController),
 		drainingNodes:         make(map[string]NodeController),
+		nodeIndexes:           sync.Map{},
 		rpc:                   rpc,
 		log: slog.With(
 			slog.String("component", "coordinator"),
@@ -118,6 +122,7 @@ func NewCoordinator(metadataProvider MetadataProvider,
 
 	for _, sa := range c.ClusterConfig.Servers {
 		c.nodeControllers[sa.Internal] = NewNodeController(sa, c, c, c.rpc)
+		c.nodeIndexes.Store(sa.GetNodeId(), sa)
 	}
 
 	if c.clusterStatus == nil {
@@ -471,6 +476,10 @@ func (c *coordinator) handleClusterConfigUpdated() error {
 		}
 	}
 
+	for _, sc := range c.shardControllers {
+		sc.SyncServerInfo()
+	}
+
 	c.ClusterConfig = newClusterConfig
 	c.clusterStatus = clusterStatus
 
@@ -513,10 +522,12 @@ func (c *coordinator) rebalanceCluster() error {
 	return nil
 }
 
-func (c *coordinator) GetServers() []model.ServerAddress {
-	c.RLock()
-	defer c.RUnlock()
-	return c.ClusterConfig.Servers
+func (c *coordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) {
+	if info, exist := c.nodeIndexes.Load(id); exist {
+		address := info.(model.ServerAddress)
+		return &address, true
+	}
+	return nil, false
 }
 
 func (c *coordinator) FindServerByInternalAddress(internalAddress string) *model.ServerAddress {
@@ -536,6 +547,8 @@ func (*coordinator) findServerByInternalAddress(newClusterConfig model.ClusterCo
 func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) {
 	// Check for nodes to add
 	for _, sa := range newClusterConfig.Servers {
+		c.nodeIndexes.Store(sa.GetNodeId(), sa)
+
 		if _, ok := c.nodeControllers[sa.Internal]; ok {
 			continue
 		}
@@ -559,6 +572,7 @@ func (c *coordinator) checkClusterNodeChanges(newClusterConf
 		}
 
 		c.log.Info("Detected a removed node", slog.Any("addr", ia))
+		c.nodeIndexes.Delete(model.NodeId(ia))
 		// Moved the node
 		delete(c.nodeControllers, ia)
 		nc.SetStatus(Draining)
diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go
index 28783ded..6043b8b9 100644
--- a/coordinator/impl/coordinator_e2e_test.go
+++ b/coordinator/impl/coordinator_e2e_test.go
@@ -816,12 +816,6 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) {
 	clusterConfig.Servers = clusterServer
 	configChangesCh <- nil
 
-	// new term
-	coordinatorInstance := c.(*coordinator)
-	controller := coordinatorInstance.shardControllers[0]
-	controllerInstance := controller.(*shardController)
-	controllerInstance.electionOp <- nil
-
 	assert.Eventually(t, func() bool {
 		for _, ns := range c.ClusterStatus().Namespaces {
 			for _, shard := range ns.Shards {
diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go
index 5d9a48d3..8e2dee12 100644
--- a/coordinator/impl/shard_controller.go
+++ b/coordinator/impl/shard_controller.go
@@ -64,6 +64,8 @@ type ShardController interface {
 
 	HandleNodeFailure(failedNode model.ServerAddress)
 
+	SyncServerInfo()
+
 	SwapNode(from model.ServerAddress, to model.ServerAddress) error
 
 	DeleteShard()
@@ -77,7 +79,7 @@ type shardController struct {
 	shard              int64
 	namespaceConfig    *model.NamespaceConfig
 	shardMetadata      model.ShardMetadata
-	shardMetadataMutex sync.Mutex
+	shardMetadataMutex sync.RWMutex
 	rpc                RpcProvider
 	coordinator        Coordinator
 
@@ -139,6 +141,8 @@ func NewShardController(namespace string, shard int64, namespaceConfig *model.Na
 
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 
+	s.SyncServerInfo()
+
 	s.log.Info(
 		"Started shard controller",
 		slog.Any("shard-metadata", s.shardMetadata),
@@ -196,7 +200,7 @@ func (s *shardController) run() {
 		case a := <-s.newTermAndAddFollowerOp:
 			s.internalNewTermAndAddFollower(a.ctx, a.node, a.res)
 
-		case <-s.electionOp: // for testing
+		case <-s.electionOp:
 			s.electLeaderWithRetries()
 		}
 	}
@@ -214,7 +218,7 @@ func (s *shardController) handleNodeFailure(failedNode model.ServerAddress) {
 	)
 
 	if s.shardMetadata.Leader != nil &&
-		*s.shardMetadata.Leader == failedNode {
+		s.shardMetadata.Leader.Internal == failedNode.Internal {
 		s.log.Info(
 			"Detected failure on shard leader",
 			slog.Any("leader", failedNode),
@@ -377,19 +381,11 @@ func (s *shardController) electLeader() error {
 }
 
 func (s *shardController) getRefreshedEnsemble() []model.ServerAddress {
-	serversInfos := s.coordinator.GetServers()
-	// build a logic index here.
-	// todo: might introduce global index in the coordinator in the future
-	index := map[string]model.ServerAddress{}
-	for _, server := range serversInfos {
-		index[server.Internal] = server
-	}
-
 	currentEnsemble := s.shardMetadata.Ensemble
 	refreshedEnsembleServiceInfo := make([]model.ServerAddress, len(currentEnsemble))
 	for idx, candidate := range currentEnsemble {
-		if refreshedInfo, exist := index[candidate.Internal]; exist {
-			refreshedEnsembleServiceInfo[idx] = refreshedInfo
+		if refreshedInfo, exist := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); exist {
+			refreshedEnsembleServiceInfo[idx] = *refreshedInfo
 			continue
 		}
 		refreshedEnsembleServiceInfo[idx] = candidate
@@ -894,6 +890,26 @@ func (s *shardController) waitForFollowersToCatchUp(ctx context.Context, leader
 	return nil
 }
 
+func (s *shardController) SyncServerInfo() {
+	s.shardMetadataMutex.RLock()
+	changed := false
+	for _, candidate := range s.shardMetadata.Ensemble {
+		if newInfo, ok := s.coordinator.FindNodeInfoById(candidate.GetNodeId()); ok {
+			if *newInfo != candidate {
+				changed = true
+				break
+			}
+		}
+	}
+	if !changed {
+		s.shardMetadataMutex.RUnlock()
+		return
+	}
+	s.shardMetadataMutex.RUnlock()
+	s.log.Info("Node info changed, starting a new leader election")
+	s.electionOp <- nil
+}
+
 func listContains(list []model.ServerAddress, sa model.ServerAddress) bool {
 	for _, item := range list {
 		if item.Public == sa.Public && item.Internal == sa.Internal {
diff --git a/coordinator/impl/shard_controller_test.go b/coordinator/impl/shard_controller_test.go
index 2075d5fe..1906b845 100644
--- a/coordinator/impl/shard_controller_test.go
+++ b/coordinator/impl/shard_controller_test.go
@@ -365,8 +365,8 @@ func (m *mockCoordinator) WaitForNextUpdate(ctx context.Context, currentValue *p
 	panic("not implemented")
 }
 
-func (m *mockCoordinator) GetServers() []model.ServerAddress {
-	return []model.ServerAddress{}
+func (m *mockCoordinator) FindNodeInfoById(id model.NodeId) (*model.ServerAddress, bool) {
+	return nil, false
 }
 
 func (m *mockCoordinator) InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error {
diff --git a/coordinator/model/cluster_status.go b/coordinator/model/cluster_status.go
index defd45f8..60177887 100644
--- a/coordinator/model/cluster_status.go
+++ b/coordinator/model/cluster_status.go
@@ -14,6 +14,8 @@
 
 package model
 
+type NodeId string
+
 type ServerAddress struct {
 	// Public is the endpoint that is advertised to clients
 	Public string `json:"public" yaml:"public"`
@@ -22,6 +24,12 @@ type ServerAddress struct {
 	Internal string `json:"internal" yaml:"internal"`
 }
 
+func (s *ServerAddress) GetNodeId() NodeId {
+	// Use the internal address as the node id by default.
+	// TODO: introduce a dedicated node id in the future.
+	return NodeId(s.Internal)
+}
+
 type Int32HashRange struct {
 	// The minimum inclusive hash that the shard can contain
 	Min uint32 `json:"min"`
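The following sketch is not part of the patch itself; it is a self-contained illustration, using simplified stand-in types, of the node-index pattern the change introduces: ServerAddress values are keyed by NodeId (currently derived from the internal address) in a sync.Map, and a shard ensemble is refreshed through FindNodeInfoById in the same way getRefreshedEnsemble now does. The nodeIndex and refreshEnsemble names are hypothetical and exist only for this example.

// Hypothetical, runnable sketch of the node-index pattern. The types below are
// simplified stand-ins for coordinator/model; in the real code the index lives
// on the coordinator and the refresh logic is getRefreshedEnsemble.
package main

import (
	"fmt"
	"sync"
)

type NodeId string

type ServerAddress struct {
	Public   string
	Internal string
}

func (s *ServerAddress) GetNodeId() NodeId {
	// Mirrors the patch: the internal address doubles as the node id for now.
	return NodeId(s.Internal)
}

// nodeIndex is a stand-in for the coordinator's nodeIndexes sync.Map.
type nodeIndex struct {
	m sync.Map // NodeId -> ServerAddress
}

func (idx *nodeIndex) Store(sa ServerAddress) { idx.m.Store(sa.GetNodeId(), sa) }
func (idx *nodeIndex) Delete(id NodeId)       { idx.m.Delete(id) }

func (idx *nodeIndex) FindNodeInfoById(id NodeId) (*ServerAddress, bool) {
	if info, ok := idx.m.Load(id); ok {
		sa := info.(ServerAddress)
		return &sa, true
	}
	return nil, false
}

// refreshEnsemble replaces stale addresses in an ensemble with the ones
// currently known to the index, analogous to getRefreshedEnsemble.
func refreshEnsemble(idx *nodeIndex, ensemble []ServerAddress) []ServerAddress {
	out := make([]ServerAddress, len(ensemble))
	for i, candidate := range ensemble {
		if refreshed, ok := idx.FindNodeInfoById(candidate.GetNodeId()); ok {
			out[i] = *refreshed
			continue
		}
		out[i] = candidate
	}
	return out
}

func main() {
	idx := &nodeIndex{}
	stale := ServerAddress{Public: "node-1.old:6648", Internal: "node-1:6649"}
	idx.Store(stale)

	// The node is re-advertised with a new public endpoint but the same internal
	// address, so it keeps the same NodeId and the index entry is overwritten.
	idx.Store(ServerAddress{Public: "node-1.new:6648", Internal: "node-1:6649"})

	ensemble := refreshEnsemble(idx, []ServerAddress{stale})
	fmt.Println(ensemble[0].Public) // prints node-1.new:6648
}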
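Also not part of the patch: a minimal sketch of the channel-based operation loop that SyncServerInfo relies on. Instead of mutating shard state directly, SyncServerInfo signals the controller's electionOp channel, so the election runs on the controller's own goroutine, serialized with its other operations. The opLoop and ops names are invented for this example; only the electionOp idea comes from the code above.

// Hypothetical sketch: a single goroutine owns the state transitions and other
// goroutines request work over channels, which is the pattern behind
// shardController.run and its electionOp case.
package main

import (
	"context"
	"fmt"
	"time"
)

type opLoop struct {
	ctx        context.Context
	cancel     context.CancelFunc
	ops        chan string // stand-in for newTermAndAddFollowerOp, node-failure handling, etc.
	electionOp chan any    // same role as shardController.electionOp
}

func newOpLoop() *opLoop {
	l := &opLoop{
		ops:        make(chan string),
		electionOp: make(chan any),
	}
	l.ctx, l.cancel = context.WithCancel(context.Background())
	go l.run()
	return l
}

func (l *opLoop) run() {
	for {
		select {
		case <-l.ctx.Done():
			return
		case op := <-l.ops:
			fmt.Println("handling operation:", op)
		case <-l.electionOp:
			fmt.Println("running leader election with refreshed ensemble")
		}
	}
}

func main() {
	l := newOpLoop()
	l.ops <- "add-follower"
	l.electionOp <- nil // what SyncServerInfo does when node info has changed

	// Give the loop a moment to log the second operation before shutting down.
	time.Sleep(100 * time.Millisecond)
	l.cancel()
}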