Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Feb 3, 2024
1 parent 7c877e0 commit 837de0b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ <T> T invoke(Server destinationServer, String path, HttpMethod httpMethod, Type
Map<String, String> authenticationHeaders = null;
Worker.Current currentWorker = Worker.current();
if (destinationServer instanceof Supervisor && currentWorker != null) {
// 这里可能存在Supervisor-A同时也为Worker-A角色,当Supervisor-A远程调用另一个Supervisor-B,
// 此时会用Worker-A的身份认证信息去调用Supervisor-B,接收方Supervisor-B也会认为是Worker-A调用过来的,与实际情况不大相符
authenticationHeaders = currentWorker.createWorkerAuthenticationHeaders();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class EventSubscribeService {
private static final Logger LOG = LoggerFactory.getLogger(EventSubscribeService.class);

private static final ConcurrentMap<Type, EventParam> CACHED = new ConcurrentHashMap<>();
private static final ConcurrentMap<Type, EventParam> MAP = new ConcurrentHashMap<>();

private final SchedGroupService schedGroupService;

Expand All @@ -46,14 +46,14 @@ public EventSubscribeService(SchedGroupService schedGroupService) {
public static void subscribe(EventParam param) {
if (param != null && param.getType() != null) {
// putIfAbsent不会更新param
CACHED.compute(param.getType(), (k, v) -> param);
MAP.compute(param.getType(), (k, v) -> param);
}
}

private void process() {
Set<Type> types = new HashSet<>(CACHED.keySet());
Set<Type> types = new HashSet<>(MAP.keySet());
for (Type type : types) {
ThrowingRunnable.doCaught(() -> process(CACHED.remove(type)));
ThrowingRunnable.doCaught(() -> process(MAP.remove(type)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,29 @@
public class ServerInvokeService extends SingletonClassConstraint {
private static final Logger LOG = LoggerFactory.getLogger(ServerInvokeService.class);

private final SupervisorRegistry supervisorRegistry;
private final Supervisor.Current currentSupervisor;
private final DesignatedServerInvoker<SupervisorRpcService> invokeSupervisor;
private final DesignatedServerInvoker<WorkerRpcService> invokeWorker;
private final SupervisorRegistry supervisorRegistry;

public ServerInvokeService(HttpProperties http,
SupervisorRegistry supervisorRegistry,
SupervisorRpcService supervisorRpcServiceProvider,
@Nullable WorkerRpcService workerRpcServiceProvider,
public ServerInvokeService(SupervisorRegistry supervisorRegistry,
Supervisor.Current currentSupervisor,
SupervisorRpcService supervisorProvider,
HttpProperties http,
@Nullable WorkerRpcService workerProvider,
@Nullable ObjectMapper objectMapper) {
RetryProperties retry = RetryProperties.of(0, 0);
this.invokeSupervisor = ServerRestProxy.create(
SupervisorRpcService.class, supervisorRpcServiceProvider, Supervisor.current(), http, retry, objectMapper
);
this.invokeWorker = ServerRestProxy.create(
WorkerRpcService.class, workerRpcServiceProvider, Worker.current(), http, retry, objectMapper
);
this.supervisorRegistry = supervisorRegistry;
this.currentSupervisor = currentSupervisor;
RetryProperties retry = RetryProperties.of(0, 0);
this.invokeSupervisor = ServerRestProxy.create(SupervisorRpcService.class, supervisorProvider, currentSupervisor, http, retry, objectMapper);
this.invokeWorker = ServerRestProxy.create(WorkerRpcService.class, workerProvider, Worker.current(), http, retry, objectMapper);
}

// ------------------------------------------------------------public methods

public List<SupervisorMetricsResponse> supervisors() throws Exception {
List<Supervisor> list = supervisorRegistry.getRegisteredServers();
list = Collects.sorted(list, Comparator.comparing(e -> e.equals(Supervisor.current()) ? 0 : 1));
list = Collects.sorted(list, Comparator.comparing(e -> e.equals(currentSupervisor) ? 0 : 1));
return MultithreadExecutors.call(list, this::getSupervisorMetrics, ThreadPoolExecutors.commonThreadPool());
}

Expand Down Expand Up @@ -126,7 +125,7 @@ public void publishOtherSupervisors(EventParam eventParam) {
try {
List<Supervisor> supervisors = supervisorRegistry.getRegisteredServers()
.stream()
.filter(e -> !Supervisor.current().sameSupervisor(e))
.filter(e -> !currentSupervisor.sameSupervisor(e))
.collect(Collectors.toList());
MultithreadExecutors.run(
supervisors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ static synchronized void setWorkerThreadPool(WorkerThreadPool threadPool) {
}

public static WorkerMetrics metrics() {
Worker.Current currentWorker = Worker.current();
WorkerMetrics metrics = new WorkerMetrics();
metrics.setVersion(JobConstants.VERSION);
metrics.setWorkerId(Worker.current().getWorkerId());
metrics.setStartupAt(Dates.toDate(Worker.current().getStartupAt()));
metrics.setWorkerId(currentWorker.getWorkerId());
metrics.setStartupAt(Dates.toDate(currentWorker.getStartupAt()));
metrics.setAlsoSupervisor(Supervisor.current() != null);
metrics.setJvmThreadActiveCount(Thread.activeCount());
if (workerThreadPool != null) {
metrics.setThreadPool(workerThreadPool.metrics());
}
metrics.setSignature(Worker.current().createWorkerSignatureToken());
metrics.setSignature(currentWorker.createWorkerSignatureToken());
return metrics;
}

Expand Down

0 comments on commit 837de0b

Please sign in to comment.