diff --git a/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/ServerRestTemplate.java b/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/ServerRestTemplate.java index 285e3e262..38ace9b4a 100644 --- a/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/ServerRestTemplate.java +++ b/disjob-registry/disjob-registry-api/src/main/java/cn/ponfee/disjob/registry/rpc/ServerRestTemplate.java @@ -56,6 +56,8 @@ T invoke(Server destinationServer, String path, HttpMethod httpMethod, Type Map authenticationHeaders = null; Worker.Current currentWorker = Worker.current(); if (destinationServer instanceof Supervisor && currentWorker != null) { + // 这里可能存在Supervisor-A同时也为Worker-A角色,当Supervisor-A远程调用另一个Supervisor-B, + // 此时会用Worker-A的身份认证信息去调用Supervisor-B,接收方Supervisor-B也会认为是Worker-A调用过来的,与实际情况不大相符 authenticationHeaders = currentWorker.createWorkerAuthenticationHeaders(); } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java index 040f4269d..47ecc8966 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/EventSubscribeService.java @@ -32,7 +32,7 @@ public class EventSubscribeService { private static final Logger LOG = LoggerFactory.getLogger(EventSubscribeService.class); - private static final ConcurrentMap CACHED = new ConcurrentHashMap<>(); + private static final ConcurrentMap MAP = new ConcurrentHashMap<>(); private final SchedGroupService schedGroupService; @@ -46,14 +46,14 @@ public EventSubscribeService(SchedGroupService schedGroupService) { public static void subscribe(EventParam param) { if (param != null && param.getType() != null) { // putIfAbsent不会更新param - CACHED.compute(param.getType(), (k, v) -> param); + MAP.compute(param.getType(), (k, v) -> param); } } private void process() { - Set types = new HashSet<>(CACHED.keySet()); + Set types = new HashSet<>(MAP.keySet()); for (Type type : types) { - ThrowingRunnable.doCaught(() -> process(CACHED.remove(type))); + ThrowingRunnable.doCaught(() -> process(MAP.remove(type))); } } diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/ServerInvokeService.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/ServerInvokeService.java index a1b7d295c..b8d50a9cf 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/ServerInvokeService.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/application/ServerInvokeService.java @@ -52,30 +52,29 @@ public class ServerInvokeService extends SingletonClassConstraint { private static final Logger LOG = LoggerFactory.getLogger(ServerInvokeService.class); + private final SupervisorRegistry supervisorRegistry; + private final Supervisor.Current currentSupervisor; private final DesignatedServerInvoker invokeSupervisor; private final DesignatedServerInvoker invokeWorker; - private final SupervisorRegistry supervisorRegistry; - public ServerInvokeService(HttpProperties http, - SupervisorRegistry supervisorRegistry, - SupervisorRpcService supervisorRpcServiceProvider, - @Nullable WorkerRpcService workerRpcServiceProvider, + public ServerInvokeService(SupervisorRegistry supervisorRegistry, + Supervisor.Current currentSupervisor, + SupervisorRpcService supervisorProvider, + HttpProperties http, + @Nullable WorkerRpcService workerProvider, @Nullable ObjectMapper objectMapper) { - RetryProperties retry = RetryProperties.of(0, 0); - this.invokeSupervisor = ServerRestProxy.create( - SupervisorRpcService.class, supervisorRpcServiceProvider, Supervisor.current(), http, retry, objectMapper - ); - this.invokeWorker = ServerRestProxy.create( - WorkerRpcService.class, workerRpcServiceProvider, Worker.current(), http, retry, objectMapper - ); this.supervisorRegistry = supervisorRegistry; + this.currentSupervisor = currentSupervisor; + RetryProperties retry = RetryProperties.of(0, 0); + this.invokeSupervisor = ServerRestProxy.create(SupervisorRpcService.class, supervisorProvider, currentSupervisor, http, retry, objectMapper); + this.invokeWorker = ServerRestProxy.create(WorkerRpcService.class, workerProvider, Worker.current(), http, retry, objectMapper); } // ------------------------------------------------------------public methods public List supervisors() throws Exception { List list = supervisorRegistry.getRegisteredServers(); - list = Collects.sorted(list, Comparator.comparing(e -> e.equals(Supervisor.current()) ? 0 : 1)); + list = Collects.sorted(list, Comparator.comparing(e -> e.equals(currentSupervisor) ? 0 : 1)); return MultithreadExecutors.call(list, this::getSupervisorMetrics, ThreadPoolExecutors.commonThreadPool()); } @@ -126,7 +125,7 @@ public void publishOtherSupervisors(EventParam eventParam) { try { List supervisors = supervisorRegistry.getRegisteredServers() .stream() - .filter(e -> !Supervisor.current().sameSupervisor(e)) + .filter(e -> !currentSupervisor.sameSupervisor(e)) .collect(Collectors.toList()); MultithreadExecutors.run( supervisors, diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerConfigurator.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerConfigurator.java index dbb6927b4..5570905da 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerConfigurator.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerConfigurator.java @@ -31,16 +31,17 @@ static synchronized void setWorkerThreadPool(WorkerThreadPool threadPool) { } public static WorkerMetrics metrics() { + Worker.Current currentWorker = Worker.current(); WorkerMetrics metrics = new WorkerMetrics(); metrics.setVersion(JobConstants.VERSION); - metrics.setWorkerId(Worker.current().getWorkerId()); - metrics.setStartupAt(Dates.toDate(Worker.current().getStartupAt())); + metrics.setWorkerId(currentWorker.getWorkerId()); + metrics.setStartupAt(Dates.toDate(currentWorker.getStartupAt())); metrics.setAlsoSupervisor(Supervisor.current() != null); metrics.setJvmThreadActiveCount(Thread.activeCount()); if (workerThreadPool != null) { metrics.setThreadPool(workerThreadPool.metrics()); } - metrics.setSignature(Worker.current().createWorkerSignatureToken()); + metrics.setSignature(currentWorker.createWorkerSignatureToken()); return metrics; }