Skip to content

Commit

Permalink
[Improvement-16880] Merge worker group from config and ui and distinc…
Browse files Browse the repository at this point in the history
…t display it in api (#16883)
  • Loading branch information
SbloodyS authored Dec 9, 2024
1 parent cad8df0 commit 566651c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public Map<String, Object> queryAssignedWorkerGroupsByProject(User loginUser, Lo
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroup);
return projectWorkerGroup;
}).collect(Collectors.toList());
}).distinct().collect(Collectors.toList());

result.put(Constants.DATA_LIST, projectWorkerGroups);
putMsg(result, Status.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Map<String, Object> queryAllGroup(User loginUser) {
.map(WorkerGroup::getName)
.collect(Collectors.toList());
availableWorkerGroupList.addAll(configWorkerGroupNames);
result.put(Constants.DATA_LIST, availableWorkerGroupList);
result.put(Constants.DATA_LIST, availableWorkerGroupList.stream().distinct().collect(Collectors.toList()));
putMsg(result, Status.SUCCESS);
return result;
}
Expand Down Expand Up @@ -362,6 +362,7 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
}

workerGroupDao.deleteById(id);
boardCastToMasterThatWorkerGroupChanged();

log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
// WorkerIdentifier(workerAddress) -> worker
private final Map<String, WorkerServerMetadata> workerMapping = new ConcurrentHashMap<>();

// WorkerGroup -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> workerGroupMapping = new ConcurrentHashMap<>();
// WorkerGroup from db -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> dbWorkerGroupMapping = new ConcurrentHashMap<>();

// WorkerGroup from config -> WorkerIdentifier(workerAddress)
private final Map<String, List<String>> configWorkerGroupMapping = new ConcurrentHashMap<>();

private final List<IClustersChangeListener<WorkerServerMetadata>> workerClusterChangeListeners =
new CopyOnWriteArrayList<>();
Expand All @@ -59,27 +62,44 @@ public Optional<WorkerServerMetadata> getServer(final String address) {
return Optional.ofNullable(workerMapping.get(address));
}

public List<String> getWorkerServerAddressByGroup(String workerGroup) {
public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getConfigWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
}
return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
}

public List<String> getNormalWorkerServerAddressByGroup(String workerGroup) {
List<String> normalWorkerAddresses = getWorkerServerAddressByGroup(workerGroup)
List<String> dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
List<String> configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
return UnmodifiableList.unmodifiableList(normalWorkerAddresses);
dbWorkerAddresses.removeAll(configWorkerAddresses);
dbWorkerAddresses.addAll(configWorkerAddresses);
return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
}

public boolean containsWorkerGroup(String workerGroup) {
return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
|| workerGroupMapping.containsKey(workerGroup);
|| dbWorkerGroupMapping.containsKey(workerGroup)
|| configWorkerGroupMapping.containsKey(workerGroup);
}

@Override
Expand All @@ -89,9 +109,9 @@ public void registerListener(IClustersChangeListener<WorkerServerMetadata> liste

@Override
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
synchronized (workerGroupMapping) {
synchronized (dbWorkerGroupMapping) {
for (WorkerGroup workerGroup : workerGroups) {
workerGroupMapping.remove(workerGroup.getName());
dbWorkerGroupMapping.remove(workerGroup.getName());
}
}
}
Expand All @@ -112,8 +132,8 @@ public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
.filter(Objects::nonNull)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
synchronized (workerGroupMapping) {
workerGroupMapping.put(workerGroup.getName(), activeWorkers);
synchronized (dbWorkerGroupMapping) {
dbWorkerGroupMapping.put(workerGroup.getName(), activeWorkers);
}
}
}
Expand All @@ -130,15 +150,15 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) {
@Override
public void onServerAdded(WorkerServerMetadata workerServer) {
workerMapping.put(workerServer.getAddress(), workerServer);
synchronized (workerGroupMapping) {
List<String> addWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup());
synchronized (configWorkerGroupMapping) {
List<String> addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (addWorkerGroupAddrList == null) {
List<String> newWorkerGroupAddrList = new ArrayList<>();
newWorkerGroupAddrList.add(workerServer.getAddress());
workerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
} else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
addWorkerGroupAddrList.add(workerServer.getAddress());
workerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
}
}
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {
Expand All @@ -149,12 +169,12 @@ public void onServerAdded(WorkerServerMetadata workerServer) {
@Override
public void onServerRemove(WorkerServerMetadata workerServer) {
workerMapping.remove(workerServer.getAddress(), workerServer);
synchronized (workerGroupMapping) {
List<String> removeWorkerGroupAddrList = workerGroupMapping.get(workerServer.getWorkerGroup());
synchronized (configWorkerGroupMapping) {
List<String> removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
removeWorkerGroupAddrList.remove(workerServer.getAddress());
if (removeWorkerGroupAddrList.isEmpty()) {
workerGroupMapping.remove(workerServer.getWorkerGroup());
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
}

@Test
Expand All @@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
}

Expand All @@ -74,15 +74,15 @@ void testOnWorkerGroupChange() {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());

WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
.name("flinkCluster")
.addrList("")
.build();
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
}

Expand All @@ -94,7 +94,7 @@ void testOnServerAdded() {
WorkerClusters workerClusters = new WorkerClusters();
workerClusters.onServerAdded(normalWorkerServerMetadata);
workerClusters.onServerAdded(busyWorkerServerMetadata);
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
Expand All @@ -110,7 +110,7 @@ void testOnServerRemove() {
workerClusters.onServerAdded(busyWorkerServerMetadata);
workerClusters.onServerRemove(busyWorkerServerMetadata);

assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
Expand All @@ -137,7 +137,7 @@ void testOnServerUpdate() {

workerClusters.onServerUpdate(workerServerMetadata);

assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker group name
# worker group name. If it is not set, the default value is default.
group: default
server-load-protection:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void validate(Object target, Errors errors) {

workerConfig.setWorkerRegistryPath(
RegistryNodeType.WORKER.getRegistryPath() + "/" + workerConfig.getWorkerAddress());

if (StringUtils.isEmpty(group)) {
workerConfig.setGroup("default");
}

printConfig();
}

Expand All @@ -87,6 +92,7 @@ private void printConfig() {
"\n address -> " + workerAddress +
"\n registry-path: " + workerRegistryPath +
"\n physical-task-config -> " + physicalTaskConfig +
"\n group -> " + group +
"\n****************************Worker Configuration**************************************";
log.info(config);
}
Expand Down

0 comments on commit 566651c

Please sign in to comment.