Skip to content

Commit

Permalink
get registered servers
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Dec 4, 2023
1 parent c8b8585 commit 9c3c421
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class SchedGroup extends BaseEntity implements Serializable {
private String userToken;

/**
* 负责人
* Group own user
*/
private String ownUser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cn.ponfee.disjob.core.base.Server;

import java.io.Closeable;
import java.util.List;

/**
* Server registry.
Expand Down Expand Up @@ -41,6 +42,14 @@ public interface Registry<R extends Server> extends Closeable {
*/
ServerRole registryRole();

/**
* Gets alive registered servers.
*
* @return list of alive registered servers
* @throws Exception if occur exception
*/
List<R> getRegisteredServers() throws Exception;

/**
* Close registry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
import cn.ponfee.disjob.common.util.GenericUtils;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.discovery.DiscoveryServer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Registry and discovery server.
Expand Down Expand Up @@ -110,6 +115,22 @@ public final ServerRole discoveryRole() {
return discoveryRole;
}

protected final List<R> deserializeRegistryServers(List<String> list) {
return deserializeRegistryServers(list, Function.identity());
}

protected final <T> List<R> deserializeRegistryServers(List<T> list, Function<T, String> function) {
if (CollectionUtils.isEmpty(list)) {
return Collections.emptyList();
}
return list.stream()
.filter(Objects::nonNull)
.map(function)
.filter(StringUtils::isNotBlank)
.<R>map(registryRole::deserialize)
.collect(Collectors.toList());
}

// -------------------------------------------------------------------------------------private method

private static String prune(String namespace, char separator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.consul.configuration.ConsulRegistryProperties;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.OperationException;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
Expand Down Expand Up @@ -113,6 +114,16 @@ public final void deregister(R server) {
}
}

@Override
public List<R> getRegisteredServers() {
HealthServicesRequest request = HealthServicesRequest.newBuilder()
.setPassing(true)
.setToken(token)
.build();
List<HealthService> list = client.getHealthServices(registryRootPath, request).getValue();
return deserializeRegistryServers(list, e -> e.getService().getId().substring(registryRootPath.length() + 1));
}

// ------------------------------------------------------------------Close

@PreDestroy
Expand Down Expand Up @@ -175,7 +186,12 @@ private void checkPass() {
}
log.debug("check pass for server: {} with check id {}", server, checkId);
} catch (Throwable t) {
log.warn("fail to check pass for server: " + server + ", check id is: " + checkId, t);
if ((t instanceof OperationException) && ((OperationException) t).getStatusCode() == 404) {
ThrowingRunnable.doCaught(() -> register(server), () -> "Not found server register failed: " + server);
log.warn("Check pass server not found: " + server + ", check id: " + checkId, t);
} else {
log.warn("Check pass server failed: " + server + ", check id: " + checkId, t);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class DatabaseServerRegistry<R extends Server, D extends Server>

private static final String DEREGISTER_SQL = "DELETE FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND server=?";

private static final String DISCOVER_SQL = "SELECT server FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time>?";
private static final String SELECT_SQL = "SELECT server FROM " + TABLE_NAME + " WHERE namespace=? AND role=? AND heartbeat_time>?";

/**
* Registry namespace
Expand Down Expand Up @@ -179,6 +179,12 @@ public final void deregister(R server) {
log.info("Server deregister: {} | {}", registryRole.name(), server);
}

@Override
public List<R> getRegisteredServers() {
Object[] args = {namespace, registerRoleName, System.currentTimeMillis() - sessionTimeoutMs};
return deserializeRegistryServers(jdbcTemplateWrapper.list(SELECT_SQL, JdbcTemplateWrapper.STRING_ROW_MAPPER, args));
}

// ------------------------------------------------------------------Close

@PreDestroy
Expand Down Expand Up @@ -220,7 +226,7 @@ private void registerServers() {
private void discoverServers() throws Throwable {
RetryTemplate.execute(() -> {
Object[] args = {namespace, discoveryRoleName, System.currentTimeMillis() - sessionTimeoutMs};
List<String> discovered = jdbcTemplateWrapper.list(DISCOVER_SQL, JdbcTemplateWrapper.STRING_ROW_MAPPER, args);
List<String> discovered = jdbcTemplateWrapper.list(SELECT_SQL, JdbcTemplateWrapper.STRING_ROW_MAPPER, args);

if (CollectionUtils.isEmpty(discovered)) {
log.warn("Not discovered available {} from database.", discoveryRole.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ public final void deregister(R server) {
}
}

private String buildRegistryServerId(R server) {
return registryRootPath + separator + server.serialize();
@Override
public List<R> getRegisteredServers() throws Exception {
return deserializeRegistryServers(client.getKeyChildren(registryRootPath));
}

// ------------------------------------------------------------------Close
Expand All @@ -149,6 +150,10 @@ public void close() {

// ------------------------------------------------------------------private method

private String buildRegistryServerId(R server) {
return registryRootPath + separator + server.serialize();
}

private synchronized void doRefreshDiscoveryServers(List<String> list) {
List<D> servers;
if (CollectionUtils.isEmpty(list)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public final void deregister(R server) {
}
}

@Override
public List<R> getRegisteredServers() throws Exception {
List<Instance> list = namingService.getAllInstances(registryRootPath, groupName);
return deserializeRegistryServers(list, Instance::getInstanceId);
}

// ------------------------------------------------------------------Close

@PreDestroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class RedisServerRegistry<R extends Server, D extends Server> ex
Void.class
);

private static final RedisScript<List> DISCOVERY_SCRIPT = RedisScript.of(
private static final RedisScript<List> QUERY_SCRIPT = RedisScript.of(
"redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1]); \n" +
"local ret = redis.call('zrangebyscore', KEYS[1], ARGV[1], '+inf'); \n" +
"redis.call('pexpire', KEYS[1], ARGV[2]); \n" +
Expand Down Expand Up @@ -184,6 +184,14 @@ public final void deregister(R server) {
log.info("Server deregister: {} | {}", registryRole.name(), server);
}

@Override
public List<R> getRegisteredServers() {
List<String> registryServers = stringRedisTemplate.execute(
QUERY_SCRIPT, registryRedisKey, Long.toString(System.currentTimeMillis()), REDIS_KEY_TTL_MILLIS
);
return deserializeRegistryServers(registryServers);
}

// ------------------------------------------------------------------Close

@PreDestroy
Expand Down Expand Up @@ -291,7 +299,7 @@ private void tryDiscoverServers() {
private void doDiscoverServers() throws Throwable {
RetryTemplate.execute(() -> {
List<String> discovered = stringRedisTemplate.execute(
DISCOVERY_SCRIPT, discoveryRedisKey, Long.toString(System.currentTimeMillis()), REDIS_KEY_TTL_MILLIS
QUERY_SCRIPT, discoveryRedisKey, Long.toString(System.currentTimeMillis()), REDIS_KEY_TTL_MILLIS
);

if (CollectionUtils.isEmpty(discovered)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ public abstract class ZookeeperServerRegistry<R extends Server, D extends Server
private static final int CREATE_EPHEMERAL_FAIL_RETRIES = 3;

private final CuratorFrameworkClient client;
private final String zkRegistryRootPath;

protected ZookeeperServerRegistry(ZookeeperRegistryProperties config) {
super(config.getNamespace(), Char.SLASH);
// zookeeper parent path must start with "/"
String registryRootPath0 = separator + registryRootPath;
String discoveryRootPath0 = separator + discoveryRootPath;
this.zkRegistryRootPath = separator + registryRootPath;
String zkDiscoveryRootPath = separator + discoveryRootPath;

CountDownLatch latch = new CountDownLatch(1);
try {
Expand All @@ -54,10 +55,10 @@ protected ZookeeperServerRegistry(ZookeeperRegistryProperties config) {
}
}
});
client.createPersistent(registryRootPath0);
client.createPersistent(discoveryRootPath0);
//client.listenChildChanged(discoveryRootPath0);
client.watchChildChanged(discoveryRootPath0, latch, this::doRefreshDiscoveryServers);
client.createPersistent(zkRegistryRootPath);
client.createPersistent(zkDiscoveryRootPath);
//client.listenChildChanged(zkDiscoveryRootPath);
client.watchChildChanged(zkDiscoveryRootPath, latch, this::doRefreshDiscoveryServers);
} catch (Exception e) {
throw new RegistryException("Zookeeper registry init error: " + config, e);
} finally {
Expand Down Expand Up @@ -99,6 +100,11 @@ public final void deregister(R server) {
}
}

@Override
public List<R> getRegisteredServers() throws Exception {
return deserializeRegistryServers(client.getChildren(zkRegistryRootPath));
}

// ------------------------------------------------------------------Close

@PreDestroy
Expand All @@ -117,7 +123,7 @@ public void close() {
// ------------------------------------------------------------------private methods

private String buildRegistryPath(R server) {
return separator + registryRootPath + separator + server.serialize();
return zkRegistryRootPath + separator + server.serialize();
}

private synchronized void doRefreshDiscoveryServers(List<String> list) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# 当使用database作为注册中心时需要配置
# 当使用database作为注册中心时使用该配置,同时需要在“application.yml”配置文件的“spring.profiles.include”项中增加“registry_database”,即“spring.profiles.include: xxx,registry_database”
disjob.registry.database:
namespace: mysql_namespace
session-timeout-ms: 30000
Expand Down
2 changes: 1 addition & 1 deletion sql/mysql-disjob.sql
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ CREATE TABLE `sched_group` (
`supervisor_token` VARCHAR(60) DEFAULT NULL COMMENT 'Supervisor访问Worker的密钥令牌',
`worker_token` VARCHAR(60) DEFAULT NULL COMMENT 'Worker访问Supervisor的密钥令牌',
`user_token` VARCHAR(60) DEFAULT NULL COMMENT 'User访问Supervisor的openapi接口密钥令牌(未部署Admin 或 提供类似开放平台 时使用)',
`own_user` VARCHAR(36) DEFAULT NULL COMMENT '负责人',
`own_user` VARCHAR(36) DEFAULT NULL COMMENT 'Group own user',
`alarm_users` VARCHAR(1024) DEFAULT NULL COMMENT '告警人员(多个逗号分隔)',
`web_hook` VARCHAR(255) DEFAULT NULL COMMENT '告警web hook地址',
`version` INT UNSIGNED NOT NULL DEFAULT '1' COMMENT '行记录版本号',
Expand Down

0 comments on commit 9c3c421

Please sign in to comment.