diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 1c3cfc86b743..98cbe5205235 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -80,4 +80,15 @@ public static void sleep(final long millis) { log.error("Current thread sleep error", interruptedException); } } + + public static void rethrowInterruptedException(InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Current thread: " + Thread.currentThread().getName() + " is interrupted", + interruptedException); + } + + public static void consumeInterruptedException(InterruptedException interruptedException) { + log.info("Current thread: {} is interrupted", Thread.currentThread().getName(), interruptedException); + Thread.currentThread().interrupt(); + } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java index 246e11735c5f..b3f018940027 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java @@ -32,6 +32,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -48,7 +49,7 @@ @Slf4j class NettyRemotingServer { - private final ServerBootstrap serverBootstrap = new ServerBootstrap(); + private Channel serverBootstrapChannel; @Getter private final String serverName; @@ -87,7 +88,7 @@ class NettyRemotingServer { void start() { if (isStarted.compareAndSet(false, true)) { - this.serverBootstrap + ServerBootstrap serverBootstrap = new ServerBootstrap() .group(this.bossGroup, this.workGroup) .channel(NettyUtils.getServerSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, true) @@ -104,23 +105,24 @@ protected void initChannel(SocketChannel ch) { } }); - ChannelFuture future; try { - future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); + final ChannelFuture channelFuture = serverBootstrap.bind(serverConfig.getListenPort()).sync(); + if (channelFuture.isSuccess()) { + log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort()); + this.serverBootstrapChannel = channelFuture.channel(); + } else { + throw new RemoteException( + String.format("%s bind %s fail", serverConfig.getServerName(), + serverConfig.getListenPort()), + channelFuture.cause()); + } + } catch (InterruptedException it) { + ThreadUtils.rethrowInterruptedException(it); } catch (Exception e) { throw new RemoteException( String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), e); } - - if (future.isSuccess()) { - log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort()); - return; - } - - throw new RemoteException( - String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()), - future.cause()); } } @@ -144,18 +146,25 @@ void registerMethodInvoker(ServerMethodInvoker methodInvoker) { void close() { if (isStarted.compareAndSet(true, false)) { + log.info("{} closing", serverConfig.getServerName()); try { + if (serverBootstrapChannel != null) { + serverBootstrapChannel.close().sync(); + log.info("{} stop bind at port: {}", serverConfig.getServerName(), serverConfig.getListenPort()); + } if (bossGroup != null) { this.bossGroup.shutdownGracefully(); } if (workGroup != null) { this.workGroup.shutdownGracefully(); } - methodInvokerExecutor.shutdown(); + methodInvokerExecutor.shutdownNow(); + } catch (InterruptedException it) { + ThreadUtils.consumeInterruptedException(it); } catch (Exception ex) { - log.error("netty server close exception", ex); + log.error("{} close failed", serverConfig.getServerName(), ex); } - log.info("netty server closed"); + log.info("{} closed", serverConfig.getServerName()); } } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java index 1e5a77edb332..649801d27087 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcServerTest.java @@ -27,12 +27,8 @@ class MasterRpcServerTest { private final MasterRpcServer masterRpcServer = new MasterRpcServer(new MasterConfig()); @Test - void testStart() { + void testStartAndClose() { Assertions.assertDoesNotThrow(masterRpcServer::start); - } - - @Test - void testClose() { Assertions.assertDoesNotThrow(masterRpcServer::close); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java index 994347366f51..7c0a4c3ae1d6 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.registry.api; import java.io.Closeable; +import java.io.IOException; import java.time.Duration; import java.util.Collection; @@ -109,4 +110,7 @@ public interface Registry extends Closeable { * Release the lock of the prefix {@param key} */ boolean releaseLock(String key); + + @Override + void close() throws IOException; } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java index 3d0c169e597c..a9a23dfa0fc3 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.registry.api.SubscribeListener; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -172,8 +173,8 @@ public void testChildren() { registry.put(master1, value, true); registry.put(master2, value, true); assertThat(registry.children("/nodes/children")).containsExactly("childGroup1"); - assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080", - "127.0.0.2:8080"); + assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn( + Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080")); } @Test diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index 11e3f62172eb..bd468f58b792 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry { JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) { this.jdbcRegistryProperties = jdbcRegistryProperties; this.jdbcRegistryServer = jdbcRegistryServer; - this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer); - log.info("Initialize Jdbc Registry..."); + this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer); + log.info("Initialized Jdbc Registry..."); } @Override @@ -259,13 +259,14 @@ public boolean releaseLock(String key) { @Override public void close() { - log.info("Closing Jdbc Registry..."); + log.info("Closing JdbcRegistry..."); // remove the current Ephemeral node, if can connect to jdbc - try (JdbcRegistryClient closed1 = jdbcRegistryClient) { - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow(); + JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow(); + try (final JdbcRegistryClient closed1 = jdbcRegistryClient) { + // ignore } catch (Exception e) { - log.error("Close Jdbc Registry error", e); + log.error("Close JdbcRegistry error", e); } - log.info("Closed Jdbc Registry..."); + log.info("Closed JdbcRegistry..."); } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java index b00424c8e498..e4415474442e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/client/JdbcRegistryClient.java @@ -20,7 +20,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener; @@ -41,14 +40,11 @@ public class JdbcRegistryClient implements IJdbcRegistryClient { private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" + OSUtils.getProcessID(); - private final JdbcRegistryProperties jdbcRegistryProperties; - private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify; private final IJdbcRegistryServer jdbcRegistryServer; - public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) { - this.jdbcRegistryProperties = jdbcRegistryProperties; + public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) { this.jdbcRegistryServer = jdbcRegistryServer; this.jdbcRegistryClientIdentify = new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), DEFAULT_CLIENT_NAME); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java index e04360bc6f03..f060abf5bac3 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java @@ -107,8 +107,8 @@ public void start() { purgeInvalidJdbcRegistryMetadata(); JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( this::purgeInvalidJdbcRegistryMetadata, - jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), - jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), + jdbcRegistryProperties.getSessionTimeout().toMillis(), + jdbcRegistryProperties.getSessionTimeout().toMillis(), TimeUnit.MILLISECONDS); jdbcRegistryDataManager.start(); jdbcRegistryServerState = JdbcRegistryServerState.STARTED; @@ -149,13 +149,13 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) { @Override public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) { checkNotNull(jdbcRegistryClient); - jdbcRegistryClients.remove(jdbcRegistryClient); - jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify()); + final JdbcRegistryClientIdentify clientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify(); + checkNotNull(clientIdentify); - JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify(); - checkNotNull(jdbcRegistryClientIdentify); + jdbcRegistryClients.removeIf(client -> clientIdentify.equals(client.getJdbcRegistryClientIdentify())); + jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify()); - doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId())); + doPurgeJdbcRegistryClientInDB(Lists.newArrayList(clientIdentify.getClientId())); } @Override