Skip to content

Commit

Permalink
[Chore] Fix JdbcRegistryTestCase might failed due to purge dead clien…
Browse files Browse the repository at this point in the history
…ts interval is too small
  • Loading branch information
ruanwenjun committed Dec 12, 2024
1 parent 566651c commit bda16b4
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand All @@ -48,7 +49,7 @@
@Slf4j
class NettyRemotingServer {

private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private Channel serverBootstrapChannel;

@Getter
private final String serverName;
Expand Down Expand Up @@ -87,7 +88,7 @@ class NettyRemotingServer {

void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(this.bossGroup, this.workGroup)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
Expand All @@ -104,23 +105,22 @@ protected void initChannel(SocketChannel ch) {
}
});

ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
final ChannelFuture channelFuture = serverBootstrap.bind(serverConfig.getListenPort()).sync();
if (channelFuture.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
this.serverBootstrapChannel = channelFuture.channel();
} else {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(),
serverConfig.getListenPort()),
channelFuture.cause());
}
} catch (Exception e) {
throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
e);
}

if (future.isSuccess()) {
log.info("{} bind success at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
return;
}

throw new RemoteException(
String.format("%s bind %s fail", serverConfig.getServerName(), serverConfig.getListenPort()),
future.cause());
}
}

Expand Down Expand Up @@ -151,11 +151,15 @@ void close() {
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
methodInvokerExecutor.shutdown();
if (serverBootstrapChannel != null) {
serverBootstrapChannel.close().sync();
log.info("{} stop bind at port: {}", serverConfig.getServerName(), serverConfig.getListenPort());
}
methodInvokerExecutor.shutdownNow();
} catch (Exception ex) {
log.error("netty server close exception", ex);
log.error("{} close exception", serverConfig.getServerName(), ex);
}
log.info("netty server closed");
log.info("{} closed", serverConfig.getServerName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.registry.api;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;

Expand Down Expand Up @@ -109,4 +110,7 @@ public interface Registry extends Closeable {
* Release the lock of the prefix {@param key}
*/
boolean releaseLock(String key);

@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.dolphinscheduler.registry.api.SubscribeListener;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -172,8 +173,8 @@ public void testChildren() {
registry.put(master1, value, true);
registry.put(master2, value, true);
assertThat(registry.children("/nodes/children")).containsExactly("childGroup1");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactly("127.0.0.1:8080",
"127.0.0.2:8080");
assertThat(registry.children("/nodes/children/childGroup1")).containsExactlyElementsIn(
Arrays.asList("127.0.0.1:8080", "127.0.0.2:8080"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public final class JdbcRegistry implements Registry {
JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryProperties, jdbcRegistryServer);
log.info("Initialize Jdbc Registry...");
this.jdbcRegistryClient = new JdbcRegistryClient(jdbcRegistryServer);
log.info("Initialized Jdbc Registry...");
}

@Override
Expand Down Expand Up @@ -261,8 +261,9 @@ public boolean releaseLock(String key) {
public void close() {
log.info("Closing Jdbc Registry...");
// remove the current Ephemeral node, if can connect to jdbc
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();
try (JdbcRegistryClient closed1 = jdbcRegistryClient) {
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow();

} catch (Exception e) {
log.error("Close Jdbc Registry error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO;
import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener;
Expand All @@ -41,14 +40,11 @@ public class JdbcRegistryClient implements IJdbcRegistryClient {

private static final String DEFAULT_CLIENT_NAME = NetUtils.getHost() + "_" + OSUtils.getProcessID();

private final JdbcRegistryProperties jdbcRegistryProperties;

private final JdbcRegistryClientIdentify jdbcRegistryClientIdentify;

private final IJdbcRegistryServer jdbcRegistryServer;

public JdbcRegistryClient(JdbcRegistryProperties jdbcRegistryProperties, IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryProperties = jdbcRegistryProperties;
public JdbcRegistryClient(IJdbcRegistryServer jdbcRegistryServer) {
this.jdbcRegistryServer = jdbcRegistryServer;
this.jdbcRegistryClientIdentify =
new JdbcRegistryClientIdentify(CodeGenerateUtils.genCode(), DEFAULT_CLIENT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public void start() {
purgeInvalidJdbcRegistryMetadata();
JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay(
this::purgeInvalidJdbcRegistryMetadata,
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
jdbcRegistryProperties.getSessionTimeout().toMillis(),
TimeUnit.MILLISECONDS);
jdbcRegistryDataManager.start();
jdbcRegistryServerState = JdbcRegistryServerState.STARTED;
Expand Down Expand Up @@ -149,12 +149,14 @@ public void registerClient(IJdbcRegistryClient jdbcRegistryClient) {
@Override
public void deregisterClient(IJdbcRegistryClient jdbcRegistryClient) {
checkNotNull(jdbcRegistryClient);
jdbcRegistryClients.remove(jdbcRegistryClient);
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());

JdbcRegistryClientIdentify jdbcRegistryClientIdentify = jdbcRegistryClient.getJdbcRegistryClientIdentify();
final JdbcRegistryClientIdentify jdbcRegistryClientIdentify =
jdbcRegistryClient.getJdbcRegistryClientIdentify();
checkNotNull(jdbcRegistryClientIdentify);

jdbcRegistryClients.removeIf(iJdbcRegistryClient -> jdbcRegistryClientIdentify
.equals(iJdbcRegistryClient.getJdbcRegistryClientIdentify()));
jdbcRegistryClientDTOMap.remove(jdbcRegistryClient.getJdbcRegistryClientIdentify());

doPurgeJdbcRegistryClientInDB(Lists.newArrayList(jdbcRegistryClientIdentify.getClientId()));
}

Expand Down

0 comments on commit bda16b4

Please sign in to comment.