Skip to content

Commit

Permalink
[Chore] Fix JdbcRegistryTestCase might failed due to purge dead clien…
Browse files Browse the repository at this point in the history
…ts interval is too small (#16894)
  • Loading branch information
ruanwenjun authored Dec 13, 2024
1 parent 566651c commit f57acab
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ public static void sleep(final long millis) {
log.error("Current thread sleep error", interruptedException);
}
}

public static void rethrowInterruptedException(InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException("Current thread: " + Thread.currentThread().getName() + " is interrupted",
interruptedException);
}

public static void consumeInterruptedException(InterruptedException interruptedException) {
log.info("Current thread: {} is interrupted", Thread.currentThread().getName(), interruptedException);
Thread.currentThread().interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand All @@ -48,7 +49,7 @@
@Slf4j
class NettyRemotingServer {

private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private Channel serverBootstrapChannel;

@Getter
private final String serverName;
Expand Down Expand Up @@ -87,7 +88,7 @@ class NettyRemotingServer {

void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
Expand All @@ -104,23 +105,24 @@ protected void initChannel(SocketChannel ch) {
}
});

ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
final ChannelFuture channelFuture = serverBootstrap.bind(serverConfig.getListenPort()).sync();
if (channelFuture.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
this.serverBootstrapChannel = channelFuture.channel();
} else {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(),
serverConfig.getListenPort()),
channelFuture.cause());
}
} catch (InterruptedException it) {
ThreadUtils.rethrowInterruptedException(it);
} catch (Exception e) {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
e);
}

if (future.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
return;
}

throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
future.cause());
}
}

Expand All @@ -144,18 +146,25 @@ void registerMethodInvoker(ServerMethodInvoker methodInvoker) {

void close() {
if (isStarted.compareAndSet(true, false)) {
log.info("{} closing", serverConfig.getServerName());
try {
if (serverBootstrapChannel != null) {
serverBootstrapChannel.close().sync();
log.info("{} stop bind at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
}
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
methodInvokerExecutor.shutdown();
methodInvokerExecutor.shutdownNow();
} catch (InterruptedException it) {
ThreadUtils.consumeInterruptedException(it);
} catch (Exception ex) {
log.error("netty server close exception", ex);
log.error("{} close failed", serverConfig.getServerName(), ex);
}
log.info("netty server closed");
log.info("{} closed", serverConfig.getServerName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ class MasterRpcServerTest {
private final MasterRpcServer masterRpcServer = new MasterRpcServer(new MasterConfig());

@Test
void testStart() {
void testStartAndClose() {
Assertions.assertDoesNotThrow(masterRpcServer::start);
}

@Test
void testClose() {
Assertions.assertDoesNotThrow(masterRpcServer::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.registry.api;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

Expand Down Expand Up @@ -109,4 +110,7 @@ public interface Registry extends Closeable {
* Release the lock of the prefix {@param key}
*/
boolean releaseLock(String key);

@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.dolphinscheduler.registry.api.SubscribeListener;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -172,8 +173,8 @@ public void testChildren() {
registry.put(master1, value, true);
registry.put(master2, value, true);
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
"127.0.0.2:8080");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn(
Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry {
JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer);
log.info("Initialize Jdbc Registry...");
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer);
log.info("Initialized Jdbc Registry...");
}

@Override
Expand Down Expand Up @@ -259,13 +259,14 @@ public boolean releaseLock(String key) {

@Override
public void close() {
log.info("Closing Jdbc Registry...");
log.info("Closing JdbcRegistry...");
// remove the current Ephemeral node, if can connect to jdbc
try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
try (final JdbcRegistryClient closed1 = jdbcRegistryClient) {
// ignore
} catch (Exception e) {
log.error("Close Jdbc Registry error", e);
log.error("Close JdbcRegistry error", e);
}
log.info("Closed Jdbc Registry...");
log.info("Closed JdbcRegistry...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO;
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener;
Expand All @@ -41,14 +40,11 @@ public class JdbcRegistryClient implements IJdbcRegistryClient {

private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" + OSUtils.getProcessID();

private final JdbcRegistryProperties jdbcRegistryProperties;

private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify;

private final IJdbcRegistryServer jdbcRegistryServer;

public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClientIdentify =
new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), DEFAULT_CLIENT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void start() {
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
Expand Down Expand Up @@ -149,13 +149,13 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) {
@Override
public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) {
checkNotNull(jdbcRegistryClient);
jdbcRegistryClients.remove(jdbcRegistryClient);
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());
final JdbcRegistryClientIdentify clientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify();
checkNotNull(clientIdentify);

JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify();
checkNotNull(jdbcRegistryClientIdentify);
jdbcRegistryClients.removeIf(client -> clientIdentify.equals(client.getJdbcRegistryClientIdentify()));
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());

doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId()));
doPurgeJdbcRegistryClientInDB(Lists.newArrayList(clientIdentify.getClientId()));
}

@Override
Expand Down

0 comments on commit f57acab

Please sign in to comment.