Skip to content

Commit

Permalink
Retry connect to provider server by catching ClosedChannelException (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zrlw authored Mar 12, 2025
1 parent 5f04865 commit 243da4d
Showing 1 changed file with 89 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.AbstractClient;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -86,79 +88,100 @@ public ChannelPipeline getPipeline() {
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
InetSocketAddress connectAddress = getConnectAddress();
ChannelFuture future = bootstrap.connect(connectAddress);
long connectTimeout = getConnectTimeout();
long deadline = start + connectTimeout;
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel "
+ newChannel);
while (true) {
boolean ret = future.awaitUninterruptibly(connectTimeout, TimeUnit.MILLISECONDS);

if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// copy reference
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel
+ " on create new netty channel " + newChannel);
}
// Close old channel
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info(
"Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
} else {
NettyClient.this.channel = newChannel;
}
}
break;
} else if (future.getCause() != null) {
Throwable cause = future.getCause();

if (cause instanceof ClosedChannelException) {
// Netty3.2.10 ClosedChannelException issue, see https://github.com/netty/netty/issues/138
connectTimeout = deadline - System.currentTimeMillis();
if (connectTimeout > 0) {
// 6-1 - Retry connect to provider server by Netty3.2.10 ClosedChannelException issue#138.
logger.warn(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"Netty3.2.10 ClosedChannelException issue#138",
"",
"Retry connect to provider server.");
future = bootstrap.connect(connectAddress);
continue;
}
} else {
NettyClient.this.channel = newChannel;
}
RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ ", error message is:" + cause.getMessage(),
cause);

// 6-1 - Failed to connect to provider server by other reason.
logger.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
"Failed to connect to provider server by other reason.",
cause);

throw remotingException;
} else {

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ " client-side timeout " + getConnectTimeout() + "ms (elapsed: "
+ (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());

// 6-2 - Client-side timeout.
logger.error(
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
"provider crash",
"",
"Client-side timeout.",
remotingException);

throw remotingException;
}
} else if (future.getCause() != null) {
Throwable cause = future.getCause();

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress()
+ ", error message is:" + cause.getMessage(),
cause);

// 6-1 - Failed to connect to provider server by other reason.
logger.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
"Failed to connect to provider server by other reason.",
cause);

throw remotingException;
} else {

RemotingException remotingException = new RemotingException(
this,
"client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());

// 6-2 - Client-side timeout.
logger.error(
TRANSPORT_CLIENT_CONNECT_TIMEOUT,
"provider crash",
"",
"Client-side timeout.",
remotingException);

throw remotingException;
}
} finally {
if (!isConnected()) {
Expand Down

0 comments on commit 243da4d

Please sign in to comment.