Skip to content

Commit

Permalink
[Improvement-16982][Master] When master startup, initialize the clust…
Browse files Browse the repository at this point in the history
…er from registry
  • Loading branch information
ruanwenjun committed Jan 23, 2025
1 parent 352b47b commit f823b6a
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.dolphinscheduler.server.master.cluster;

import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

Expand All @@ -36,6 +39,9 @@ public class ClusterManager {
@Getter
private WorkerClusters workerClusters;

@Autowired
private MasterSlotManager masterSlotManager;

@Autowired
private WorkerGroupChangeNotifier workerGroupChangeNotifier;

Expand All @@ -48,11 +54,48 @@ public ClusterManager() {
}

public void start() {
initializeMasterClusters();
initializeWorkerClusters();
log.info("ClusterManager started...");
}

/**
* Initialize the master clusters.
* <p> 1. Register master slot listener once master clusters changed.
* <p> 2. Fetch master nodes from registry.
* <p> 3. Subscribe the master change event.
*/
private void initializeMasterClusters() {
this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));

registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> {
final MasterHeartBeat masterHeartBeat =
JSONUtils.parseObject(server.getHeartBeatInfo(), MasterHeartBeat.class);
masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat));
});
log.info("Initialized MasterClusters: {}", JSONUtils.toPrettyJsonString(masterClusters.getServers()));

this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters);
}

/**
* Initialize the worker clusters.
* <p> 1. Fetch worker nodes from registry.
* <p> 2. Register worker group change notifier once worker clusters changed.
* <p> 3. Subscribe the worker change event.
*/
private void initializeWorkerClusters() {
registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> {
final WorkerHeartBeat workerHeartBeat =
JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class);
workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat));
});
log.info("Initialized WorkerClusters: {}", JSONUtils.toPrettyJsonString(workerClusters.getServers()));

this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters);

this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
this.workerGroupChangeNotifier.start();
log.info("ClusterManager started...");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.cluster;

import java.util.List;

public interface IMasterSlotChangeListener {

void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.server.master.cluster;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;

Expand All @@ -32,6 +34,7 @@
public class MasterServerMetadata extends BaseServerMetadata implements Comparable<MasterServerMetadata> {

public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
checkNotNull(masterHeartBeat);
return MasterServerMetadata.builder()
.processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.cluster;

import java.util.List;

public class MasterSlotChangeListenerAdaptor
implements
IMasterSlotChangeListener,
IClusters.IClustersChangeListener<MasterServerMetadata> {

private final MasterSlotManager masterSlotManager;

private final MasterClusters masterClusters;

public MasterSlotChangeListenerAdaptor(final MasterSlotManager masterSlotManager,
final MasterClusters masterClusters) {
this.masterSlotManager = masterSlotManager;
this.masterClusters = masterClusters;
}

@Override
public void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers) {
masterSlotManager.doReBalance(normalMasterServers);
}

@Override
public void onServerAdded(MasterServerMetadata server) {
onMasterSlotChanged(masterClusters.getNormalServers());
}

@Override
public void onServerRemove(MasterServerMetadata server) {
onMasterSlotChanged(masterClusters.getNormalServers());
}

@Override
public void onServerUpdate(MasterServerMetadata server) {
onMasterSlotChanged(masterClusters.getNormalServers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,14 @@
@Component
public class MasterSlotManager implements IMasterSlotReBalancer {

private final MasterClusters masterClusters;

private final MasterConfig masterConfig;

private volatile int currentSlot = -1;

private volatile int totalSlots = 0;

public MasterSlotManager(ClusterManager clusterManager, MasterConfig masterConfig) {
public MasterSlotManager(final MasterConfig masterConfig) {
this.masterConfig = masterConfig;
this.masterClusters = clusterManager.getMasterClusters();
this.masterClusters.registerListener(new IClusters.IClustersChangeListener<MasterServerMetadata>() {

@Override
public void onServerAdded(MasterServerMetadata server) {
doReBalance(masterClusters.getNormalServers());
}

@Override
public void onServerRemove(MasterServerMetadata server) {
doReBalance(masterClusters.getNormalServers());
}

@Override
public void onServerUpdate(MasterServerMetadata server) {
doReBalance(masterClusters.getNormalServers());
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ class MasterSlotManagerTest {

private MasterSlotManager masterSlotManager;

private ClusterManager clusterManager;
private MasterClusters masterClusters;

private MasterConfig masterConfig;

@BeforeEach
public void setUp() {
clusterManager = new ClusterManager();
masterClusters = new MasterClusters();
masterConfig = new MasterConfig();
masterConfig.setMasterAddress("127.0.0.1:5678");
masterSlotManager = new MasterSlotManager(clusterManager, masterConfig);
masterSlotManager = new MasterSlotManager(masterConfig);
MasterServerMetadata master1 = MasterServerMetadata.builder()
.cpuUsage(0.2)
.memoryUsage(0.4)
Expand All @@ -63,10 +63,11 @@ public void setUp() {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
clusterManager.getMasterClusters().onServerAdded(master1);
clusterManager.getMasterClusters().onServerAdded(master2);
clusterManager.getMasterClusters().onServerAdded(master3);
clusterManager.getMasterClusters().onServerAdded(master4);
this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
masterClusters.onServerAdded(master1);
masterClusters.onServerAdded(master2);
masterClusters.onServerAdded(master3);
masterClusters.onServerAdded(master4);
}

@Test
Expand Down Expand Up @@ -98,8 +99,8 @@ void doReBalance() {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
clusterManager.getMasterClusters().onServerRemove(master2);
clusterManager.getMasterClusters().onServerRemove(master3);
masterClusters.onServerRemove(master2);
masterClusters.onServerRemove(master3);
// After doReBalance, the total master slots should be 2
assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2);
}
Expand Down

0 comments on commit f823b6a

Please sign in to comment.