From 7a98dbccea5b035a6caab036944f24c895f06bab Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 19 Jul 2024 16:00:06 +0800 Subject: [PATCH 1/2] [Fix][Zeta] Fix hybrid deployment can not get worker when init --- .../engine/client/SeaTunnelClientTest.java | 13 ++++++++----- .../engine/server/SeaTunnelServer.java | 6 ++++++ .../AbstractResourceManager.java | 19 ++++++++++--------- .../opeartion/SyncWorkerProfileOperation.java | 6 +++++- .../resourcemanager/ResourceManagerTest.java | 5 +++++ 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 1510e7727f7..d7e55db4ec2 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -392,11 +392,14 @@ public void testSetJobIdDuplicate() { Assertions.assertThrows( Exception.class, () -> jobExecutionEnvWithSameJobId.execute().waitForJobCompleteV2()); - Assertions.assertEquals( - String.format( - "The job id %s has already been submitted and is not starting with a savepoint.", - jobId), - exception.getCause().getMessage()); + Assertions.assertTrue( + exception + .getCause() + .getMessage() + .contains( + String.format( + "The job id %s has already been submitted and is not starting with a savepoint.", + jobId))); } catch (Exception e) { throw new RuntimeException(e); } finally { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 765869fd030..b76af4c19a0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -84,6 +84,12 @@ public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) { /** Lazy load for Slot Service */ public SlotService getSlotService() { + // If the node is master node, the slot service is not needed. + if (EngineConfig.ClusterRole.MASTER.ordinal() + == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) { + return null; + } + if (slotService == null) { synchronized (this) { if (slotService == null) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index b830e5f0563..56e600d555d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -74,25 +74,26 @@ public void init() { private void initWorker() { log.info("initWorker... "); - List
aliveWorker = + List
aliveNode = nodeEngine.getClusterService().getMembers().stream() - .filter(Member::isLiteMember) .map(Member::getAddress) .collect(Collectors.toList()); - log.info("initWorker live nodes: " + aliveWorker); + log.info("init live nodes: {}", aliveNode); List> futures = - aliveWorker.stream() + aliveNode.stream() .map( - worker -> - sendToMember(new SyncWorkerProfileOperation(), worker) + node -> + sendToMember(new SyncWorkerProfileOperation(), node) .thenAccept( p -> { - registerWorker.put( - worker, (WorkerProfile) p); + if (p != null) { + registerWorker.put( + node, (WorkerProfile) p); + } })) .collect(Collectors.toList()); futures.forEach(CompletableFuture::join); - log.info("registerWorker: " + registerWorker); + log.info("registerWorker: {}", registerWorker); } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java index ebe85e3dafc..904629648ab 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/opeartion/SyncWorkerProfileOperation.java @@ -33,7 +33,11 @@ public class SyncWorkerProfileOperation extends Operation implements IdentifiedD @Override public void run() throws Exception { SeaTunnelServer server = getService(); - result = server.getSlotService().getWorkerProfile(); + if (server.getSlotService() != null) { + result = server.getSlotService().getWorkerProfile(); + } else { + result = null; + } } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java index abd4ccdc090..5ac803064a8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManagerTest.java @@ -54,6 +54,11 @@ public void before() { server.getSlotService(); } + @Test + public void testHaveWorkerWhenUseHybridDeployment() { + Assertions.assertEquals(1, resourceManager.workerCount(null)); + } + @Test public void testApplyRequest() throws ExecutionException, InterruptedException { List resourceProfiles = new ArrayList<>(); From abff0fc0a7fda537549e60417be810b2f1eb57f3 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 19 Jul 2024 16:40:04 +0800 Subject: [PATCH 2/2] update --- .../server/resourcemanager/AbstractResourceManager.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index 56e600d555d..6c04748ccca 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -89,6 +89,11 @@ private void initWorker() { if (p != null) { registerWorker.put( node, (WorkerProfile) p); + log.info( + "received new worker register: " + + ((WorkerProfile) + p) + .getAddress()); } })) .collect(Collectors.toList());